commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
author     S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell  2025-03-28 21:52:21 -0500
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/jmespath/parser.py
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
download   gn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz

    two version of R2R are here  (HEAD, master)

Diffstat (limited to '.venv/lib/python3.12/site-packages/jmespath/parser.py'):

 -rw-r--r--  .venv/lib/python3.12/site-packages/jmespath/parser.py | 527
 1 file changed, 527 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/jmespath/parser.py b/.venv/lib/python3.12/site-packages/jmespath/parser.py
new file mode 100644
index 00000000..47066880
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/jmespath/parser.py
@@ -0,0 +1,527 @@
+"""Top down operator precedence parser.
+
+This is an implementation of Vaughan R. Pratt's
+"Top Down Operator Precedence" parser.
+(http://dl.acm.org/citation.cfm?doid=512927.512931).
+
+These are some additional resources that help explain the
+general idea behind a Pratt parser:
+
+* http://effbot.org/zone/simple-top-down-parsing.htm
+* http://javascript.crockford.com/tdop/tdop.html
+
+A few notes on the implementation.
+
+* All the nud/led handlers are methods on the Parser class itself, and
+  are dispatched using getattr().  This keeps all the parsing logic
+  contained in a single class.
+* We use two passes through the data: one to create a list of tokens,
+  then one pass through the tokens to create the AST.  While the lexer
+  actually yields tokens, we convert it to a list so we can easily
+  implement two tokens of lookahead.  A previous implementation used a
+  fixed-size circular buffer, but it was significantly slower.  The
+  average JMESPath expression does not contain many tokens, so this is
+  not an issue; and interestingly enough, creating a token list first
+  is actually faster than consuming from the token iterator one token
+  at a time.
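+
+Example usage (a minimal sketch; ``parse`` returns a ``ParsedResult``
+whose ``search`` method evaluates the expression against data):
+
+    parser = Parser()
+    result = parser.parse('foo.bar[0]')
+    result.search({'foo': {'bar': ['first', 'second']}})  # -> 'first'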
+
+"""
+import random
+
+from jmespath import lexer
+from jmespath.compat import with_repr_method
+from jmespath import ast
+from jmespath import exceptions
+from jmespath import visitor
+
+
+class Parser(object):
+    BINDING_POWER = {
+        'eof': 0,
+        'unquoted_identifier': 0,
+        'quoted_identifier': 0,
+        'literal': 0,
+        'rbracket': 0,
+        'rparen': 0,
+        'comma': 0,
+        'rbrace': 0,
+        'number': 0,
+        'current': 0,
+        'expref': 0,
+        'colon': 0,
+        'pipe': 1,
+        'or': 2,
+        'and': 3,
+        'eq': 5,
+        'gt': 5,
+        'lt': 5,
+        'gte': 5,
+        'lte': 5,
+        'ne': 5,
+        'flatten': 9,
+        # Everything above stops a projection.
+        'star': 20,
+        'filter': 21,
+        'dot': 40,
+        'not': 45,
+        'lbrace': 50,
+        'lbracket': 55,
+        'lparen': 60,
+    }
+    # The maximum binding power for a token that can stop
+    # a projection.
+    _PROJECTION_STOP = 10
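+    # For example, in "foo[*] | bar" the token after "[*]" is 'pipe',
+    # whose binding power (1) is below _PROJECTION_STOP, so the
+    # projection's RHS becomes identity() and the pipe is handled at
+    # the outer level.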
+    # Parsed expressions are cached in the _CACHE dict; once it holds
+    # more than _MAX_SIZE entries, a random half of them are evicted
+    # (see _free_cache_entries).
+    _CACHE = {}
+    _MAX_SIZE = 128
+
+    def __init__(self, lookahead=2):
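+        # Note: _tokens and _buffer_size are set up here for a fixed
+        # lookahead buffer, but _parse() replaces _tokens with the full
+        # token list (see the module docstring on the earlier circular
+        # buffer implementation); _buffer_size is unused in this file.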
+        self.tokenizer = None
+        self._tokens = [None] * lookahead
+        self._buffer_size = lookahead
+        self._index = 0
+
+    def parse(self, expression):
+        cached = self._CACHE.get(expression)
+        if cached is not None:
+            return cached
+        parsed_result = self._do_parse(expression)
+        self._CACHE[expression] = parsed_result
+        if len(self._CACHE) > self._MAX_SIZE:
+            self._free_cache_entries()
+        return parsed_result
+
+    def _do_parse(self, expression):
+        try:
+            return self._parse(expression)
+        except exceptions.LexerError as e:
+            e.expression = expression
+            raise
+        except exceptions.IncompleteExpressionError as e:
+            e.set_expression(expression)
+            raise
+        except exceptions.ParseError as e:
+            e.expression = expression
+            raise
+
+    def _parse(self, expression):
+        self.tokenizer = lexer.Lexer().tokenize(expression)
+        self._tokens = list(self.tokenizer)
+        self._index = 0
+        parsed = self._expression(binding_power=0)
+        if self._current_token() != 'eof':
+            t = self._lookahead_token(0)
+            raise exceptions.ParseError(t['start'], t['value'], t['type'],
+                                        "Unexpected token: %s" % t['value'])
+        return ParsedResult(expression, parsed)
+
+    def _expression(self, binding_power=0):
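+        # The core Pratt loop: consume one token and apply its nud
+        # (null denotation) to get an initial left node, then, while
+        # the next token binds more tightly than binding_power, apply
+        # that token's led (left denotation) to extend the left node.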
+        left_token = self._lookahead_token(0)
+        self._advance()
+        nud_function = getattr(
+            self, '_token_nud_%s' % left_token['type'],
+            self._error_nud_token)
+        left = nud_function(left_token)
+        current_token = self._current_token()
+        while binding_power < self.BINDING_POWER[current_token]:
+            led = getattr(self, '_token_led_%s' % current_token, None)
+            if led is None:
+                error_token = self._lookahead_token(0)
+                self._error_led_token(error_token)
+            else:
+                self._advance()
+                left = led(left)
+                current_token = self._current_token()
+        return left
+
+    def _token_nud_literal(self, token):
+        return ast.literal(token['value'])
+
+    def _token_nud_unquoted_identifier(self, token):
+        return ast.field(token['value'])
+
+    def _token_nud_quoted_identifier(self, token):
+        field = ast.field(token['value'])
+        # You can't have a quoted identifier as a function
+        # name.
+        if self._current_token() == 'lparen':
+            t = self._lookahead_token(0)
+            raise exceptions.ParseError(
+                0, t['value'], t['type'],
+                'Quoted identifier not allowed for function names.')
+        return field
+
+    def _token_nud_star(self, token):
+        left = ast.identity()
+        if self._current_token() == 'rbracket':
+            right = ast.identity()
+        else:
+            right = self._parse_projection_rhs(self.BINDING_POWER['star'])
+        return ast.value_projection(left, right)
+
+    def _token_nud_filter(self, token):
+        return self._token_led_filter(ast.identity())
+
+    def _token_nud_lbrace(self, token):
+        return self._parse_multi_select_hash()
+
+    def _token_nud_lparen(self, token):
+        expression = self._expression()
+        self._match('rparen')
+        return expression
+
+    def _token_nud_flatten(self, token):
+        left = ast.flatten(ast.identity())
+        right = self._parse_projection_rhs(
+            self.BINDING_POWER['flatten'])
+        return ast.projection(left, right)
+
+    def _token_nud_not(self, token):
+        expr = self._expression(self.BINDING_POWER['not'])
+        return ast.not_expression(expr)
+
+    def _token_nud_lbracket(self, token):
+        if self._current_token() in ['number', 'colon']:
+            right = self._parse_index_expression()
+            # We could optimize this and remove the identity() node.
+            # We don't really need an index_expression node; we could
+            # just emit an index node here if we're not dealing with a
+            # slice.
+            return self._project_if_slice(ast.identity(), right)
+        elif (self._current_token() == 'star' and
+                self._lookahead(1) == 'rbracket'):
+            self._advance()
+            self._advance()
+            right = self._parse_projection_rhs(self.BINDING_POWER['star'])
+            return ast.projection(ast.identity(), right)
+        else:
+            return self._parse_multi_select_list()
+
+    def _parse_index_expression(self):
+        # We're here:
+        # [<current>
+        #  ^
+        #  | current token
+        if (self._lookahead(0) == 'colon' or
+                self._lookahead(1) == 'colon'):
+            return self._parse_slice_expression()
+        else:
+            # Parse the syntax [number]
+            node = ast.index(self._lookahead_token(0)['value'])
+            self._advance()
+            self._match('rbracket')
+            return node
+
+    def _parse_slice_expression(self):
+        # [start:end:step]
+        # Where start, end, and step are optional.
+        # The last colon is optional as well.
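+        # e.g. "[1:10:2]" yields parts [1, 10, 2], and "[:5]" yields
+        # parts [None, 5, None].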
+        parts = [None, None, None]
+        index = 0
+        current_token = self._current_token()
+        while current_token != 'rbracket' and index < 3:
+            if current_token == 'colon':
+                index += 1
+                if index == 3:
+                    self._raise_parse_error_for_token(
+                        self._lookahead_token(0), 'syntax error')
+                self._advance()
+            elif current_token == 'number':
+                parts[index] = self._lookahead_token(0)['value']
+                self._advance()
+            else:
+                self._raise_parse_error_for_token(
+                    self._lookahead_token(0), 'syntax error')
+            current_token = self._current_token()
+        self._match('rbracket')
+        return ast.slice(*parts)
+
+    def _token_nud_current(self, token):
+        return ast.current_node()
+
+    def _token_nud_expref(self, token):
+        expression = self._expression(self.BINDING_POWER['expref'])
+        return ast.expref(expression)
+
+    def _token_led_dot(self, left):
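+        # "a.b.c" parses left to right: once "a.b" is a subexpression,
+        # "c" is appended as another child instead of nesting deeper.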
+        if self._current_token() != 'star':
+            right = self._parse_dot_rhs(self.BINDING_POWER['dot'])
+            if left['type'] == 'subexpression':
+                left['children'].append(right)
+                return left
+            else:
+                return ast.subexpression([left, right])
+        else:
+            # We're creating a projection.
+            self._advance()
+            right = self._parse_projection_rhs(
+                self.BINDING_POWER['dot'])
+            return ast.value_projection(left, right)
+
+    def _token_led_pipe(self, left):
+        right = self._expression(self.BINDING_POWER['pipe'])
+        return ast.pipe(left, right)
+
+    def _token_led_or(self, left):
+        right = self._expression(self.BINDING_POWER['or'])
+        return ast.or_expression(left, right)
+
+    def _token_led_and(self, left):
+        right = self._expression(self.BINDING_POWER['and'])
+        return ast.and_expression(left, right)
+
+    def _token_led_lparen(self, left):
+        if left['type'] != 'field':
+            #  0 - first func arg or closing paren.
+            # -1 - '(' token
+            # -2 - invalid function "name".
+            prev_t = self._lookahead_token(-2)
+            raise exceptions.ParseError(
+                prev_t['start'], prev_t['value'], prev_t['type'],
+                "Invalid function name '%s'" % prev_t['value'])
+        name = left['value']
+        args = []
+        while self._current_token() != 'rparen':
+            expression = self._expression()
+            if self._current_token() == 'comma':
+                self._match('comma')
+            args.append(expression)
+        self._match('rparen')
+        function_node = ast.function_expression(name, args)
+        return function_node
+
+    def _token_led_filter(self, left):
+        # Filters are projections.
+        condition = self._expression(0)
+        self._match('rbracket')
+        if self._current_token() == 'flatten':
+            right = ast.identity()
+        else:
+            right = self._parse_projection_rhs(self.BINDING_POWER['filter'])
+        return ast.filter_projection(left, right, condition)
+
+    def _token_led_eq(self, left):
+        return self._parse_comparator(left, 'eq')
+
+    def _token_led_ne(self, left):
+        return self._parse_comparator(left, 'ne')
+
+    def _token_led_gt(self, left):
+        return self._parse_comparator(left, 'gt')
+
+    def _token_led_gte(self, left):
+        return self._parse_comparator(left, 'gte')
+
+    def _token_led_lt(self, left):
+        return self._parse_comparator(left, 'lt')
+
+    def _token_led_lte(self, left):
+        return self._parse_comparator(left, 'lte')
+
+    def _token_led_flatten(self, left):
+        left = ast.flatten(left)
+        right = self._parse_projection_rhs(
+            self.BINDING_POWER['flatten'])
+        return ast.projection(left, right)
+
+    def _token_led_lbracket(self, left):
+        token = self._lookahead_token(0)
+        if token['type'] in ['number', 'colon']:
+            right = self._parse_index_expression()
+            if left['type'] == 'index_expression':
+                # Optimization: if the left node is an index expr,
+                # we can avoid creating another node and instead just add
+                # the right node as a child of the left.
+                left['children'].append(right)
+                return left
+            else:
+                return self._project_if_slice(left, right)
+        else:
+            # We have a projection
+            self._match('star')
+            self._match('rbracket')
+            right = self._parse_projection_rhs(self.BINDING_POWER['star'])
+            return ast.projection(left, right)
+
+    def _project_if_slice(self, left, right):
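+        # A plain index ("foo[1]") stays an index_expression, but a
+        # slice ("foo[1:3]") is itself a projection, so it also gets a
+        # projection RHS parsed for it.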
+        index_expr = ast.index_expression([left, right])
+        if right['type'] == 'slice':
+            return ast.projection(
+                index_expr,
+                self._parse_projection_rhs(self.BINDING_POWER['star']))
+        else:
+            return index_expr
+
+    def _parse_comparator(self, left, comparator):
+        right = self._expression(self.BINDING_POWER[comparator])
+        return ast.comparator(comparator, left, right)
+
+    def _parse_multi_select_list(self):
+        expressions = []
+        while True:
+            expression = self._expression()
+            expressions.append(expression)
+            if self._current_token() == 'rbracket':
+                break
+            else:
+                self._match('comma')
+        self._match('rbracket')
+        return ast.multi_select_list(expressions)
+
+    def _parse_multi_select_hash(self):
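+        # Parses "{key1: expr1, key2: expr2}"; each key must be a
+        # quoted or unquoted identifier, and each value is a full
+        # expression.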
+        pairs = []
+        while True:
+            key_token = self._lookahead_token(0)
+            # Before getting the token value, verify it's
+            # an identifier.
+            self._match_multiple_tokens(
+                token_types=['quoted_identifier', 'unquoted_identifier'])
+            key_name = key_token['value']
+            self._match('colon')
+            value = self._expression(0)
+            node = ast.key_val_pair(key_name=key_name, node=value)
+            pairs.append(node)
+            if self._current_token() == 'comma':
+                self._match('comma')
+            elif self._current_token() == 'rbrace':
+                self._match('rbrace')
+                break
+        return ast.multi_select_dict(nodes=pairs)
+
+    def _parse_projection_rhs(self, binding_power):
+        # Parse the right hand side of the projection.
+        if self.BINDING_POWER[self._current_token()] < self._PROJECTION_STOP:
+            # Any token with a binding power below _PROJECTION_STOP
+            # (10) stops the projection.
+            right = ast.identity()
+        elif self._current_token() == 'lbracket':
+            right = self._expression(binding_power)
+        elif self._current_token() == 'filter':
+            right = self._expression(binding_power)
+        elif self._current_token() == 'dot':
+            self._match('dot')
+            right = self._parse_dot_rhs(binding_power)
+        else:
+            self._raise_parse_error_for_token(self._lookahead_token(0),
+                                              'syntax error')
+        return right
+
+    def _parse_dot_rhs(self, binding_power):
+        # From the grammar:
+        # expression '.' ( identifier /
+        #                  multi-select-list /
+        #                  multi-select-hash /
+        #                  function-expression /
+        #                  *
+        # In terms of tokens that means that after a '.',
+        # you can have:
+        lookahead = self._current_token()
+        # Common case "foo.bar", so first check for an identifier.
+        if lookahead in ['quoted_identifier', 'unquoted_identifier', 'star']:
+            return self._expression(binding_power)
+        elif lookahead == 'lbracket':
+            self._match('lbracket')
+            return self._parse_multi_select_list()
+        elif lookahead == 'lbrace':
+            self._match('lbrace')
+            return self._parse_multi_select_hash()
+        else:
+            t = self._lookahead_token(0)
+            allowed = ['quoted_identifier', 'unquoted_identifier',
+                       'lbracket', 'lbrace']
+            msg = (
+                "Expecting: %s, got: %s" % (allowed, t['type'])
+            )
+            self._raise_parse_error_for_token(t, msg)
+
+    def _error_nud_token(self, token):
+        if token['type'] == 'eof':
+            raise exceptions.IncompleteExpressionError(
+                token['start'], token['value'], token['type'])
+        self._raise_parse_error_for_token(token, 'invalid token')
+
+    def _error_led_token(self, token):
+        self._raise_parse_error_for_token(token, 'invalid token')
+
+    def _match(self, token_type=None):
+        if self._current_token() == token_type:
+            self._advance()
+        else:
+            self._raise_parse_error_maybe_eof(
+                token_type, self._lookahead_token(0))
+
+    def _match_multiple_tokens(self, token_types):
+        if self._current_token() not in token_types:
+            self._raise_parse_error_maybe_eof(
+                token_types, self._lookahead_token(0))
+        self._advance()
+
+    def _advance(self):
+        self._index += 1
+
+    def _current_token(self):
+        return self._tokens[self._index]['type']
+
+    def _lookahead(self, number):
+        return self._tokens[self._index + number]['type']
+
+    def _lookahead_token(self, number):
+        return self._tokens[self._index + number]
+
+    def _raise_parse_error_for_token(self, token, reason):
+        lex_position = token['start']
+        actual_value = token['value']
+        actual_type = token['type']
+        raise exceptions.ParseError(lex_position, actual_value,
+                                    actual_type, reason)
+
+    def _raise_parse_error_maybe_eof(self, expected_type, token):
+        lex_position = token['start']
+        actual_value = token['value']
+        actual_type = token['type']
+        if actual_type == 'eof':
+            raise exceptions.IncompleteExpressionError(
+                lex_position, actual_value, actual_type)
+        message = 'Expecting: %s, got: %s' % (expected_type,
+                                              actual_type)
+        raise exceptions.ParseError(
+            lex_position, actual_value, actual_type, message)
+
+    def _free_cache_entries(self):
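+        # Evict a random half of the cache rather than tracking
+        # recency; cheap, and adequate for a bounded memoization cache.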
+        for key in random.sample(list(self._CACHE.keys()), self._MAX_SIZE // 2):
+            self._CACHE.pop(key, None)
+
+    @classmethod
+    def purge(cls):
+        """Clear the expression compilation cache."""
+        cls._CACHE.clear()
+
+
+@with_repr_method
+class ParsedResult(object):
+    def __init__(self, expression, parsed):
+        self.expression = expression
+        self.parsed = parsed
+
+    def search(self, value, options=None):
+        interpreter = visitor.TreeInterpreter(options)
+        result = interpreter.visit(self.parsed, value)
+        return result
+
+    def _render_dot_file(self):
+        """Render the parsed AST as a dot file.
+
+        Note that this is marked as an internal method because
+        the AST is an implementation detail and is subject
+        to change.  This method can be used to help troubleshoot
+        or for development purposes, but is not considered part
+        of the public supported API.  Use at your own risk.
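+
+        A sketch of intended use ("ast.dot" is an arbitrary name)::
+
+            result = Parser().parse('foo.bar')
+            with open('ast.dot', 'w') as f:
+                f.write(result._render_dot_file())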
+
+        """
+        renderer = visitor.GraphvizVisitor()
+        contents = renderer.visit(self.parsed)
+        return contents
+
+    def __repr__(self):
+        return repr(self.parsed)