diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/celpy | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/celpy')
-rw-r--r-- | .venv/lib/python3.12/site-packages/celpy/__init__.py | 293 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/celpy/__main__.py | 465 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/celpy/adapter.py | 137 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/celpy/c7nlib.py | 1582 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/celpy/cel.lark | 179 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/celpy/celparser.py | 402 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/celpy/celtypes.py | 1495 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/celpy/evaluation.py | 2446 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/celpy/py.typed | 0 |
9 files changed, 6999 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/celpy/__init__.py b/.venv/lib/python3.12/site-packages/celpy/__init__.py new file mode 100644 index 00000000..0306530f --- /dev/null +++ b/.venv/lib/python3.12/site-packages/celpy/__init__.py @@ -0,0 +1,293 @@ +# SPDX-Copyright: Copyright (c) Capital One Services, LLC +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2020 Capital One Services, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +""" +Pure Python implementation of CEL. + +.. todo:: Consolidate __init__ and parser into one module? + +Visible interface to CEL. This exposes the :py:class:`Environment`, +the :py:class:`Evaluator` run-time, and the :py:mod:`celtypes` module +with Python types wrapped to be CEL compatible. + +Example +======= + +Here's an example with some details:: + + >>> import celpy + + # A list of type names and class bindings used to create an environment. + >>> types = [] + >>> env = celpy.Environment(types) + + # Parse the code to create the CEL AST. + >>> ast = env.compile("355. / 113.") + + # Use the AST and any overriding functions to create an executable program. + >>> functions = {} + >>> prgm = env.program(ast, functions) + + # Variable bindings. + >>> activation = {} + + # Final evaluation. + >>> try: + ... result = prgm.evaluate(activation) + ... error = None + ... except CELEvalError as ex: + ... result = None + ... error = ex.args[0] + + >>> result # doctest: +ELLIPSIS + DoubleType(3.14159...) 
+ +Another Example +=============== + +See https://github.com/google/cel-go/blob/master/examples/simple_test.go + +The model Go we're sticking close to:: + + d := cel.Declarations(decls.NewVar("name", decls.String)) + env, err := cel.NewEnv(d) + if err != nil { + log.Fatalf("environment creation error: %v\\n", err) + } + ast, iss := env.Compile(`"Hello world! I'm " + name + "."`) + // Check iss for compilation errors. + if iss.Err() != nil { + log.Fatalln(iss.Err()) + } + prg, err := env.Program(ast) + if err != nil { + log.Fatalln(err) + } + out, _, err := prg.Eval(map[string]interface{}{ + "name": "CEL", + }) + if err != nil { + log.Fatalln(err) + } + fmt.Println(out) + // Output:Hello world! I'm CEL. + +Here's the Pythonic approach, using concept patterned after the Go implementation:: + + >>> from celpy import * + >>> decls = {"name": celtypes.StringType} + >>> env = Environment(annotations=decls) + >>> ast = env.compile('"Hello world! I\\'m " + name + "."') + >>> out = env.program(ast).evaluate({"name": "CEL"}) + >>> print(out) + Hello world! I'm CEL. + +""" +import json # noqa: F401 +import logging +import sys +from typing import Any, Dict, Optional, Type, cast + +import lark + +import celpy.celtypes +from celpy.adapter import (CELJSONDecoder, CELJSONEncoder, # noqa: F401 + json_to_cel) +from celpy.celparser import CELParseError, CELParser # noqa: F401 +from celpy.evaluation import (Activation, Annotation, # noqa: F401 + CELEvalError, CELFunction, Context, Evaluator, + Result, base_functions) + +# A parsed AST. +Expression = lark.Tree + + +class Runner: + """Abstract runner. + + Given an AST, this can evaluate the AST in the context of a specific activation + with any override function definitions. + + .. todo:: add type adapter and type provider registries. 
+ """ + def __init__( + self, + environment: 'Environment', + ast: lark.Tree, + functions: Optional[Dict[str, CELFunction]] = None + ) -> None: + self.logger = logging.getLogger(self.__class__.__name__) + self.environment = environment + self.ast = ast + self.functions = functions + + def new_activation(self, context: Context) -> Activation: + """ + Builds the working activation from the environmental defaults. + """ + return self.environment.activation().nested_activation(vars=context) + + def evaluate(self, activation: Context) -> celpy.celtypes.Value: # pragma: no cover + raise NotImplementedError + + +class InterpretedRunner(Runner): + """ + Pure AST expression evaluator. Uses :py:class:`evaluation.Evaluator` class. + + Given an AST, this evauates the AST in the context of a specific activation. + + The returned value will be a celtypes type. + + Generally, this should raise an :exc:`CELEvalError` for most kinds of ordinary problems. + It may raise an :exc:`CELUnsupportedError` for future features. + + .. todo:: Refractor the Evaluator constructor from evaluation. + """ + def evaluate(self, context: Context) -> celpy.celtypes.Value: + e = Evaluator( + ast=self.ast, + activation=self.new_activation(context), + functions=self.functions + ) + value = e.evaluate() + return value + + +class CompiledRunner(Runner): + """ + Python compiled expression evaluator. Uses Python byte code and :py:func:`eval`. + + Given an AST, this evauates the AST in the context of a specific activation. + + Transform the AST into Python, uses :py:func:`compile` to create a code object. + Uses :py:func:`eval` to evaluate. + """ + def __init__( + self, + environment: 'Environment', + ast: lark.Tree, + functions: Optional[Dict[str, CELFunction]] = None + ) -> None: + super().__init__(environment, ast, functions) + # Transform AST to Python. + # compile() + # cache executable code object. 
+ + def evaluate(self, activation: Context) -> celpy.celtypes.Value: + # eval() code object with activation as locals, and built-ins as gobals. + return super().evaluate(activation) + + +# TODO: Refactor classes into a separate "cel_protobuf" module. +# TODO: Becomes cel_protobuf.Int32Value +class Int32Value(celpy.celtypes.IntType): + def __new__( + cls: Type['Int32Value'], + value: Any = 0, + ) -> 'Int32Value': + """TODO: Check range. This seems to matter for protobuf.""" + if isinstance(value, celpy.celtypes.IntType): + return cast(Int32Value, super().__new__(cls, value)) + # TODO: elif other type conversions... + else: + convert = celpy.celtypes.int64(int) + return cast(Int32Value, super().__new__(cls, convert(value))) + + +# The "well-known" types in a google.protobuf package. +# We map these to CEl types instead of defining additional Protobuf Types. +# This approach bypasses some of the range constraints that are part of these types. +# It may also cause values to compare as equal when they were originally distinct types. +googleapis = { + 'google.protobuf.Int32Value': celpy.celtypes.IntType, + 'google.protobuf.UInt32Value': celpy.celtypes.UintType, + 'google.protobuf.Int64Value': celpy.celtypes.IntType, + 'google.protobuf.UInt64Value': celpy.celtypes.UintType, + 'google.protobuf.FloatValue': celpy.celtypes.DoubleType, + 'google.protobuf.DoubleValue': celpy.celtypes.DoubleType, + 'google.protobuf.BoolValue': celpy.celtypes.BoolType, + 'google.protobuf.BytesValue': celpy.celtypes.BytesType, + 'google.protobuf.StringValue': celpy.celtypes.StringType, + 'google.protobuf.ListValue': celpy.celtypes.ListType, + 'google.protobuf.Struct': celpy.celtypes.MessageType, +} + + +class Environment: + """Compiles CEL text to create an Expression object. + + From the Go implementation, there are things to work with the type annotations: + + - type adapters registry make other native types available for CEL. + + - type providers registry make ProtoBuf types available for CEL. 
+ + .. todo:: Add adapter and provider registries to the Environment. + """ + def __init__( + self, + package: Optional[str] = None, + annotations: Optional[Dict[str, Annotation]] = None, + runner_class: Optional[Type[Runner]] = None + ) -> None: + """ + Create a new environment. + + This also increases the default recursion limit to handle the defined minimums for CEL. + + :param package: An optional package name used to resolve names in an Activation + :param annotations: Names with type annotations. + There are two flavors of names provided here. + + - Variable names based on :py:mod:``celtypes`` + + - Function names, using ``typing.Callable``. + :param runner_class: the class of Runner to use, either InterpretedRunner or CompiledRunner + """ + sys.setrecursionlimit(2500) + self.logger = logging.getLogger(self.__class__.__name__) + self.package: Optional[str] = package + self.annotations: Dict[str, Annotation] = annotations or {} + self.logger.info("Type Annotations %r", self.annotations) + self.runner_class: Type[Runner] = runner_class or InterpretedRunner + self.cel_parser = CELParser() + self.runnable: Runner + + # Fold in standard annotations. These (generally) define well-known protobuf types. + self.annotations.update(googleapis) + # We'd like to add 'type.googleapis.com/google' directly, but it seems to be an alias + # for 'google', the path after the '/' in the uri. + + def compile(self, text: str) -> Expression: + """Compile the CEL source. 
This can raise syntax error exceptions.""" + ast = self.cel_parser.parse(text) + return ast + + def program( + self, + expr: lark.Tree, + functions: Optional[Dict[str, CELFunction]] = None + ) -> Runner: + """Transforms the AST into an executable runner.""" + self.logger.info("Package %r", self.package) + runner_class = self.runner_class + self.runnable = runner_class(self, expr, functions) + return self.runnable + + def activation(self) -> Activation: + """Returns a base activation""" + activation = Activation(package=self.package, annotations=self.annotations) + return activation diff --git a/.venv/lib/python3.12/site-packages/celpy/__main__.py b/.venv/lib/python3.12/site-packages/celpy/__main__.py new file mode 100644 index 00000000..c9483173 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/celpy/__main__.py @@ -0,0 +1,465 @@ +# SPDX-Copyright: Copyright (c) Capital One Services, LLC +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2020 Capital One Services, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +""" +Pure Python implementation of CEL. + +This provides a few jq-like, bc-like, and shell expr-like features. + +- ``jq`` uses ``.`` to refer the current document. By setting a package + name of ``"jq"`` and placing the JSON object in the package, we achieve + similar syntax. + +- ``bc`` offers complex function definitions and other programming support. + CEL can only evaluate a few bc-like expressions. 
+ +- This does everything ``expr`` does, but the syntax is slightly different. + The output of comparisons -- by default -- is boolean, where ``expr`` is an integer 1 or 0. + Use ``-f 'd'`` to see decimal output instead of Boolean text values. + +- This does some of what ``test`` does, without a lot of the sophisticated + file system data gathering. + Use ``-b`` to set the exit status code from a Boolean result. + +TODO: This can also have a REPL, as well as process CSV files. + +SYNOPSIS +======== + +:: + + python -m celpy [--arg name:type=value ...] [--null-input] expr + +Options: + +:--arg: + Provides argument names, types and optional values. + If the value is not provided, the name is expected to be an environment + variable, and the value of the environment variable is converted and used. + +:--null-input: + Normally, JSON documents are read from stdin in ndjson format. If no JSON documents are + provided, the ``--null-input`` option skips trying to read from stdin. + +:expr: + A CEL expression to evaluate. + +JSON documents are read from stdin in NDJSON format (http://jsonlines.org/, http://ndjson.org/). +For each JSON document, the expression is evaluated with the document in a default +package. This allows `.name` to pick items from the document. + +By default, the output is JSON serialized. This means strings will be JSON-ified and have quotes. + +If a ``--format`` option is provided, this is applied to the resulting object; this can be +used to strip quotes, or limit precision on double objects, or convert numbers to hexadecimal. + +Arguments, Types, and Namespaces +================================ + +CEL objects rely on the celtypes definitions. + +Because of the close association between CEL and protobuf, some well-known protobuf types +are also supported. + +.. todo:: CLI type environment + + Permit name.name:type=value to create namespace bindings. + +Further, type providers can be bound to CEL. 
This means an extended CEL +may have additional types beyond those defined by the :py:class:`Activation` class. + +""" + +import argparse +import ast +import cmd +import json +import logging +import os +import re +import sys +from typing import Any, Callable, Dict, List, Optional, Tuple, cast + +from celpy import Environment, Runner, celtypes +from celpy.adapter import CELJSONDecoder, CELJSONEncoder +from celpy.celparser import CELParseError +from celpy.evaluation import Annotation, CELEvalError, Result + +logger = logging.getLogger("celpy") + + +# For argument parsing purposes. +# Note the reliance on `ast.literal_eval` for ListType and MapType conversions. +# Other types convert strings directly. These types need some help. +CLI_ARG_TYPES: Dict[str, Annotation] = { + "int": + celtypes.IntType, + "uint": + celtypes.UintType, + "double": + celtypes.DoubleType, + "bool": + celtypes.BoolType, + "string": + celtypes.StringType, + "bytes": + celtypes.BytesType, + "list": + cast(Callable[..., celtypes.Value], lambda arg: celtypes.ListType(ast.literal_eval(arg))), + "map": + cast(Callable[..., celtypes.Value], lambda arg: celtypes.MapType(ast.literal_eval(arg))), + "null_type": + cast(Callable[..., celtypes.Value], lambda arg: None), + "single_duration": + celtypes.DurationType, + "single_timestamp": + celtypes.TimestampType, + + "int64_value": celtypes.IntType, + "uint64_value": celtypes.UintType, + "double_value": celtypes.DoubleType, + "bool_value": celtypes.BoolType, + "string_value": celtypes.StringType, + "bytes_value": celtypes.BytesType, + "number_value": celtypes.DoubleType, # Ambiguous; can somtimes be integer. + "null_value": cast(Callable[..., celtypes.Value], lambda arg: None), +} + + +def arg_type_value(text: str) -> Tuple[str, Annotation, celtypes.Value]: + """ + Decompose ``-a name:type=value`` argument into a useful triple. + + Also accept ``-a name:type``. This will find ``name`` in the environment and convert to the + requested type. 
+ + Also accepts ``-a name``. This will find ``name`` in the environment and treat it as a string. + + Currently, names do not reflect package naming. An environment can be a package, + and the activation can include variables that are also part of the package. + This is not supported via the CLI. + + Types can be celtypes class names or TYPE_NAME or PROTOBUF_TYPE + + :: + + TYPE_NAME : "int64_value" | "null_value" | "uint64_value" | "double_value" + | "bool_value" | "string_value" | "bytes_value" | "number_value" + + PROTOBUF_TYPE : "single_int64" | "single_int32" | "single_uint64" | "single_uint32" + | "single_sint64" | "single_sint32" | "single_fixed64" | "single_fixed32" + | "single_sfixed32" | "single_sfixed64" | "single_float" | "single_double" + | "single_bool" | "single_string" | "single_bytes" + | "single_duration" | "single_timestamp" + + .. todo:: type names can include `.` to support namespacing for protobuf support. + + :param text: Argument value + :return: Tuple with name, annotation, and resulting object. + """ + arg_pattern = re.compile( + r"^([_a-zA-Z][_a-zA-Z0-9]*)(?::([_a-zA-Z][_a-zA-Z0-9]*))?(?:=(.*))?$" + ) + match = arg_pattern.match(text) + if match is None: + raise argparse.ArgumentTypeError( + f"arg {text} not 'var=string', 'var:type=value', or `var:type") + name, type_name, value_text = match.groups() + if value_text is None: + value_text = os.environ.get(name) + type_definition: Annotation # CELType or a conversion function + value: celtypes.Value # Specific value. 
+ if type_name: + try: + type_definition = CLI_ARG_TYPES[type_name] + value = cast( + celtypes.Value, + type_definition(value_text) # type: ignore[arg-type, call-arg] + ) + except KeyError: + raise argparse.ArgumentTypeError(f"arg {text} type name not in {list(CLI_ARG_TYPES)}") + except ValueError: + raise argparse.ArgumentTypeError(f"arg {text} value invalid for the supplied type") + else: + value = celtypes.StringType(value_text or "") + type_definition = celtypes.StringType + return name, type_definition, value + + +def get_options(argv: Optional[List[str]] = None) -> argparse.Namespace: + """Parses command-line arguments. + """ + parser = argparse.ArgumentParser(prog="celpy", description="Pure Python CEL") + parser.add_argument( + "-v", "--verbose", default=0, action='count') + + # Inputs + parser.add_argument( + "-a", "--arg", action='append', type=arg_type_value, + help="Variables to set; -a name:type=value, or -a name=value for strings, " + "or -a name to read an environment variable" + ) + parser.add_argument( + "-n", "--null-input", dest='null_input', default=False, action='store_true', + help="Avoid reading Newline-Delimited JSON documents from stdin" + ) + parser.add_argument( + "-s", "--slurp", default=False, action="store_true", + help="Slurp a single, multiple JSON document from stdin" + ) + parser.add_argument( + "-i", "--interactive", default=False, action="store_true", + help="Interactive REPL" + ) + + # JSON handling + parser.add_argument( + "--json-package", "-p", metavar="NAME", dest="package", default=None, action="store", + help="Each JSON input is a CEL package, allowing .name to work" + ) + parser.add_argument( + "--json-document", "-d", metavar="NAME", dest="document", default=None, action="store", + help="Each JSON input is a variable, allowing name.map(x, x*2) to work" + ) + + # Outputs and Status + parser.add_argument( + "-b", "--boolean", default=False, action='store_true', + help="If the result is True, the exit status is 0, for False, 
it's 1, otherwise 2" + ) + parser.add_argument( + "-f", "--format", default=None, action='store', + help=( + "Use Python formating instead of JSON conversion of results; " + "Example '.6f' to format a DoubleType result" + ) + ) + + # The expression + parser.add_argument("expr", nargs='?') + + options = parser.parse_args(argv) + if options.package and options.document: + parser.error("Either use --json-package or --json-document, not both") + if not options.package and not options.document: + options.package = "jq" + if options.interactive and options.expr: + parser.error("Interactive mode and an expression provided") + if not options.interactive and not options.expr: + parser.error("No expression provided") + return options + + +class CEL_REPL(cmd.Cmd): + prompt = "CEL> " + intro = "Enter an expression to have it evaluated." + logger = logging.getLogger("REPL") + + def cel_eval(self, text: str) -> celtypes.Value: + try: + expr = self.env.compile(text) + prgm = self.env.program(expr) + return prgm.evaluate(self.state) + except CELParseError as ex: + print(self.env.cel_parser.error_text(ex.args[0], ex.line, ex.column), file=sys.stderr) + raise + + def preloop(self) -> None: + self.env = Environment() + self.state: Dict[str, celtypes.Value] = {} + + def do_set(self, args: str) -> bool: + """Set variable expression + + Evaluates the expression, saves the result as the given variable in the current activation. 
+ """ + name, space, args = args.partition(' ') + try: + value: celtypes.Value = self.cel_eval(args) + print(value) + self.state[name] = value + except Exception as ex: + self.logger.error(ex) + return False + + def do_show(self, args: str) -> bool: + """Shows all variables in the current activation.""" + print(self.state) + return False + + def do_quit(self, args: str) -> bool: + """Quits from the REPL.""" + return True + + do_exit = do_quit + do_bye = do_quit + + def default(self, args: str) -> None: + """Evaluate an expression.""" + try: + value = self.cel_eval(args) + print(value) + except Exception as ex: + self.logger.error(ex) + + +def process_json_doc( + display: Callable[[Result], None], + prgm: Runner, + activation: Dict[str, Any], + variable: str, + document: str, + boolean_to_status: bool = False) -> int: + """ + Process a single JSON document. Either one line of an NDJSON stream + or the only document in slurp mode. We assign it to the variable "jq". + This variable can be the package name, allowing ``.name``) to work. + Or. It can be left as a variable, allowing ``jq`` and ``jq.map(x, x*2)`` to work. + + Returns status code 0 for success, 3 for failure. + """ + try: + activation[variable] = json.loads(document, cls=CELJSONDecoder) + result = prgm.evaluate(activation) + display(result) + if (boolean_to_status and isinstance(result, (celtypes.BoolType, bool))): + return 0 if result else 1 + return 0 + except CELEvalError as ex: + # ``jq`` KeyError problems result in ``None``. + # Other errors should, perhaps, be more visible. + logger.debug("Encountered %s on document %r", ex, document) + display(None) + return 0 + except json.decoder.JSONDecodeError as ex: + logger.error("%s on document %r", ex.args[0], document) + # print(f"{ex.args[0]} in {document!r}", file=sys.stderr) + return 3 + + +def main(argv: Optional[List[str]] = None) -> int: + """ + Given options from the command-line, execute the CEL expression. 
+ + With --null-input option, only --arg and expr matter. + + Without --null-input, JSON documents are read from STDIN, following ndjson format. + + With the --slurp option, it reads one JSON from stdin, spread over multiple lines. + + If "--json-package" is used, each JSON document becomes a package, and + top-level dictionary keys become valid ``.name`` expressions. + Otherwise, "--json-object" is the default, and each JSON document + is assigned to a variable. The default name is "jq" to allow expressions + that are similar to ``jq`` but with a "jq" prefix. + """ + options = get_options(argv) + if options.verbose == 1: + logging.getLogger().setLevel(logging.INFO) + elif options.verbose > 1: + logging.getLogger().setLevel(logging.DEBUG) + logger.debug(options) + + if options.interactive: + repl = CEL_REPL() + repl.cmdloop() + return 0 + + if options.format: + def output_display(result: Result) -> None: + print('{0:{format}}'.format(result, format=options.format)) + else: + def output_display(result: Result) -> None: + print(json.dumps(result, cls=CELJSONEncoder)) + + logger.info("Expr: %r", options.expr) + + if options.arg: + logger.info("Args: %r", options.arg) + + annotations: Optional[Dict[str, Annotation]] + if options.arg: + annotations = { + name: type for name, type, value in options.arg + } + else: + annotations = None + + # If we're creating a named JSON document, we don't provide a default package. + # If we're usinga JSON document to populate a package, we provide the given name. 
+ env = Environment( + package=None if options.null_input else options.package, + annotations=annotations, + ) + try: + expr = env.compile(options.expr) + prgm = env.program(expr) + except CELParseError as ex: + print(env.cel_parser.error_text(ex.args[0], ex.line, ex.column), file=sys.stderr) + return 1 + + if options.arg: + activation = { + name: value for name, type, value in options.arg + } + else: + activation = {} + + if options.null_input: + # Don't read stdin, evaluate with only the activation context. + try: + result = prgm.evaluate(activation) + if options.boolean: + if isinstance(result, (celtypes.BoolType, bool)): + summary = 0 if result else 1 + else: + logger.warning("Expected celtypes.BoolType, got %s = %r", type(result), result) + summary = 2 + else: + output_display(result) + summary = 0 + except CELEvalError as ex: + print(env.cel_parser.error_text(ex.args[0], ex.line, ex.column), file=sys.stderr) + summary = 2 + + elif options.slurp: + # If slurp, one big document, part of the "jq" package in the activation context. + document = sys.stdin.read() + summary = process_json_doc( + output_display, prgm, activation, options.document or options.package, document, + options.boolean + ) + + else: + # NDJSON format: each line is a JSON doc. We repackage the doc into celtypes objects. + # Each document is in the "jq" package in the activation context. 
+ summary = 0 + for document in sys.stdin: + summary = max( + summary, + process_json_doc( + output_display, prgm, activation, options.document or options.package, document, + options.boolean + ) + ) + + return summary + + +if __name__ == "__main__": # pragma: no cover + logging.basicConfig(level=logging.WARNING) + exit = main(sys.argv[1:]) + logging.shutdown() + sys.exit(exit) diff --git a/.venv/lib/python3.12/site-packages/celpy/adapter.py b/.venv/lib/python3.12/site-packages/celpy/adapter.py new file mode 100644 index 00000000..572b65ce --- /dev/null +++ b/.venv/lib/python3.12/site-packages/celpy/adapter.py @@ -0,0 +1,137 @@ +# SPDX-Copyright: Copyright (c) Capital One Services, LLC +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2020 Capital One Services, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +""" +Type Adapter to convert Python-native types into CEL structures. + +Currently, Atomic Python objects have direct use of types in :mod:`celpy.celtypes`. + +Non-Atomic Python objects are characterized by JSON and Protobuf +objects. This module has functions to convert JSON objects to CEL. + +The protobuf decoder is TBD. + +A more sophisticated type injection capability may be needed to permit +additional types or extensions to :mod:`celpy.celtypes`. 
+""" +import base64 +import datetime +import json +from typing import Any, Dict, List, Union, cast + +from celpy import celtypes + +JSON = Union[Dict[str, Any], List[Any], bool, float, int, str, None] + + +class CELJSONEncoder(json.JSONEncoder): + """ + An Encoder to export CEL objects as JSON text. + + This is **not** a reversible transformation. Some things are coerced to strings + without any more detailed type marker. + Specifically timestamps, durations, and bytes. + """ + @staticmethod + def to_python( + cel_object: celtypes.Value) -> Union[celtypes.Value, List[Any], Dict[Any, Any], bool]: + """Recursive walk through the CEL object, replacing BoolType with native bool instances. + This lets the :py:mod:`json` module correctly represent the obects + with JSON ``true`` and ``false``. + + This will also replace ListType and MapType with native ``list`` and ``dict``. + All other CEL objects will be left intact. This creates an intermediate hybrid + beast that's not quite a :py:class:`celtypes.Value` because a few things have been replaced. + """ + if isinstance(cel_object, celtypes.BoolType): + return True if cel_object else False + elif isinstance(cel_object, celtypes.ListType): + return [CELJSONEncoder.to_python(item) for item in cel_object] + elif isinstance(cel_object, celtypes.MapType): + return { + CELJSONEncoder.to_python(key): CELJSONEncoder.to_python(value) + for key, value in cel_object.items() + } + else: + return cel_object + + def encode(self, cel_object: celtypes.Value) -> str: + """ + Override built-in encode to create proper Python :py:class:`bool` objects. 
+ """ + return super().encode(CELJSONEncoder.to_python(cel_object)) + + def default(self, cel_object: celtypes.Value) -> JSON: + if isinstance(cel_object, celtypes.TimestampType): + return str(cel_object) + elif isinstance(cel_object, celtypes.DurationType): + return str(cel_object) + elif isinstance(cel_object, celtypes.BytesType): + return base64.b64encode(cel_object).decode("ASCII") + else: + return cast(JSON, super().default(cel_object)) + + +class CELJSONDecoder(json.JSONDecoder): + """ + An Encoder to import CEL objects from JSON to the extent possible. + + This does not handle non-JSON types in any form. Coercion from string + to TimestampType or DurationType or BytesType is handled by celtype + constructors. + """ + def decode(self, source: str, _w: Any = None) -> Any: + raw_json = super().decode(source) + return json_to_cel(raw_json) + + +def json_to_cel(document: JSON) -> celtypes.Value: + """Convert parsed JSON object from Python to CEL to the extent possible. + + It's difficult to distinguish strings which should be timestamps or durations. 
+ + :: + + >>> from pprint import pprint + >>> from celpy.adapter import json_to_cel + >>> doc = json.loads('["str", 42, 3.14, null, true, {"hello": "world"}]') + >>> cel = json_to_cel(doc) + >>> pprint(cel) + ListType([StringType('str'), IntType(42), DoubleType(3.14), None, BoolType(True), \ +MapType({StringType('hello'): StringType('world')})]) + """ + if isinstance(document, bool): + return celtypes.BoolType(document) + elif isinstance(document, float): + return celtypes.DoubleType(document) + elif isinstance(document, int): + return celtypes.IntType(document) + elif isinstance(document, str): + return celtypes.StringType(document) + elif document is None: + return None + elif isinstance(document, (tuple, List)): + return celtypes.ListType( + [json_to_cel(item) for item in document] + ) + elif isinstance(document, Dict): + return celtypes.MapType( + {json_to_cel(key): json_to_cel(value) for key, value in document.items()} + ) + elif isinstance(document, datetime.datetime): + return celtypes.TimestampType(document) + elif isinstance(document, datetime.timedelta): + return celtypes.DurationType(document) + else: + raise ValueError(f"unexpected type {type(document)} in JSON structure {document!r}") diff --git a/.venv/lib/python3.12/site-packages/celpy/c7nlib.py b/.venv/lib/python3.12/site-packages/celpy/c7nlib.py new file mode 100644 index 00000000..4ec9075e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/celpy/c7nlib.py @@ -0,0 +1,1582 @@ +# SPDX-Copyright: Copyright (c) Capital One Services, LLC +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2020 Capital One Services, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +""" +Functions for C7N features when evaluating CEL expressions. + +These functions provide a mapping between C7N features and CEL. + +These functions are exposed by the global ``FUNCTIONS`` dictionary that is provided +to the CEL evaluation run-time to provide necessary C7N features. + +The functions rely on implementation details in the ``CELFilter`` class. + +The API +======= + +C7N uses CEL and the :py:mod:`c7nlib` module as follows:: + + class CELFilter(c7n.filters.core.Filter): # See below for the long list of mixins. + decls = { + "resource": celpy.celtypes.MapType, + "now": celpy.celtypes.TimestampType, + "event": celpy.celtypes.MapType, + } + decls.update(celpy.c7nlib.DECLARATIONS) + + def __init__(self, expr: str) -> None: + self.expr = expr + + def validate(self) -> None: + cel_env = celpy.Environment( + annotations=self.decls, + runner_class=c7nlib.C7N_Interpreted_Runner) + cel_ast = cel_env.compile(self.expr) + self.pgm = cel_env.program(cel_ast, functions=celpy.c7nlib.FUNCTIONS) + + def process(self, + resources: Iterable[celpy.celtypes.MapType]) -> Iterator[celpy.celtypes.MapType]: + now = datetime.datetime.utcnow() + for resource in resources: + with C7NContext(filter=the_filter): + cel_activation = { + "resource": celpy.json_to_cel(resource), + "now": celpy.celtypes.TimestampType(now), + } + if self.pgm.evaluate(cel_activation): + yield resource + +This isn't the whole story, this is the starting point. + +This library of functions is bound into the program that's built from the AST. 
+ +Several objects are required in activation for use by the CEL expression + +- ``resource``. The JSON document describing the cloud resource. + +- ``now.`` The current timestamp. + +- Optionally, ``event`` may have an AWS CloudWatch Event. + + +The type: value Features +======================== + +The core value features of C7N require a number of CEL extensions. + +- :func:`glob(string, pattern)` uses Python fnmatch rules. This implements ``op: glob``. + +- :func:`difference(list, list)` creates intermediate sets and computes the difference + as a boolean value. Any difference is True. This implements ``op: difference``. + +- :func:`intersect(list, list)` creats intermediate sets and computes the intersection + as a boolean value. Any interection is True. This implements ``op: intersect``. + +- :func:`normalize(string)` supports normalized comparison between strings. + In this case, it means lower cased and trimmed. This implements ``value_type: normalize``. + +- :func:`net.cidr_contains` checks to see if a given CIDR block contains a specific + address. See https://www.openpolicyagent.org/docs/latest/policy-reference/#net. + +- :func:`net.cidr_size` extracts the prefix length of a parsed CIDR block. + +- :func:`version` uses ``disutils.version.LooseVersion`` to compare version strings. + +- :func:`resource_count` function. This is TBD. + +The type: value_from features +============================== + +This relies on ``value_from()`` and ``jmes_path_map()`` functions + +In context, it looks like this:: + + value_from("s3://c7n-resources/exemptions.json", "json") + .jmes_path_map('exemptions.ec2.rehydration.["IamInstanceProfile.Arn"][].*[].*[]') + .contains(resource["IamInstanceProfile"]["Arn"]) + +The ``value_from()`` function reads values from a given URI. + +- A full URI for an S3 bucket. + +- A full URI for a server that supports HTTPS GET requests. + +If a format is given, this is used, otherwise it's based on the +suffix of the path. 
+ +The ``jmes_path_map()`` function compiles and applies a JMESPath +expression against each item in the collection to create a +new collection. To an extent, this repeats functionality +from the ``map()`` macro. + +Additional Functions +==================== + +A number of C7N subclasses of ``Filter`` provide additional features. There are +at least 70-odd functions that are expressed or implied by these filters. + +Because the CEL expressions are always part of a ``CELFilter``, all of these +additional C7N features need to be transformed into "mixins" that are implemented +in two places. The function is part of the legacy subclass of ``Filter``, +and the function is also part of ``CELFilter``. + +:: + + class InstanceImageMixin: + # from :py:class:`InstanceImageBase` refactoring + def get_instance_image(self): + pass + + class RelatedResourceMixin: + # from :py:class:`RelatedResourceFilter` mixin + def get_related_ids(self): + pass + + def get_related(self): + pass + + class CredentialReportMixin: + # from :py:class:`c7n.resources.iam.CredentialReport` filter. 
+ def get_credential_report(self): + pass + + class ResourceKmsKeyAliasMixin: + # from :py:class:`c7n.resources.kms.ResourceKmsKeyAlias` + def get_matching_aliases(self, resource): + pass + + class CrossAccountAccessMixin: + # from :py:class:`c7n.filters.iamaccessfilter.CrossAccountAccessFilter` + def get_accounts(self, resource): + pass + def get_vpcs(self, resource): + pass + def get_vpces(self, resource): + pass + def get_orgids(self, resource): + pass + # from :py:class:`c7n.resources.secretsmanager.CrossAccountAccessFilter` + def get_resource_policy(self, resource): + pass + + class SNSCrossAccountMixin: + # from :py:class:`c7n.resources.sns.SNSCrossAccount` + def get_endpoints(self, resource): + pass + def get_protocols(self, resource): + pass + + class ImagesUnusedMixin: + # from :py:class:`c7n.resources.ami.ImageUnusedFilter` + def _pull_ec2_images(self, resource): + pass + def _pull_asg_images(self, resource): + pass + + class SnapshotUnusedMixin: + # from :py:class:`c7n.resources.ebs.SnapshotUnusedFilter` + def _pull_asg_snapshots(self, resource): + pass + def _pull_ami_snapshots(self, resource): + pass + + class IamRoleUsageMixin: + # from :py:class:`c7n.resources.iam.IamRoleUsage` + def service_role_usage(self, resource): + pass + def instance_profile_usage(self, resource): + pass + + class SGUsageMixin: + # from :py:class:`c7n.resources.vpc.SGUsage` + def scan_groups(self, resource): + pass + + class IsShieldProtectedMixin: + # from :py:mod:`c7n.resources.shield` + def get_type_protections(self, resource): + pass + + class ShieldEnabledMixin: + # from :py:class:`c7n.resources.account.ShieldEnabled` + def account_shield_subscriptions(self, resource): + pass + + class CELFilter( + InstanceImageMixin, RelatedResourceMixin, CredentialReportMixin, + ResourceKmsKeyAliasMixin, CrossAccountAccessMixin, SNSCrossAccountMixin, + ImagesUnusedMixin, SnapshotUnusedMixin, IamRoleUsageMixin, SGUsageMixin, + Filter, + ): + '''Container for functions used by c7nlib to 
expose data to CEL''' + def __init__(self, data, manager) -> None: + super().__init__(data, manager) + assert data["type"].lower() == "cel" + self.expr = data["expr"] + self.parser = c7n.filters.offhours.ScheduleParser() + + def validate(self): + pass # See above example + + def process(self, resources): + pass # See above example + +This is not the complete list. See the ``tests/test_c7nlib.py`` for the ``celfilter_instance`` +fixture which contains **all** of the functions required. + +C7N Context Object +================== + +A number of the functions require access to C7N features that are not simply part +of the resource being filtered. There are two alternative ways to handle this dependency: + +- A global C7N context object that has the current ``CELFilter`` providing + access to C7N internals. + +- A ``C7N`` argument to the functions that need C7N access. + This would be provided in the activation context for CEL. + +To keep the library functions looking simple, the module global ``C7N`` is used. +This avoids introducing a non-CEL parameter to the c7nlib functions. + +The ``C7N`` context object contains the following attributes: + +- ``filter``. The original C7N ``Filter`` object. This provides access to the + resource manager. It can be used to manage supplemental + queries using C7N caches and other resource management. + +This is set by the :py:class:`C7NContext` prior to CEL evaluation. + +Name Resolution +=============== + +Note that names are **not** resolved via a lookup in the program object, +an instance of the :py:class:`celpy.Runner` class. To keep these functions +simple, the runner is not part of the run-time, and name resolution +will appear to be "hard-wrired" among these functions. + +This is rarely an issue, since most of these functions are independent. +The :func:`value_from` function relies on :func:`text_from` and :func:`parse_text`. +Changing either of these functions with an override won't modify the behavior +of :func:`value_from`. 
+""" +import csv +import fnmatch +import io +import ipaddress +import json +import logging +import os.path +import sys +import urllib.request +import zlib +from contextlib import closing +from packaging.version import Version +from types import TracebackType +from typing import (Any, Callable, Dict, Iterator, List, Optional, Type, Union, + cast) + +import dateutil +import jmespath # type: ignore [import-untyped] + +from celpy import InterpretedRunner, celtypes +from celpy.adapter import json_to_cel +from celpy.evaluation import Annotation, Context, Evaluator + +logger = logging.getLogger(__name__) + + +class C7NContext: + """ + Saves current C7N filter for use by functions in this module. + + This is essential for making C7N filter available to *some* of these functions. + + :: + + with C7NContext(filter): + cel_prgm.evaluate(cel_activation) + """ + + def __init__(self, filter: Any) -> None: + self.filter = filter + + def __repr__(self) -> str: # pragma: no cover + return f"{self.__class__.__name__}(filter={self.filter!r})" + + def __enter__(self) -> None: + global C7N + C7N = self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> None: + global C7N + C7N = cast("C7NContext", None) + return + + +# An object used for access to the C7N filter. +# A module global makes the interface functions much simpler. +# They can rely on `C7N.filter` providing the current `CELFilter` instance. +C7N = cast("C7NContext", None) + + +def key(source: celtypes.ListType, target: celtypes.StringType) -> celtypes.Value: + """ + The C7N shorthand ``tag:Name`` doesn't translate well to CEL. It extracts a single value + from a sequence of objects with a ``{"Key": x, "Value": y}`` structure; specifically, + the value for ``y`` when ``x == "Name"``. 
This function locates a particular "Key": target within a list of {"Key": x, "Value": y} items,
+ """ + return celtypes.BoolType(bool(set(left) - set(right))) + + +def intersect(left: celtypes.ListType, right: celtypes.ListType) -> celtypes.BoolType: + """ + Compute the intersection between two lists. + It's true if the result is non-empty: there is an item in both lists. + It's false if the result is empty: there is no common item between the lists. + """ + return celtypes.BoolType(bool(set(left) & set(right))) + + +def normalize(string: celtypes.StringType) -> celtypes.StringType: + """ + Normalize a string. + """ + return celtypes.StringType(string.lower().strip()) + + +def unique_size(collection: celtypes.ListType) -> celtypes.IntType: + """ + Unique size of a list + """ + return celtypes.IntType(len(set(collection))) + + +class IPv4Network(ipaddress.IPv4Network): + + # Override for net 2 net containment comparison + def __contains__(self, other): # type: ignore[no-untyped-def] + if other is None: + return False + if isinstance(other, ipaddress._BaseNetwork): + return self.supernet_of(other) # type: ignore[no-untyped-call] + return super(IPv4Network, self).__contains__(other) + + if sys.version_info.major == 3 and sys.version_info.minor <= 6: # pragma: no cover + + @staticmethod + def _is_subnet_of(a, b): # type: ignore[no-untyped-def] + try: + # Always false if one is v4 and the other is v6. 
+ if a._version != b._version: + raise TypeError(f"{a} and {b} are not of the same version") + return ( + b.network_address <= a.network_address + and b.broadcast_address >= a.broadcast_address # noqa: W503 + ) + except AttributeError: + raise TypeError( + f"Unable to test subnet containment " f"between {a} and {b}" + ) + + def supernet_of(self, other): # type: ignore[no-untyped-def] + """Return True if this network is a supernet of other.""" + return self._is_subnet_of(other, self) # type: ignore[no-untyped-call] + + +CIDR = Union[None, IPv4Network, ipaddress.IPv4Address] +CIDR_Class = Union[Type[IPv4Network], Callable[..., ipaddress.IPv4Address]] + + +def parse_cidr(value: str) -> CIDR: + """ + Process cidr ranges. + + This is a union of types outside CEL. + + It appears to be Union[None, IPv4Network, ipaddress.IPv4Address] + """ + klass: CIDR_Class = IPv4Network + if "/" not in value: + klass = ipaddress.ip_address # type: ignore[assignment] + v: CIDR + try: + v = klass(value) + except (ipaddress.AddressValueError, ValueError): + v = None + return v + + +def size_parse_cidr(value: celtypes.StringType,) -> Optional[celtypes.IntType]: + """CIDR prefixlen value""" + cidr = parse_cidr(value) + if cidr and isinstance(cidr, IPv4Network): + return celtypes.IntType(cidr.prefixlen) + else: + return None + + +class ComparableVersion(Version): + """ + The old LooseVersion could fail on comparing present strings, used + in the value as shorthand for certain options. + + The new Version doesn't fail as easily. 
+ """ + + def __eq__(self, other: object) -> bool: + try: + return super(ComparableVersion, self).__eq__(other) + except TypeError: # pragma: no cover + return False + + +def version( + value: celtypes.StringType, +) -> celtypes.Value: # actually, a ComparableVersion + return cast(celtypes.Value, ComparableVersion(value)) + + +def present(value: celtypes.StringType,) -> celtypes.Value: + return cast(celtypes.Value, bool(value)) + + +def absent(value: celtypes.StringType,) -> celtypes.Value: + return cast(celtypes.Value, not bool(value)) + + +def text_from(url: celtypes.StringType,) -> celtypes.Value: + """ + Read raw text from a URL. This can be expanded to accept S3 or other URL's. + """ + req = urllib.request.Request(url, headers={"Accept-Encoding": "gzip"}) + raw_data: str + with closing(urllib.request.urlopen(req)) as response: + if response.info().get("Content-Encoding") == "gzip": + raw_data = zlib.decompress(response.read(), zlib.MAX_WBITS | 32).decode( + "utf8" + ) + else: + raw_data = response.read().decode("utf-8") + return celtypes.StringType(raw_data) + + +def parse_text( + source_text: celtypes.StringType, format: celtypes.StringType +) -> celtypes.Value: + """ + Parse raw text using a given format. 
+ """ + if format == "json": + return json_to_cel(json.loads(source_text)) + elif format == "txt": + return celtypes.ListType( + [celtypes.StringType(s.rstrip()) for s in source_text.splitlines()] + ) + elif format in ("ldjson", "ndjson", "jsonl"): + return celtypes.ListType( + [json_to_cel(json.loads(s)) for s in source_text.splitlines()] + ) + elif format == "csv": + return celtypes.ListType( + [json_to_cel(row) for row in csv.reader(io.StringIO(source_text))] + ) + elif format == "csv2dict": + return celtypes.ListType( + [json_to_cel(row) for row in csv.DictReader(io.StringIO(source_text))] + ) + else: + raise ValueError(f"Unsupported format: {format!r}") # pragma: no cover + + +def value_from( + url: celtypes.StringType, format: Optional[celtypes.StringType] = None, +) -> celtypes.Value: + """ + Read values from a URL. + + First, do :func:`text_from` to read the source. + Then, do :func:`parse_text` to parse the source, if needed. + + This makes the format optional, and deduces it from the URL's path information. + + C7N will generally replace this with a function + that leverages a more sophisticated :class:`c7n.resolver.ValuesFrom`. + """ + supported_formats = ("json", "ndjson", "ldjson", "jsonl", "txt", "csv", "csv2dict") + + # 1. get format either from arg or URL + if not format: + _, suffix = os.path.splitext(url) + format = celtypes.StringType(suffix[1:]) + if format not in supported_formats: + raise ValueError(f"Unsupported format: {format!r}") + + # 2. read raw data + # Note this is directly bound to text_from() and does not go though the environment + # or other CEL indirection. + raw_data = cast(celtypes.StringType, text_from(url)) + + # 3. parse physical format (json, ldjson, ndjson, jsonl, txt, csv, csv2dict) + return parse_text(raw_data, format) + + +def jmes_path( + source_data: celtypes.Value, path_source: celtypes.StringType +) -> celtypes.Value: + """ + Apply JMESPath to an object read from from a URL. 
+ """ + expression = jmespath.compile(path_source) + return json_to_cel(expression.search(source_data)) + + +def jmes_path_map( + source_data: celtypes.ListType, path_source: celtypes.StringType +) -> celtypes.ListType: + """ + Apply JMESPath to a each object read from from a URL. + This is for ndjson, nljson and jsonl files. + """ + expression = jmespath.compile(path_source) + return celtypes.ListType( + [json_to_cel(expression.search(row)) for row in source_data] + ) + + +def marked_key( + source: celtypes.ListType, target: celtypes.StringType +) -> celtypes.Value: + """ + Examines a list of {"Key": text, "Value": text} mappings + looking for the given Key value. + + Parses a ``message:action@action_date`` value into a mapping + {"message": message, "action": action, "action_date": action_date} + + If no Key or no Value or the Value isn't the right structure, + the result is a null. + """ + value = key(source, target) + if value is None: + return None + try: + msg, tgt = cast(celtypes.StringType, value).rsplit(":", 1) + action, action_date_str = tgt.strip().split("@", 1) + except ValueError: + return None + return celtypes.MapType( + { + celtypes.StringType("message"): celtypes.StringType(msg), + celtypes.StringType("action"): celtypes.StringType(action), + celtypes.StringType("action_date"): celtypes.TimestampType(action_date_str), + } + ) + + +def image(resource: celtypes.MapType) -> celtypes.Value: + """ + Reach into C7N to get the image details for this EC2 or ASG resource. + + Minimally, the creation date is transformed into a CEL timestamp. + We may want to slightly generalize this to json_to_cell() the entire Image object. + + The following may be usable, but it seems too complex: + + :: + + C7N.filter.prefetch_instance_images(C7N.policy.resources) + image = C7N.filter.get_instance_image(resource["ImageId"]) + return json_to_cel(image) + + .. todo:: Refactor C7N + + Provide the :py:class:`InstanceImageBase` mixin in a :py:class:`CELFilter` class. 
+ We want to have the image details in the new :py:class:`CELFilter` instance. + """ + + # Assuming the :py:class:`CELFilter` class has this method extracted from the legacy filter. + # Requies the policy already did this: C7N.filter.prefetch_instance_images([resource]) to + # populate cache. + image = C7N.filter.get_instance_image(resource) + + if image: + creation_date = image["CreationDate"] + image_name = image["Name"] + else: + creation_date = "2000-01-01T01:01:01.000Z" + image_name = "" + + return json_to_cel( + {"CreationDate": dateutil.parser.isoparse(creation_date), "Name": image_name} + ) + + +def get_raw_metrics(request: celtypes.MapType) -> celtypes.Value: + """ + Reach into C7N and make a statistics request using the current C7N filter object. + + The ``request`` parameter is the request object that is passed through to AWS via + the current C7N filter's manager. The request is a Mapping with the following keys and values: + + :: + + get_raw_metrics({ + "Namespace": "AWS/EC2", + "MetricName": "CPUUtilization", + "Dimensions": {"Name": "InstanceId", "Value": resource.InstanceId}, + "Statistics": ["Average"], + "StartTime": now - duration("4d"), + "EndTime": now, + "Period": duration("86400s") + }) + + The request is passed through to AWS more-or-less directly. The result is a CEL + list of values for then requested statistic. A ``.map()`` macro + can be used to compute additional details. An ``.exists()`` macro can filter the + data to look for actionable values. + + We would prefer to refactor C7N and implement this with code something like this: + + :: + + C7N.filter.prepare_query(C7N.policy.resources) + data = C7N.filter.get_resource_statistics(client, resource) + return json_to_cel(data) + + .. todo:: Refactor C7N + + Provide a :py:class:`MetricsAccess` mixin in a :py:class:`CELFilter` class. + We want to have the metrics processing in the new :py:class:`CELFilter` instance. 
+ + """ + client = C7N.filter.manager.session_factory().client("cloudwatch") + data = client.get_metric_statistics( + Namespace=request["Namespace"], + MetricName=request["MetricName"], + Statistics=request["Statistics"], + StartTime=request["StartTime"], + EndTime=request["EndTime"], + Period=request["Period"], + Dimensions=request["Dimensions"], + )["Datapoints"] + + return json_to_cel(data) + + +def get_metrics( + resource: celtypes.MapType, request: celtypes.MapType +) -> celtypes.Value: + """ + Reach into C7N and make a statistics request using the current C7N filter. + + This builds a request object that is passed through to AWS via the :func:`get_raw_metrics` + function. + + The ``request`` parameter is a Mapping with the following keys and values: + + :: + + resource.get_metrics({"MetricName": "CPUUtilization", "Statistic": "Average", + "StartTime": now - duration("4d"), "EndTime": now, "Period": duration("86400s")} + ).exists(m, m < 30) + + The namespace is derived from the ``C7N.policy``. The dimensions are derived from + the ``C7N.fiter.model``. + + .. todo:: Refactor C7N + + Provide a :py:class:`MetricsAccess` mixin in a :py:class:`CELFilter` class. + We want to have the metrics processing in the new :py:class:`CELFilter` instance. + + """ + dimension = C7N.filter.manager.get_model().dimension + namespace = C7N.filter.manager.resource_type + # TODO: Varies by resource/policy type. Each policy's model may have different dimensions. 
+ dimensions = [{"Name": dimension, "Value": resource.get(dimension)}] + raw_metrics = get_raw_metrics(cast(celtypes.MapType, json_to_cel( + { + "Namespace": namespace, + "MetricName": request["MetricName"], + "Dimensions": dimensions, + "Statistics": [request["Statistic"]], + "StartTime": request["StartTime"], + "EndTime": request["EndTime"], + "Period": request["Period"], + } + ))) + return json_to_cel( + [ + cast(Dict[str, celtypes.Value], item).get(request["Statistic"]) + for item in cast(List[celtypes.Value], raw_metrics) + ] + ) + + +def get_raw_health_events(request: celtypes.MapType) -> celtypes.Value: + """ + Reach into C7N and make a health-events request using the current C7N filter. + + The ``request`` parameter is the filter object that is passed through to AWS via + the current C7N filter's manager. The request is a List of AWS health events. + + :: + + get_raw_health_events({ + "services": ["ELASTICFILESYSTEM"], + "regions": ["us-east-1", "global"], + "eventStatusCodes": ['open', 'upcoming'], + }) + """ + client = C7N.filter.manager.session_factory().client( + 'health', region_name='us-east-1') + data = client.describe_events(filter=request)['events'] + return json_to_cel(data) + + +def get_health_events( + resource: celtypes.MapType, + statuses: Optional[List[celtypes.Value]] = None +) -> celtypes.Value: + """ + Reach into C7N and make a health-event request using the current C7N filter. + + This builds a request object that is passed through to AWS via the :func:`get_raw_health_events` + function. + + .. todo:: Handle optional list of event types. 
+ """ + if not statuses: + statuses = [celtypes.StringType('open'), celtypes.StringType('upcoming')] + phd_svc_name_map = { + 'app-elb': 'ELASTICLOADBALANCING', + 'ebs': 'EBS', + 'efs': 'ELASTICFILESYSTEM', + 'elb': 'ELASTICLOADBALANCING', + 'emr': 'ELASTICMAPREDUCE' + } + m = C7N.filter.manager + service = phd_svc_name_map.get(m.data['resource'], m.get_model().service.upper()) + raw_events = get_raw_health_events(cast(celtypes.MapType, json_to_cel( + { + "services": [service], + "regions": [m.config.region, 'global'], + "eventStatusCodes": statuses, + } + ))) + return raw_events + + +def get_related_ids(resource: celtypes.MapType,) -> celtypes.Value: + """ + Reach into C7N and make a get_related_ids() request using the current C7N filter. + + .. todo:: Refactor C7N + + Provide the :py:class:`RelatedResourceFilter` mixin in a :py:class:`CELFilter` class. + We want to have the related id's details in the new :py:class:`CELFilter` instance. + """ + + # Assuming the :py:class:`CELFilter` class has this method extracted from the legacy filter. + related_ids = C7N.filter.get_related_ids(resource) + return json_to_cel(related_ids) + + +def get_related_sgs(resource: celtypes.MapType,) -> celtypes.Value: + """ + Reach into C7N and make a get_related_sgs() request using the current C7N filter. + """ + security_groups = C7N.filter.get_related_sgs(resource) + return json_to_cel(security_groups) + + +def get_related_subnets(resource: celtypes.MapType,) -> celtypes.Value: + """ + Reach into C7N and make a get_related_subnets() request using the current C7N filter. + """ + subnets = C7N.filter.get_related_subnets(resource) + return json_to_cel(subnets) + + +def get_related_nat_gateways(resource: celtypes.MapType,) -> celtypes.Value: + """ + Reach into C7N and make a get_related_nat_gateways() request using the current C7N filter. 
+ """ + nat_gateways = C7N.filter.get_related_nat_gateways(resource) + return json_to_cel(nat_gateways) + + +def get_related_igws(resource: celtypes.MapType,) -> celtypes.Value: + """ + Reach into C7N and make a get_related_igws() request using the current C7N filter. + """ + igws = C7N.filter.get_related_igws(resource) + return json_to_cel(igws) + + +def get_related_security_configs(resource: celtypes.MapType,) -> celtypes.Value: + """ + Reach into C7N and make a get_related_security_configs() request using the current C7N filter. + """ + security_configs = C7N.filter.get_related_security_configs(resource) + return json_to_cel(security_configs) + + +def get_related_vpc(resource: celtypes.MapType,) -> celtypes.Value: + """ + Reach into C7N and make a get_related_vpc() request using the current C7N filter. + """ + vpc = C7N.filter.get_related_vpc(resource) + return json_to_cel(vpc) + + +def get_related_kms_keys(resource: celtypes.MapType,) -> celtypes.Value: + """ + Reach into C7N and make a get_related_kms_keys() request using the current C7N filter. + """ + vpc = C7N.filter.get_related_kms_keys(resource) + return json_to_cel(vpc) + + +def security_group(security_group_id: celtypes.MapType,) -> celtypes.Value: + """ + Reach into C7N and make a get_related() request using the current C7N filter to get + the security group. + + .. todo:: Refactor C7N + + Provide the :py:class:`RelatedResourceFilter` mixin in a :py:class:`CELFilter` class. + We want to have the related id's details in the new :py:class:`CELFilter` instance. + See :py:class:`VpcSecurityGroupFilter` subclass of :py:class:`RelatedResourceFilter`. + """ + + # Assuming the :py:class:`CELFilter` class has this method extracted from the legacy filter. 
+ security_groups = C7N.filter.get_related([security_group_id]) + return json_to_cel(security_groups) + + +def subnet(subnet_id: celtypes.Value,) -> celtypes.Value: + """ + Reach into C7N and make a get_related() request using the current C7N filter to get + the subnet. + + .. todo:: Refactor C7N + + Provide the :py:class:`RelatedResourceFilter` mixin in a :py:class:`CELFilter` class. + We want to have the related id's details in the new :py:class:`CELFilter` instance. + See :py:class:`VpcSubnetFilter` subclass of :py:class:`RelatedResourceFilter`. + """ + # Get related ID's first, then get items for the related ID's. + subnets = C7N.filter.get_related([subnet_id]) + return json_to_cel(subnets) + + +def flow_logs(resource: celtypes.MapType,) -> celtypes.Value: + """ + Reach into C7N and locate the flow logs using the current C7N filter. + + .. todo:: Refactor C7N + + Provide a separate function to get the flow logs, separate from the + the filter processing. + + .. todo:: Refactor :func:`c7nlib.flow_logs` -- it exposes too much implementation detail. + + """ + # TODO: Refactor into a function in ``CELFilter``. Should not be here. + client = C7N.filter.manager.session_factory().client("ec2") + logs = client.describe_flow_logs().get("FlowLogs", ()) + m = C7N.filter.manager.get_model() + resource_map: Dict[str, List[Dict[str, Any]]] = {} + for fl in logs: + resource_map.setdefault(fl["ResourceId"], []).append(fl) + if resource.get(m.id) in resource_map: + flogs = resource_map[cast(str, resource.get(m.id))] + return json_to_cel(flogs) + return json_to_cel([]) + + +def vpc(vpc_id: celtypes.Value,) -> celtypes.Value: + """ + Reach into C7N and make a ``get_related()`` request using the current C7N filter to get + the VPC details. + + .. todo:: Refactor C7N + + Provide the :py:class:`RelatedResourceFilter` mixin in a :py:class:`CELFilter` class. + We want to have the related id's details in the new :py:class:`CELFilter` instance. 
+ See :py:class:`VpcFilter` subclass of :py:class:`RelatedResourceFilter`. + """ + # Assuming the :py:class:`CELFilter` class has this method extracted from the legacy filter. + vpc = C7N.filter.get_related([vpc_id]) + return json_to_cel(vpc) + + +def subst(jmes_path: celtypes.StringType,) -> celtypes.StringType: + """ + Reach into C7N and build a set of substitutions to replace text in a JMES path. + + This is based on how :py:class:`c7n.resolver.ValuesFrom` works. There are + two possible substitution values: + + - account_id + - region + + :param jmes_path: the source + :return: A JMES with values replaced. + """ + + config_args = { + "account_id": C7N.filter.manager.config.account_id, + "region": C7N.filter.manager.config.region, + } + return celtypes.StringType(jmes_path.format(**config_args)) + + +def credentials(resource: celtypes.MapType) -> celtypes.Value: + """ + Reach into C7N and make a get_related() request using the current C7N filter to get + the IAM-role credential details. + + See :py:class:`c7n.resources.iam.CredentialReport` filter. + The `get_credential_report()` function does what we need. + + .. todo:: Refactor C7N + + Refactor the :py:class:`c7n.resources.iam.CredentialReport` filter into a + ``CredentialReportMixin`` mixin to the :py:class:`CELFilter` class. + The ``get_credential_report()`` function does what we need. + """ + return json_to_cel(C7N.filter.get_credential_report(resource)) + + +def kms_alias(vpc_id: celtypes.Value,) -> celtypes.Value: + """ + Reach into C7N and make a get_matching_aliases() request using the current C7N filter to get + the alias. + + See :py:class:`c7n.resources.kms.ResourceKmsKeyAlias`. + The `get_matching_aliases()` function does what we need. + + .. todo:: Refactor C7N + + Refactor the :py:class:`c7n.resources.kms.ResourceKmsKeyAlias` filter into a + ``ResourceKmsKeyAliasMixin`` mixin to the :py:class:`CELFilter` class. + The ``get_matching_aliases()`` dfunction does what we need. 
+ """ + return json_to_cel(C7N.filter.get_matching_aliases()) + + +def kms_key(key_id: celtypes.Value,) -> celtypes.Value: + """ + Reach into C7N and make a ``get_related()`` request using the current C7N filter to get + the key. We're looking for the c7n.resources.kms.Key resource manager to get the related key. + + .. todo:: Refactor C7N + + Provide the :py:class:`RelatedResourceFilter` mixin in a :py:class:`CELFilter` class. + """ + key = C7N.filter.get_related([key_id]) + return json_to_cel(key) + + +def resource_schedule( + tag_value: celtypes.Value, +) -> celtypes.Value: + """ + Reach into C7N and use the the :py:class:`c7n.filters.offhours.ScheduleParser` class + to examine the tag's value, the current time, and return a True/False. + This parser is the `parser` value of the :py:class:`c7n.filters.offhours.Time` filter class. + Using the filter's `parser.parse(value)` provides needed structure. + + The `filter.parser.parse(value)` will transform text of the Tag value + into a dictionary. This is further transformed to something we can use in CEL. 
+ + From this + :: + + off=[(M-F,21),(U,18)];on=[(M-F,6),(U,10)];tz=pt + + C7N ScheduleParser produces this + :: + + { + off: [ + { days: [1, 2, 3, 4, 5], hour: 21 }, + { days: [0], hour: 18 } + ], + on: [ + { days: [1, 2, 3, 4, 5], hour: 6 }, + { days: [0], hour: 10 } + ], + tz: "pt" + } + + For CEL, we need this + :: + + { + off: [ + { days: [1, 2, 3, 4, 5], hour: 21, tz: "pt" }, + { days: [0], hour: 18, tz: "pt" } + ], + on: [ + { days: [1, 2, 3, 4, 5], hour: 6, tz: "pt" }, + { days: [0], hour: 10, tz: "pt" } + ], + } + + This lets a CEL expression use + :: + + key("maid_offhours").resource_schedule().off.exists(s, + now.getDayOfWeek(s.tz) in s.days && now.getHour(s.tz) == s.hour) + """ + c7n_sched_doc = C7N.filter.parser.parse(tag_value) + tz = c7n_sched_doc.pop("tz", "et") + cel_sched_doc = { + state: [ + {"days": time["days"], "hour": time["hour"], "tz": tz} for time in time_list + ] + for state, time_list in c7n_sched_doc.items() + } + return json_to_cel(cel_sched_doc) + + +def get_accounts(resource: celtypes.MapType,) -> celtypes.Value: + """ + Reach into C7N filter and get accounts for a given resource. + Used by resources like AMI's, log-groups, ebs-snapshot, etc. + + .. todo:: Refactor C7N + + Provide the :py:class:`c7n.filters.iamaccessfilter.CrossAccountAccessFilter` + as a mixin to ``CELFilter``. + """ + return json_to_cel(C7N.filter.get_accounts()) + + +def get_vpcs(resource: celtypes.MapType,) -> celtypes.Value: + """ + Reach into C7N filter and get vpcs for a given resource. + Used by resources like AMI's, log-groups, ebs-snapshot, etc. + + .. todo:: Refactor C7N + + Provide the :py:class:`c7n.filters.iamaccessfilter.CrossAccountAccessFilter` + as a mixin to ``CELFilter``. + """ + return json_to_cel(C7N.filter.get_vpcs()) + + +def get_vpces(resource: celtypes.MapType,) -> celtypes.Value: + """ + Reach into C7N filter and get vpces for a given resource. + Used by resources like AMI's, log-groups, ebs-snapshot, etc. + + .. 
todo:: Refactor C7N + + Provide the :py:class:`c7n.filters.iamaccessfilter.CrossAccountAccessFilter` + as a mixin to ``CELFilter``. + + """ + return json_to_cel(C7N.filter.get_vpces()) + + +def get_orgids(resource: celtypes.MapType,) -> celtypes.Value: + """ + Reach into C7N filter and get orgids for a given resource. + Used by resources like AMI's, log-groups, ebs-snapshot, etc. + + .. todo:: Refactor C7N + + Provide the :py:class:`c7n.filters.iamaccessfilter.CrossAccountAccessFilter` + as a mixin to ``CELFilter``. + """ + return json_to_cel(C7N.filter.get_orgids()) + + +def get_endpoints(resource: celtypes.MapType,) -> celtypes.Value: + """For sns resources + + .. todo:: Refactor C7N + + Provide the :py:class:`c7n.filters.iamaccessfilter.CrossAccountAccessFilter` + as a mixin to ``CELFilter``. + """ + return json_to_cel(C7N.filter.get_endpoints()) + + +def get_protocols(resource: celtypes.MapType,) -> celtypes.Value: + """For sns resources + + .. todo:: Refactor C7N + """ + return json_to_cel(C7N.filter.get_protocols()) + + +def get_key_policy(resource: celtypes.MapType,) -> celtypes.Value: + """For kms resources + + .. todo:: Refactor C7N + """ + key_id = resource.get( + celtypes.StringType("TargetKeyId"), + resource.get(celtypes.StringType("KeyId"))) + client = C7N.filter.manager.session_factory().client("kms") + return json_to_cel( + client.get_key_policy( + KeyId=key_id, + PolicyName='default')['Policy'] + ) + + +def get_resource_policy(resource: celtypes.MapType,) -> celtypes.Value: + """ + Reach into C7N filter and get the resource policy for a given resource. + Used by resources like AMI's, log-groups, ebs-snapshot, etc. + + .. todo:: Refactor C7N + """ + return json_to_cel(C7N.filter.get_resource_policy()) + + +def describe_subscription_filters(resource: celtypes.MapType,) -> celtypes.Value: + """ + For log-groups resources. + + .. todo:: Refactor C7N + + this should be directly available in CELFilter. 
+ """ + client = C7N.filter.manager.session_factory().client("logs") + return json_to_cel( + C7N.filter.manager.retry( + client.describe_subscription_filters, + logGroupName=resource['logGroupName'] + ).get('subscriptionFilters', ()) + ) + + +def describe_db_snapshot_attributes(resource: celtypes.MapType,) -> celtypes.Value: + """ + For rds-snapshot and ebs-snapshot resources + + .. todo:: Refactor C7N + + this should be directly available in CELFilter. + """ + client = C7N.filter.manager.session_factory().client("ec2") + return json_to_cel( + C7N.filter.manager.retry( + client.describe_snapshot_attribute, + SnapshotId=resource['SnapshotId'], + Attribute='createVolumePermission' + ) + ) + + +def arn_split(arn: celtypes.StringType, field: celtypes.StringType) -> celtypes.Value: + """ + Parse an ARN, removing a partivular field. + The field name must one one of + "partition", "service", "region", "account-id", "resource-type", "resource-id" + In the case of a ``resource-type/resource-id`` path, this will be a "resource-id" value, + and there will be no "resource-type". 
+ + Examples formats + + ``arn:partition:service:region:account-id:resource-id`` + + ``arn:partition:service:region:account-id:resource-type/resource-id`` + + ``arn:partition:service:region:account-id:resource-type:resource-id`` + """ + field_names = { + len(names): names + for names in [ + ("partition", "service", "region", "account-id", "resource-id"), + ("partition", "service", "region", "account-id", "resource-type", "resource-id"), + ] + } + prefix, *fields = arn.split(":") + if prefix != "arn": + raise ValueError(f"Not an ARN: {arn}") + mapping = dict(zip(field_names[len(fields)], fields)) + return json_to_cel(mapping[field]) + + +def all_images() -> celtypes.Value: + """ + Depends on :py:meth:`CELFilter._pull_ec2_images` and :py:meth:`CELFilter._pull_asg_images` + + See :py:class:`c7n.resources.ami.ImageUnusedFilter` + """ + return json_to_cel( + list( + C7N.filter._pull_ec2_images() | C7N.filter._pull_asg_images() + ) + ) + + +def all_snapshots() -> celtypes.Value: + """ + Depends on :py:meth:`CELFilter._pull_asg_snapshots` + and :py:meth:`CELFilter._pull_ami_snapshots` + + See :py:class:`c7n.resources.ebs.SnapshotUnusedFilter` + """ + return json_to_cel( + list( + C7N.filter._pull_asg_snapshots() | C7N.filter._pull_ami_snapshots() + ) + ) + + +def all_launch_configuration_names() -> celtypes.Value: + """ + Depends on :py:meth:`CELFilter.manager.get_launch_configuration_names` + + See :py:class:`c7n.resources.asg.UnusedLaunchConfig` + """ + asgs = C7N.filter.manager.get_resource_manager('asg').resources() + used = set([ + a.get('LaunchConfigurationName', a['AutoScalingGroupName']) + for a in asgs if not a.get('LaunchTemplate')]) + return json_to_cel(list(used)) + + +def all_service_roles() -> celtypes.Value: + """ + Depends on :py:meth:`CELFilter.service_role_usage` + + See :py:class:`c7n.resources.iam.UnusedIamRole` + """ + return json_to_cel(C7N.filter.service_role_usage()) + + +def all_instance_profiles() -> celtypes.Value: + """ + Depends on 
:py:meth:`CELFilter.instance_profile_usage` + + See :py:class:`c7n.resources.iam.UnusedInstanceProfiles` + """ + return json_to_cel(C7N.filter.instance_profile_usage()) + + +def all_dbsubenet_groups() -> celtypes.Value: + """ + Depends on :py:meth:`CELFilter.get_dbsubnet_group_used` + + See :py:class:`c7n.resources.rds.UnusedRDSSubnetGroup` + """ + rds = C7N.filter.manager.get_resource_manager('rds').resources() + used = set([ + r.get('DBSubnetGroupName', r['DBInstanceIdentifier']) + for r in rds]) + return json_to_cel(list(used)) + + +def all_scan_groups() -> celtypes.Value: + """ + Depends on :py:meth:`CELFilter.scan_groups` + + See :py:class:`c7n.resources.vpc.UnusedSecurityGroup` + """ + return json_to_cel(C7N.filter.scan_groups()) + + +def get_access_log(resource: celtypes.MapType) -> celtypes.Value: + """ + Depends on :py:meth:`CELFilter.resources` + + See :py:class:`c7n.resources.elb.IsNotLoggingFilter` and + :py:class:`c7n.resources.elb.IsLoggingFilter`. + """ + client = C7N.filter.manager.session_factory().client('elb') + results = client.describe_load_balancer_attributes( + LoadBalancerName=resource['LoadBalancerName']) + return json_to_cel(results['LoadBalancerAttributes']) + + +def get_load_balancer(resource: celtypes.MapType) -> celtypes.Value: + """ + Depends on :py:meth:`CELFilter.resources` + + See :py:class:`c7n.resources.appelb.IsNotLoggingFilter` and + :py:class:`c7n.resources.appelb.IsLoggingFilter`. 
+ """ + def parse_attribute_value(v: str) -> Union[int, bool, str]: + """Lightweight JSON atomic value convertion to native Python.""" + if v.isdigit(): + return int(v) + elif v == 'true': + return True + elif v == 'false': + return False + return v + + client = C7N.filter.manager.session_factory().client('elbv2') + results = client.describe_load_balancer_attributes( + LoadBalancerArn=resource['LoadBalancerArn']) + print(results) + return json_to_cel( + dict( + (item["Key"], parse_attribute_value(item["Value"])) + for item in results['Attributes'] + ) + ) + + +def shield_protection(resource: celtypes.MapType) -> celtypes.Value: + """ + Depends on the :py:meth:`c7n.resources.shield.IsShieldProtected.process` method. + This needs to be refactored and renamed to avoid collisions with other ``process()`` variants. + + Applies to most resource types. + """ + client = C7N.filter.manager.session_factory().client('shield', region_name='us-east-1') + protections = C7N.filter.get_type_protections(client, C7N.filter.manager.get_model()) + protected_resources = [p['ResourceArn'] for p in protections] + return json_to_cel(protected_resources) + + +def shield_subscription(resource: celtypes.MapType) -> celtypes.Value: + """ + Depends on :py:meth:`c7n.resources.account.ShieldEnabled.process` method. + This needs to be refactored and renamed to avoid collisions with other ``process()`` variants. + + Applies to account resources only. + """ + subscriptions = C7N.filter.account_shield_subscriptions(resource) + return json_to_cel(subscriptions) + + +def web_acls(resource: celtypes.MapType) -> celtypes.Value: + """ + Depends on :py:meth:`c7n.resources.cloudfront.IsWafEnabled.process` method. + This needs to be refactored and renamed to avoid collisions with other ``process()`` variants. 
+ """ + wafs = C7N.filter.manager.get_resource_manager('waf').resources() + waf_name_id_map = {w['Name']: w['WebACLId'] for w in wafs} + return json_to_cel(waf_name_id_map) + + +DECLARATIONS: Dict[str, Annotation] = { + "glob": celtypes.FunctionType, + "difference": celtypes.FunctionType, + "intersect": celtypes.FunctionType, + "normalize": celtypes.FunctionType, + "parse_cidr": celtypes.FunctionType, # Callable[..., CIDR], + "size_parse_cidr": celtypes.FunctionType, + "unique_size": celtypes.FunctionType, + "version": celtypes.FunctionType, # Callable[..., ComparableVersion], + "present": celtypes.FunctionType, + "absent": celtypes.FunctionType, + "text_from": celtypes.FunctionType, + "value_from": celtypes.FunctionType, + "jmes_path": celtypes.FunctionType, + "jmes_path_map": celtypes.FunctionType, + "key": celtypes.FunctionType, + "marked_key": celtypes.FunctionType, + "image": celtypes.FunctionType, + "get_metrics": celtypes.FunctionType, + "get_related_ids": celtypes.FunctionType, + "security_group": celtypes.FunctionType, + "subnet": celtypes.FunctionType, + "flow_logs": celtypes.FunctionType, + "vpc": celtypes.FunctionType, + "subst": celtypes.FunctionType, + "credentials": celtypes.FunctionType, + "kms_alias": celtypes.FunctionType, + "kms_key": celtypes.FunctionType, + "resource_schedule": celtypes.FunctionType, + "get_accounts": celtypes.FunctionType, + "get_related_sgs": celtypes.FunctionType, + "get_related_subnets": celtypes.FunctionType, + "get_related_nat_gateways": celtypes.FunctionType, + "get_related_igws": celtypes.FunctionType, + "get_related_security_configs": celtypes.FunctionType, + "get_related_vpc": celtypes.FunctionType, + "get_related_kms_keys": celtypes.FunctionType, + "get_vpcs": celtypes.FunctionType, + "get_vpces": celtypes.FunctionType, + "get_orgids": celtypes.FunctionType, + "get_endpoints": celtypes.FunctionType, + "get_protocols": celtypes.FunctionType, + "get_key_policy": celtypes.FunctionType, + "get_resource_policy": 
celtypes.FunctionType, + "describe_subscription_filters": celtypes.FunctionType, + "describe_db_snapshot_attributes": celtypes.FunctionType, + "arn_split": celtypes.FunctionType, + "all_images": celtypes.FunctionType, + "all_snapshots": celtypes.FunctionType, + "all_launch_configuration_names": celtypes.FunctionType, + "all_service_roles": celtypes.FunctionType, + "all_instance_profiles": celtypes.FunctionType, + "all_dbsubenet_groups": celtypes.FunctionType, + "all_scan_groups": celtypes.FunctionType, + "get_access_log": celtypes.FunctionType, + "get_load_balancer": celtypes.FunctionType, + "shield_protection": celtypes.FunctionType, + "shield_subscription": celtypes.FunctionType, + "web_acls": celtypes.FunctionType, + # "etc.": celtypes.FunctionType, +} + +ExtFunction = Callable[..., celtypes.Value] + +FUNCTIONS: Dict[str, ExtFunction] = { + f.__name__: cast(ExtFunction, f) for f in [ + glob, + difference, + intersect, + normalize, + parse_cidr, + size_parse_cidr, + unique_size, + version, + present, + absent, + text_from, + value_from, + jmes_path, + jmes_path_map, + key, + marked_key, + image, + get_metrics, + get_related_ids, + security_group, + subnet, + flow_logs, + vpc, + subst, + credentials, + kms_alias, + kms_key, + resource_schedule, + get_accounts, + get_related_sgs, + get_related_subnets, + get_related_nat_gateways, + get_related_igws, + get_related_security_configs, + get_related_vpc, + get_related_kms_keys, + get_vpcs, + get_vpces, + get_orgids, + get_endpoints, + get_protocols, + get_key_policy, + get_resource_policy, + describe_subscription_filters, + describe_db_snapshot_attributes, + arn_split, + all_images, + all_snapshots, + all_launch_configuration_names, + all_service_roles, + all_instance_profiles, + all_dbsubenet_groups, + all_scan_groups, + get_access_log, + get_load_balancer, + shield_protection, + shield_subscription, + web_acls, + # etc. 
+ ] +} + + +class C7N_Interpreted_Runner(InterpretedRunner): + """ + Extends the Evaluation to introduce the C7N CELFilter instance into the exvaluation. + + The variable is global to allow the functions to have the simple-looking argument + values that CEL expects. This allows a function in this module to reach outside CEL for + access to C7N's caches. + + .. todo: Refactor to be a mixin to the Runner class hierarchy. + """ + + def evaluate(self, context: Context, filter: Optional[Any] = None) -> celtypes.Value: + e = Evaluator( + ast=self.ast, + activation=self.new_activation(context), + functions=self.functions, + ) + with C7NContext(filter=filter): + value = e.evaluate() + return value diff --git a/.venv/lib/python3.12/site-packages/celpy/cel.lark b/.venv/lib/python3.12/site-packages/celpy/cel.lark new file mode 100644 index 00000000..3b06bf87 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/celpy/cel.lark @@ -0,0 +1,179 @@ +// SPDX-Copyright: Copyright (c) Capital One Services, LLC +// SPDX-License-Identifier: Apache-2.0 +// Copyright 2020 Capital One Services, LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and limitations under the License. + + +// From https://github.com/google/cel-spec +// Modified in several ways: +// - EBNF to Lark: Rules are all lower case, No terminating ; on rules, Regexes wrapped in //, Replace ::= with :. +// - Strings expanded to include escapes and rewritten to be pure regex. 
+// - FLOAT_LIT expanded +// - Terminals reordered to reflect priorities better. + +// A number of rules used ! annotation to capture tokens. +// These were rewritten to expand into specific, named rules and avoid a need for the Lark "!" construct. + +expr : conditionalor ["?" conditionalor ":" expr] + +conditionalor : [conditionalor "||"] conditionaland + +conditionaland : [conditionaland "&&"] relation + +// Original... +// !relation : [relation relop] addition +// !relop : "<" | "<=" | ">=" | ">" | "==" | "!=" | "in" + +// Expose operators in the AST. +relation : [relation_lt | relation_le | relation_ge | relation_gt | relation_eq | relation_ne | relation_in] addition +relation_lt : relation "<" +relation_le : relation "<=" +relation_gt : relation ">" +relation_ge : relation ">=" +relation_eq : relation "==" +relation_ne : relation "!=" +relation_in : relation "in" + +// Original... +// !addition : [addition ("+" | "-")] multiplication + +// Expose operators in the AST. +addition : [addition_add | addition_sub] multiplication +addition_add : addition "+" +addition_sub : addition "-" + +// Original... +// !multiplication: [multiplication ("*" | "/" | "%")] unary + +// Expose operators in the AST. +multiplication : [multiplication_mul | multiplication_div | multiplication_mod] unary +multiplication_mul : multiplication "*" +multiplication_div : multiplication "/" +multiplication_mod : multiplication "%" + +// Original... +// !unary : member +// | "!" "!"* member +// | "-" "-"* member + +// Expose operators in the AST +// Option 1: zero or more token nodes; requires some care to handle sequence of operations. +//unary : [unary_not | unary_neg]* member + +// Option 2: separate expressions; doesn't maintain type safetly, allows ~!~!~!~!~x syntax which isn't really ideal. +unary : member + | unary_not unary + | unary_neg unary + +unary_not : "!" +unary_neg : "-" + + +// Original... +// !member : primary +// | member "." 
IDENT ["(" [exprlist] ")"] +// | member "[" expr "]" +// | member "{" [fieldinits] "}" + +// Expose constructs in the AST. +member : member_dot | member_dot_arg | member_index | member_object | primary +member_dot : member "." IDENT +member_dot_arg : member "." IDENT "(" [exprlist] ")" +member_index : member "[" expr "]" +member_object : member "{" [fieldinits] "}" + + +// Original... +// !primary : ["."] IDENT ["(" [exprlist] ")"] +// | "(" expr ")" +// | "[" [exprlist] "]" +// | "{" [mapinits] "}" +// | literal + +// Expose constructs in the AST. +primary : literal | dot_ident_arg | dot_ident | ident_arg + | paren_expr | list_lit | map_lit | ident +dot_ident_arg : "." IDENT "(" [exprlist] ")" +dot_ident : "." IDENT +ident_arg : IDENT "(" [exprlist] ")" +ident : IDENT +paren_expr : "(" expr ")" +list_lit : "[" [exprlist] "]" +map_lit : "{" [mapinits] "}" + +exprlist : expr ("," expr)* + +fieldinits : IDENT ":" expr ("," IDENT ":" expr)* + +mapinits : expr ":" expr ("," expr ":" expr)* + +// Elevated from Terminals to expose the type name in the AST. +literal : UINT_LIT | FLOAT_LIT | INT_LIT | MLSTRING_LIT | STRING_LIT | BYTES_LIT + | BOOL_LIT | NULL_LIT + +// Terminals. Order of some definitions altered to help lark. + +// Signs added (see https://github.com/google/cel-spec/issues/126) +INT_LIT : /-?/ /0x/ HEXDIGIT+ | /-?/ DIGIT+ + +UINT_LIT : INT_LIT /[uU]/ + +// Original... +// FLOAT_LIT : DIGIT* . DIGIT+ EXPONENT? | DIGIT+ EXPONENT + +// Expanded and signs added (see https://github.com/google/cel-spec/issues/126) +FLOAT_LIT : /-?/ DIGIT+ "." DIGIT* EXPONENT? | /-?/ DIGIT* "." DIGIT+ EXPONENT? 
| /-?/ DIGIT+ EXPONENT + +DIGIT : /[0-9]/ + +HEXDIGIT : /[0-9abcdefABCDEF]/ + +EXPONENT : /[eE]/ /[+-]?/ DIGIT+ + +// Rewritten into REGEX; explicitly list the escapes + +STRING_LIT : /[rR]?"(?:\\[abfnrtv"'\\]|\\\d{3}|\\x[0-9a-fA-F]{2}|\\u[0-9a-fA-F]{4-8}|.)*?"/ + | /[rR]?'(?:\\[abfnrtv"'\\]|\\\d{3}|\\x[0-9a-fA-F]{2}|\\u[0-9a-fA-F]{4}|\\U[0-9a-fA-F]{8}|.)*?'/ + +MLSTRING_LIT : /[rR]?"""(?:\\[abfnrtv"'\\]|\\\d{3}|\\x[0-9a-fA-F]{2}|\\u[0-9a-fA-F]{4-8}|\r\n|\r|\n|.)*?"""/ + | /[rR]?'''(?:\\[abfnrtv"'\\]|\\\d{3}|\\x[0-9a-fA-F]{2}|\\u[0-9a-fA-F]{4}|\\U[0-9a-fA-F]{8}|\r\n|\r|\n|.)*?'''/ + +BYTES_LIT : /[bB]/ MLSTRING_LIT | /[bB]/ STRING_LIT + +// Moved into STRING_LIT and MLSTRING_LIT, no longer needed. + +// ESCAPE : /\\[bfnrt"'\\]/ +// | /\\x/ HEXDIGIT HEXDIGIT +// | /\\u/ HEXDIGIT HEXDIGIT HEXDIGIT HEXDIGIT +// | /\\[0-3][0-7][0-7]/ + +// NEWLINE : /\\r\\n/ | /\\r/ | /\\n/ + +BOOL_LIT : "true" | "false" + +NULL_LIT : "null" + +IDENT : /[_a-zA-Z][_a-zA-Z0-9]*/ + +RESERVED.0 : "as" | "break" | "const" | "continue" | "else" + | "for" | "function" | "if" | "import" | "let" + | "loop" | "package" | "namespace" | "return" + | "var" | "void" | "while" + + +WHITESPACE : /[\t\n\f\r ]+/ + +COMMENT : /\/\/.*/ + +%ignore WHITESPACE +%ignore COMMENT diff --git a/.venv/lib/python3.12/site-packages/celpy/celparser.py b/.venv/lib/python3.12/site-packages/celpy/celparser.py new file mode 100644 index 00000000..020621e6 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/celpy/celparser.py @@ -0,0 +1,402 @@ +# SPDX-Copyright: Copyright (c) Capital One Services, LLC +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2020 Capital One Services, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +""" +CEL Parser. + +See https://github.com/google/cel-spec/blob/master/doc/langdef.md + +https://github.com/google/cel-cpp/blob/master/parser/Cel.g4 + +https://github.com/google/cel-go/blob/master/parser/gen/CEL.g4 + +Builds a parser from the supplied cel.lark grammar. + +.. todo:: Consider embedding the ``cel.lark`` file as a triple-quoted literal. + + This means fixing a LOT of \\'s. But it also eliminates a data file from the installation. + +Example:: + + >>> from celpy.celparser import CELParser + >>> p = CELParser() + >>> text2 = 'type(null)' + >>> ast2 = p.parse(text2) + >>> print(ast2.pretty().replace("\t"," ")) # doctest: +NORMALIZE_WHITESPACE + expr + conditionalor + conditionaland + relation + addition + multiplication + unary + member + primary + ident_arg + type + exprlist + expr + conditionalor + conditionaland + relation + addition + multiplication + unary + member + primary + literal null + + +""" +import re +from pathlib import Path +from typing import Any, List, Optional, cast + +import lark.visitors +from lark import Lark, Token, Tree # noqa: F401 +from lark.exceptions import (LexError, ParseError, UnexpectedCharacters, + UnexpectedToken) + + +class CELParseError(Exception): + def __init__( + self, + *args: Any, + line: Optional[int] = None, + column: Optional[int] = None) -> None: + super().__init__(*args) + self.line = line + self.column = column + + +class CELParser: + """Wrapper for the CEL parser and the syntax error messages.""" + CEL_PARSER: Optional[Lark] = None + + def __init__(self) -> None: + if CELParser.CEL_PARSER is None: + 
CEL_grammar = (Path(__file__).parent / "cel.lark").read_text() + CELParser.CEL_PARSER = Lark( + CEL_grammar, + parser="lalr", + start="expr", + debug=True, + g_regex_flags=re.M, + lexer_callbacks={'IDENT': self.ambiguous_literals}, + propagate_positions=True, + ) + + @staticmethod + def ambiguous_literals(t: Token) -> Token: + """Resolve a grammar ambiguity between identifiers and literals""" + if t.value == "true": + return Token("BOOL_LIT", t.value) + elif t.value == "false": + return Token("BOOL_LIT", t.value) + return t + + def parse(self, text: str) -> Tree: + if CELParser.CEL_PARSER is None: + raise TypeError("No grammar loaded") # pragma: no cover + self.text = text + try: + return CELParser.CEL_PARSER.parse(self.text) + except (UnexpectedToken, UnexpectedCharacters) as ex: + message = ex.get_context(text) + raise CELParseError(message, *ex.args, line=ex.line, column=ex.column) + except (LexError, ParseError) as ex: # pragma: no cover + message = ex.args[0].splitlines()[0] + raise CELParseError(message, *ex.args) + + def error_text( + self, + message: str, + line: Optional[int] = None, + column: Optional[int] = None) -> str: + source = self.text.splitlines()[line - 1] if line else self.text + message = ( + f"ERROR: <input>:{line or '?'}:{column or '?'} {message}\n" + f" | {source}\n" + f" | {(column - 1) * '.' if column else ''}^\n" + ) + return message + + +class DumpAST(lark.visitors.Visitor_Recursive): + """Dump a CEL AST creating a close approximation to the original source.""" + + @classmethod + def display(cls_, ast: lark.Tree) -> str: + d = cls_() + d.visit(ast) + return d.stack[0] + + def __init__(self) -> None: + self.stack: List[str] = [] + + def expr(self, tree: lark.Tree) -> None: + if len(tree.children) == 1: + return + else: + right = self.stack.pop() + left = self.stack.pop() + cond = self.stack.pop() + self.stack.append( + f"{cond} ? 
{left} : {right}" + ) + + def conditionalor(self, tree: lark.Tree) -> None: + if len(tree.children) == 1: + return + else: + right = self.stack.pop() + left = self.stack.pop() + self.stack.append( + f"{left} || {right}" + ) + + def conditionaland(self, tree: lark.Tree) -> None: + if len(tree.children) == 1: + return + else: + right = self.stack.pop() + left = self.stack.pop() + self.stack.append( + f"{left} && {right}" + ) + + def relation(self, tree: lark.Tree) -> None: + if len(tree.children) == 1: + return + else: + right = self.stack.pop() + left = self.stack.pop() + self.stack.append( + f"{left} {right}" + ) + + def relation_lt(self, tree: lark.Tree) -> None: + left = self.stack.pop() + self.stack.append(f"{left} < ") + + def relation_le(self, tree: lark.Tree) -> None: + left = self.stack.pop() + self.stack.append(f"{left} <= ") + + def relation_gt(self, tree: lark.Tree) -> None: + left = self.stack.pop() + self.stack.append(f"{left} > ") + + def relation_ge(self, tree: lark.Tree) -> None: + left = self.stack.pop() + self.stack.append(f"{left} >= ") + + def relation_eq(self, tree: lark.Tree) -> None: + left = self.stack.pop() + self.stack.append(f"{left} == ") + + def relation_ne(self, tree: lark.Tree) -> None: + left = self.stack.pop() + self.stack.append(f"{left} != ") + + def relation_in(self, tree: lark.Tree) -> None: + left = self.stack.pop() + self.stack.append(f"{left} in ") + + def addition(self, tree: lark.Tree) -> None: + if len(tree.children) == 1: + return + else: + right = self.stack.pop() + left = self.stack.pop() + self.stack.append( + f"{left} {right}" + ) + + def addition_add(self, tree: lark.Tree) -> None: + left = self.stack.pop() + self.stack.append(f"{left} + ") + + def addition_sub(self, tree: lark.Tree) -> None: + left = self.stack.pop() + self.stack.append(f"{left} - ") + + def multiplication(self, tree: lark.Tree) -> None: + if len(tree.children) == 1: + return + else: + right = self.stack.pop() + left = self.stack.pop() + 
self.stack.append( + f"{left} {right}" + ) + + def multiplication_mul(self, tree: lark.Tree) -> None: + left = self.stack.pop() + self.stack.append(f"{left} * ") + + def multiplication_div(self, tree: lark.Tree) -> None: + left = self.stack.pop() + self.stack.append(f"{left} / ") + + def multiplication_mod(self, tree: lark.Tree) -> None: + left = self.stack.pop() + self.stack.append(f"{left} % ") + + def unary(self, tree: lark.Tree) -> None: + if len(tree.children) == 1: + return + else: + right = self.stack.pop() + left = self.stack.pop() + self.stack.append( + f"{left} {right}" + ) + + def unary_not(self, tree: lark.Tree) -> None: + self.stack.append("!") + + def unary_neg(self, tree: lark.Tree) -> None: + self.stack.append("-") + + def member_dot(self, tree: lark.Tree) -> None: + right = cast(lark.Token, tree.children[1]).value + if self.stack: + left = self.stack.pop() + self.stack.append(f"{left}.{right}") + + def member_dot_arg(self, tree: lark.Tree) -> None: + if len(tree.children) == 3: + exprlist = self.stack.pop() + else: + exprlist = "" + right = cast(lark.Token, tree.children[1]).value + if self.stack: + left = self.stack.pop() + self.stack.append(f"{left}.{right}({exprlist})") + else: + self.stack.append(f".{right}({exprlist})") + + def member_index(self, tree: lark.Tree) -> None: + right = self.stack.pop() + left = self.stack.pop() + self.stack.append(f"{left}[{right}]") + + def member_object(self, tree: lark.Tree) -> None: + if len(tree.children) == 2: + fieldinits = self.stack.pop() + else: + fieldinits = "" + left = self.stack.pop() + self.stack.append(f"{left}{{{fieldinits}}}") + + def dot_ident_arg(self, tree: lark.Tree) -> None: + if len(tree.children) == 2: + exprlist = self.stack.pop() + else: + exprlist = "" + left = cast(lark.Token, tree.children[0]).value + self.stack.append(f".{left}({exprlist})") + + def dot_ident(self, tree: lark.Tree) -> None: + left = cast(lark.Token, tree.children[0]).value + self.stack.append(f".{left}") + + def 
ident_arg(self, tree: lark.Tree) -> None: + if len(tree.children) == 2: + exprlist = self.stack.pop() + else: + exprlist = "" + + left = cast(lark.Token, tree.children[0]).value + self.stack.append(f"{left}({exprlist})") + + def ident(self, tree: lark.Tree) -> None: + self.stack.append(cast(lark.Token, tree.children[0]).value) + + def paren_expr(self, tree: lark.Tree) -> None: + if self.stack: + left = self.stack.pop() + self.stack.append(f"({left})") + + def list_lit(self, tree: lark.Tree) -> None: + if self.stack: + left = self.stack.pop() + self.stack.append(f"[{left}]") + + def map_lit(self, tree: lark.Tree) -> None: + if self.stack: + left = self.stack.pop() + self.stack.append(f"{{{left}}}") + else: + self.stack.append("{}") + + def exprlist(self, tree: lark.Tree) -> None: + items = ", ".join(reversed(list(self.stack.pop() for _ in tree.children))) + self.stack.append(items) + + def fieldinits(self, tree: lark.Tree) -> None: + names = cast(List[lark.Token], tree.children[::2]) + values = cast(List[lark.Token], tree.children[1::2]) + assert len(names) == len(values) + pairs = reversed(list((n.value, self.stack.pop()) for n, v in zip(names, values))) + items = ", ".join(f"{n}: {v}" for n, v in pairs) + self.stack.append(items) + + def mapinits(self, tree: lark.Tree) -> None: + """Note reversed pop order for values and keys.""" + keys = tree.children[::2] + values = tree.children[1::2] + assert len(keys) == len(values) + pairs = reversed(list( + {'value': self.stack.pop(), 'key': self.stack.pop()} + for k, v in zip(keys, values) + )) + items = ", ".join(f"{k_v['key']}: {k_v['value']}" for k_v in pairs) + self.stack.append(items) + + def literal(self, tree: lark.Tree) -> None: + if tree.children: + self.stack.append(cast(lark.Token, tree.children[0]).value) + + +def tree_dump(ast: Tree) -> str: + """Dumps the AST to approximate the original source""" + d = DumpAST() + d.visit(ast) + return d.stack[0] + + +if __name__ == "__main__": # pragma: no cover + # A 
minimal sanity check. + # This is a smoke test for the grammar to expose shift/reduce or reduce/reduce conflicts. + # It will produce a RuntimeWarning because it's not the proper main program. + p = CELParser() + + text = """ + account.balance >= transaction.withdrawal + || (account.overdraftProtection + && account.overdraftLimit >= transaction.withdrawal - account.balance) + """ + ast = p.parse(text) + print(ast) + + d = DumpAST() + d.visit(ast) + print(d.stack) + + text2 = """type(null)""" + ast2 = p.parse(text2) + print(ast2.pretty()) diff --git a/.venv/lib/python3.12/site-packages/celpy/celtypes.py b/.venv/lib/python3.12/site-packages/celpy/celtypes.py new file mode 100644 index 00000000..3c4513ac --- /dev/null +++ b/.venv/lib/python3.12/site-packages/celpy/celtypes.py @@ -0,0 +1,1495 @@ +# SPDX-Copyright: Copyright (c) Capital One Services, LLC +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2020 Capital One Services, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +""" +CEL Types: wrappers on Python types to provide CEL semantics. + +This can be used by a Python module to work with CEL-friendly values and CEL results. + +Examples of distinctions between CEL and Python: + +- Unlike Python ``bool``, CEL :py:class:`BoolType` won't do some math. + +- CEL has ``int64`` and ``uint64`` subclasses of integer. These have specific ranges and + raise :exc:`ValueError` errors on overflow. 
+ +CEL types will raise :exc:`ValueError` for out-of-range values and :exc:`TypeError` +for operations they refuse. +The :py:mod:`evaluation` module can capture these exceptions and turn them into result values. +This can permit the logic operators to quietly silence them via "short-circuiting". + +In the normal course of events, CEL's evaluator may attempt operations between a +CEL exception result and an instance of one of CEL types. +We rely on this leading to an ordinary Python :exc:`TypeError` to be raised to propogate +the error. Or. A logic operator may discard the error object. + +The :py:mod:`evaluation` module extends these types with it's own :exc:`CELEvalError` exception. +We try to keep that as a separate concern from the core operator implementations here. +We leverage Python features, which means raising exceptions when there is a problem. + +Types +============= + +See https://github.com/google/cel-go/tree/master/common/types + +These are the Go type definitions that are used by CEL: + +- BoolType +- BytesType +- DoubleType +- DurationType +- IntType +- ListType +- MapType +- NullType +- StringType +- TimestampType +- TypeType +- UintType + +The above types are handled directly byt CEL syntax. +e.g., ``42`` vs. ``42u`` vs. ``"42"`` vs. ``b"42"`` vs. ``42.``. + +We provide matching Python class names for each of these types. The Python type names +are subclasses of Python native types, allowing a client to transparently work with +CEL results. A Python host should be able to provide values to CEL that will be tolerated. + +A type hint of ``Value`` unifies these into a common hint. 
+ +The CEL Go implementation also supports protobuf types: + +- dpb.Duration +- tpb.Timestamp +- structpb.ListValue +- structpb.NullValue +- structpb.Struct +- structpb.Value +- wrapperspb.BoolValue +- wrapperspb.BytesValue +- wrapperspb.DoubleValue +- wrapperspb.FloatValue +- wrapperspb.Int32Value +- wrapperspb.Int64Value +- wrapperspb.StringValue +- wrapperspb.UInt32Value +- wrapperspb.UInt64Value + +These types involve expressions like the following:: + + google.protobuf.UInt32Value{value: 123u} + +In this case, the well-known protobuf name is directly visible as CEL syntax. +There's a ``google`` package with the needed definitions. + +Type Provider +============================== + +A type provider can be bound to the environment, this will support additional types. +This appears to be a factory to map names of types to type classes. + +Run-time type binding is shown by a CEL expression like the following:: + + TestAllTypes{single_uint32_wrapper: 432u} + +The ``TestAllTypes`` is a protobuf type added to the CEL run-time. The syntax +is defined by this syntax rule:: + + member_object : member "{" [fieldinits] "}" + +The ``member`` is part of a type provider library, +either a standard protobuf definition or an extension. The field inits build +values for the protobuf object. + +See https://github.com/google/cel-go/blob/master/test/proto3pb/test_all_types.proto +for the ``TestAllTypes`` protobuf definition that is registered as a type provider. + +This expression will describes a Protobuf ``uint32`` object. + +Type Adapter +============= + +So far, it appears that a type adapter wraps existing Go or C++ types +with CEL-required methods. This seems like it does not need to be implemented +in Python. + +Numeric Details +=============== + +Integer division truncates toward zero. + +The Go definition of modulus:: + + // Mod returns the floating-point remainder of x/y. + // The magnitude of the result is less than y and its + // sign agrees with that of x. 
+ +https://golang.org/ref/spec#Arithmetic_operators + +"Go has the nice property that -a/b == -(a/b)." + +:: + + x y x / y x % y + 5 3 1 2 + -5 3 -1 -2 + 5 -3 -1 2 + -5 -3 1 -2 + +Python definition:: + + The modulo operator always yields a result + with the same sign as its second operand (or zero); + the absolute value of the result is strictly smaller than + the absolute value of the second operand. + +Here's the essential rule:: + + x//y * y + x%y == x + +However. Python ``//`` truncates toward negative infinity. Go ``/`` truncates toward zero. + +To get Go-like behavior, we need to use absolute values and restore the signs later. + +:: + + x_sign = -1 if x < 0 else +1 + go_mod = x_sign * (abs(x) % abs(y)) + return go_mod + +Timzone Details +=============== + +An implementation may have additional timezone names that must be injected into +th dateutil.gettz() processing. + +For example, there may be the following sequence: + +1. A lowercase match for an alias or an existing dateutil timezone. + +2. A titlecase match for an existing dateutil timezone. + +3. The fallback, which is a +/-HH:MM string. + +.. TODO: Permit an extension into the timezone lookup. + +""" +import datetime +import logging +import re +from functools import reduce, wraps +from math import fsum +from typing import (Any, Callable, Dict, Iterable, List, Mapping, NoReturn, + Optional, Sequence, Tuple, Type, TypeVar, Union, cast, + overload) + +import dateutil.parser +import dateutil.tz + +logger = logging.getLogger("celtypes") + + +Value = Union[ + 'BoolType', + 'BytesType', + 'DoubleType', + 'DurationType', + 'IntType', + 'ListType', + 'MapType', + None, # Used instead of NullType + 'StringType', + 'TimestampType', + 'UintType', +] + +# The domain of types used to build Annotations. 
+CELType = Union[ + Type['BoolType'], + Type['BytesType'], + Type['DoubleType'], + Type['DurationType'], + Type['IntType'], + Type['ListType'], + Type['MapType'], + Callable[..., None], # Used instead of NullType + Type['StringType'], + Type['TimestampType'], + Type['TypeType'], # Used to mark Protobuf Type values + Type['UintType'], + Type['PackageType'], + Type['MessageType'], +] + + +def type_matched(method: Callable[[Any, Any], Any]) -> Callable[[Any, Any], Any]: + """Decorates a method to assure the "other" value has the same type.""" + @wraps(method) + def type_matching_method(self: Any, other: Any) -> Any: + if not(issubclass(type(other), type(self)) or issubclass(type(self), type(other))): + raise TypeError(f"no such overload: {self!r} {type(self)} != {other!r} {type(other)}") + return method(self, other) + return type_matching_method + + +def logical_condition(e: Value, x: Value, y: Value) -> Value: + """ + CEL e ? x : y operator. + Choose one of x or y. Exceptions in the unchosen expression are ignored. + + Example:: + + 2 / 0 > 4 ? 'baz' : 'quux' + + is a "division by zero" error. + + :: + + >>> logical_condition( + ... BoolType(True), StringType("this"), StringType("Not That")) + StringType('this') + >>> logical_condition( + ... BoolType(False), StringType("Not This"), StringType("that")) + StringType('that') + """ + if not isinstance(e, BoolType): + raise TypeError(f"Unexpected {type(e)} ? {type(x)} : {type(y)}") + result = x if e else y + logger.debug("logical_condition(%r, %r, %r) = %r", e, x, y, result) + return result + + +def logical_and(x: Value, y: Value) -> Value: + """ + Native Python has a left-to-right rule. + CEL && is commutative with non-Boolean values, including error objects. 
+ """ + if not isinstance(x, BoolType) and not isinstance(y, BoolType): + raise TypeError(f"{type(x)} {x!r} and {type(y)} {y!r}") + elif not isinstance(x, BoolType) and isinstance(y, BoolType): + if y: + return x # whatever && true == whatever + else: + return y # whatever && false == false + elif isinstance(x, BoolType) and not isinstance(y, BoolType): + if x: + return y # true && whatever == whatever + else: + return x # false && whatever == false + else: + return BoolType(cast(BoolType, x) and cast(BoolType, y)) + + +def logical_not(x: Value) -> Value: + """ + Native python `not` isn't fully exposed for CEL types. + """ + if isinstance(x, BoolType): + result = BoolType(not x) + else: + raise TypeError(f"not {type(x)}") + logger.debug("logical_not(%r) = %r", x, result) + return result + + +def logical_or(x: Value, y: Value) -> Value: + """ + Native Python has a left-to-right rule: (True or y) is True, (False or y) is y. + CEL || is commutative with non-Boolean values, including errors. + ``(x || false)`` is ``x``, and ``(false || y)`` is ``y``. + + Example 1:: + + false || 1/0 != 0 + + is a "no matching overload" error. + + Example 2:: + + (2 / 0 > 3 ? false : true) || true + + is a "True" + + If the operand(s) are not BoolType, we'll create an TypeError that will become a CELEvalError. + """ + if not isinstance(x, BoolType) and not isinstance(y, BoolType): + raise TypeError(f"{type(x)} {x!r} or {type(y)} {y!r}") + elif not isinstance(x, BoolType) and isinstance(y, BoolType): + if y: + return y # whatever || true == true + else: + return x # whatever || false == whatever + elif isinstance(x, BoolType) and not isinstance(y, BoolType): + if x: + return x # true || whatever == true + else: + return y # false || whatever == whatever + else: + return BoolType(cast(BoolType, x) or cast(BoolType, y)) + + +class BoolType(int): + """ + Native Python permits unary operators on Booleans. + + For CEL, We need to prevent -false from working. 
+ """ + def __new__(cls: Type['BoolType'], source: Any) -> 'BoolType': + if source is None: + return super().__new__(cls, 0) + elif isinstance(source, BoolType): + return source + elif isinstance(source, MessageType): + return super().__new__( + cls, + cast(int, source.get(StringType("value"))) + ) + else: + return super().__new__(cls, source) + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({bool(self)})" + + def __str__(self) -> str: + return str(bool(self)) + + def __neg__(self) -> NoReturn: + raise TypeError("no such overload") + + def __hash__(self) -> int: + return super().__hash__() + + +class BytesType(bytes): + """Python's bytes semantics are close to CEL.""" + def __new__( + cls: Type['BytesType'], + source: Union[str, bytes, Iterable[int], 'BytesType', 'StringType'], + *args: Any, + **kwargs: Any + ) -> 'BytesType': + if source is None: + return super().__new__(cls, b'') + elif isinstance(source, (bytes, BytesType)): + return super().__new__(cls, source) + elif isinstance(source, (str, StringType)): + return super().__new__(cls, source.encode('utf-8')) + elif isinstance(source, MessageType): + return super().__new__( + cls, + cast(bytes, source.get(StringType("value"))) # type: ignore [attr-defined] + ) + elif isinstance(source, Iterable): + return super().__new__( + cls, + cast(Iterable[int], source) + ) + else: + raise TypeError(f"Invalid initial value type: {type(source)}") + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({super().__repr__()})" + + +class DoubleType(float): + """ + Native Python permits mixed type comparisons, doing conversions as needed. + + For CEL, we need to prevent mixed-type comparisons from working. + + TODO: Conversions from string? IntType? UintType? DoubleType? 
+ """ + def __new__(cls: Type['DoubleType'], source: Any) -> 'DoubleType': + if source is None: + return super().__new__(cls, 0) + elif isinstance(source, MessageType): + return super().__new__( + cls, + cast(float, source.get(StringType("value"))) + ) + else: + return super().__new__(cls, source) + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({super().__repr__()})" + + def __str__(self) -> str: + text = str(float(self)) + return text + + def __neg__(self) -> 'DoubleType': + return DoubleType(super().__neg__()) + + def __mod__(self, other: Any) -> NoReturn: + raise TypeError( + f"found no matching overload for '_%_' applied to '(double, {type(other)})'") + + def __truediv__(self, other: Any) -> 'DoubleType': + if cast(float, other) == 0.0: + return DoubleType("inf") + else: + return DoubleType(super().__truediv__(other)) + + def __rmod__(self, other: Any) -> NoReturn: + raise TypeError( + f"found no matching overload for '_%_' applied to '({type(other)}, double)'") + + def __rtruediv__(self, other: Any) -> 'DoubleType': + if self == 0.0: + return DoubleType("inf") + else: + return DoubleType(super().__rtruediv__(other)) + + @type_matched + def __eq__(self, other: Any) -> bool: + return super().__eq__(other) + + @type_matched + def __ne__(self, other: Any) -> bool: + return super().__ne__(other) + + def __hash__(self) -> int: + return super().__hash__() + + +IntOperator = TypeVar('IntOperator', bound=Callable[..., int]) + + +def int64(operator: IntOperator) -> IntOperator: + """Apply an operation, but assure the value is within the int64 range.""" + @wraps(operator) + def clamped_operator(*args: Any, **kwargs: Any) -> int: + result: int = operator(*args, **kwargs) + if -(2**63) <= result < 2**63: + return result + raise ValueError("overflow") + return cast(IntOperator, clamped_operator) + + +class IntType(int): + """ + A version of int with overflow errors outside int64 range. 
+ + features/integer_math.feature:277 "int64_overflow_positive" + + >>> IntType(9223372036854775807) + IntType(1) + Traceback (most recent call last): + ... + ValueError: overflow + + >>> 2**63 + 9223372036854775808 + + features/integer_math.feature:285 "int64_overflow_negative" + + >>> -IntType(9223372036854775808) - IntType(1) + Traceback (most recent call last): + ... + ValueError: overflow + + >>> IntType(DoubleType(1.9)) + IntType(2) + >>> IntType(DoubleType(-123.456)) + IntType(-123) + """ + def __new__( + cls: Type['IntType'], + source: Any, + *args: Any, + **kwargs: Any + ) -> 'IntType': + convert: Callable[..., int] + if source is None: + return super().__new__(cls, 0) + elif isinstance(source, IntType): + return source + elif isinstance(source, MessageType): + # Used by protobuf. + return super().__new__( + cls, + cast(int, source.get(StringType("value"))) + ) + elif isinstance(source, (float, DoubleType)): + convert = int64(round) + elif isinstance(source, TimestampType): + convert = int64(lambda src: src.timestamp()) + elif isinstance(source, (str, StringType)) and source[:2] in {"0x", "0X"}: + convert = int64(lambda src: int(src[2:], 16)) + elif isinstance(source, (str, StringType)) and source[:3] in {"-0x", "-0X"}: + convert = int64(lambda src: -int(src[3:], 16)) + else: + # Must tolerate "-" as part of the literal. 
+ # See https://github.com/google/cel-spec/issues/126 + convert = int64(int) + return super().__new__(cls, convert(source)) + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({super().__repr__()})" + + def __str__(self) -> str: + text = str(int(self)) + return text + + @int64 + def __neg__(self) -> 'IntType': + return IntType(super().__neg__()) + + @int64 + def __add__(self, other: Any) -> 'IntType': + return IntType(super().__add__(cast(IntType, other))) + + @int64 + def __sub__(self, other: Any) -> 'IntType': + return IntType(super().__sub__(cast(IntType, other))) + + @int64 + def __mul__(self, other: Any) -> 'IntType': + return IntType(super().__mul__(cast(IntType, other))) + + @int64 + def __truediv__(self, other: Any) -> 'IntType': + other = cast(IntType, other) + self_sign = -1 if self < IntType(0) else +1 + other_sign = -1 if other < IntType(0) else +1 + go_div = self_sign * other_sign * (abs(self) // abs(other)) + return IntType(go_div) + + __floordiv__ = __truediv__ + + @int64 + def __mod__(self, other: Any) -> 'IntType': + self_sign = -1 if self < IntType(0) else +1 + go_mod = self_sign * (abs(self) % abs(cast(IntType, other))) + return IntType(go_mod) + + @int64 + def __radd__(self, other: Any) -> 'IntType': + return IntType(super().__radd__(cast(IntType, other))) + + @int64 + def __rsub__(self, other: Any) -> 'IntType': + return IntType(super().__rsub__(cast(IntType, other))) + + @int64 + def __rmul__(self, other: Any) -> 'IntType': + return IntType(super().__rmul__(cast(IntType, other))) + + @int64 + def __rtruediv__(self, other: Any) -> 'IntType': + other = cast(IntType, other) + self_sign = -1 if self < IntType(0) else +1 + other_sign = -1 if other < IntType(0) else +1 + go_div = self_sign * other_sign * (abs(other) // abs(self)) + return IntType(go_div) + + __rfloordiv__ = __rtruediv__ + + @int64 + def __rmod__(self, other: Any) -> 'IntType': + left_sign = -1 if other < IntType(0) else +1 + go_mod = left_sign * (abs(other) % 
abs(self)) + return IntType(go_mod) + + @type_matched + def __eq__(self, other: Any) -> bool: + return super().__eq__(other) + + @type_matched + def __ne__(self, other: Any) -> bool: + return super().__ne__(other) + + @type_matched + def __lt__(self, other: Any) -> bool: + return super().__lt__(other) + + @type_matched + def __le__(self, other: Any) -> bool: + return super().__le__(other) + + @type_matched + def __gt__(self, other: Any) -> bool: + return super().__gt__(other) + + @type_matched + def __ge__(self, other: Any) -> bool: + return super().__ge__(other) + + def __hash__(self) -> int: + return super().__hash__() + + +def uint64(operator: IntOperator) -> IntOperator: + """Apply an operation, but assure the value is within the uint64 range.""" + @wraps(operator) + def clamped_operator(*args: Any, **kwargs: Any) -> int: + result = operator(*args, **kwargs) + if 0 <= result < 2**64: + return result + raise ValueError("overflow") + return cast(IntOperator, clamped_operator) + + +class UintType(int): + """ + A version of int with overflow errors outside uint64 range. + + Alternatives: + + Option 1 - Use https://pypi.org/project/fixedint/ + + Option 2 - use array or struct modules to access an unsigned object. + + Test Cases: + + features/integer_math.feature:149 "unary_minus_no_overload" + + >>> -UintType(42) + Traceback (most recent call last): + ... + TypeError: no such overload + + uint64_overflow_positive + + >>> UintType(18446744073709551615) + UintType(1) + Traceback (most recent call last): + ... + ValueError: overflow + + uint64_overflow_negative + + >>> UintType(0) - UintType(1) + Traceback (most recent call last): + ... + ValueError: overflow + + >>> - UintType(5) + Traceback (most recent call last): + ... 
+ TypeError: no such overload + """ + def __new__( + cls: Type['UintType'], + source: Any, + *args: Any, + **kwargs: Any + ) -> 'UintType': + convert: Callable[..., int] + if isinstance(source, UintType): + return source + elif isinstance(source, (float, DoubleType)): + convert = uint64(round) + elif isinstance(source, TimestampType): + convert = uint64(lambda src: src.timestamp()) + elif isinstance(source, (str, StringType)) and source[:2] in {"0x", "0X"}: + convert = uint64(lambda src: int(src[2:], 16)) + elif isinstance(source, MessageType): + # Used by protobuf. + convert = uint64(lambda src: src['value'] if src['value'] is not None else 0) + elif source is None: + convert = uint64(lambda src: 0) + else: + convert = uint64(int) + return super().__new__(cls, convert(source)) + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({super().__repr__()})" + + def __str__(self) -> str: + text = str(int(self)) + return text + + def __neg__(self) -> NoReturn: + raise TypeError("no such overload") + + @uint64 + def __add__(self, other: Any) -> 'UintType': + return UintType(super().__add__(cast(IntType, other))) + + @uint64 + def __sub__(self, other: Any) -> 'UintType': + return UintType(super().__sub__(cast(IntType, other))) + + @uint64 + def __mul__(self, other: Any) -> 'UintType': + return UintType(super().__mul__(cast(IntType, other))) + + @uint64 + def __truediv__(self, other: Any) -> 'UintType': + return UintType(super().__floordiv__(cast(IntType, other))) + + __floordiv__ = __truediv__ + + @uint64 + def __mod__(self, other: Any) -> 'UintType': + return UintType(super().__mod__(cast(IntType, other))) + + @uint64 + def __radd__(self, other: Any) -> 'UintType': + return UintType(super().__radd__(cast(IntType, other))) + + @uint64 + def __rsub__(self, other: Any) -> 'UintType': + return UintType(super().__rsub__(cast(IntType, other))) + + @uint64 + def __rmul__(self, other: Any) -> 'UintType': + return UintType(super().__rmul__(cast(IntType, other))) + + 
@uint64 + def __rtruediv__(self, other: Any) -> 'UintType': + return UintType(super().__rfloordiv__(cast(IntType, other))) + + __rfloordiv__ = __rtruediv__ + + @uint64 + def __rmod__(self, other: Any) -> 'UintType': + return UintType(super().__rmod__(cast(IntType, other))) + + @type_matched + def __eq__(self, other: Any) -> bool: + return super().__eq__(other) + + @type_matched + def __ne__(self, other: Any) -> bool: + return super().__ne__(other) + + def __hash__(self) -> int: + return super().__hash__() + + +class ListType(List[Value]): + """ + Native Python implements comparison operations between list objects. + + For CEL, we prevent list comparison operators from working. + + We provide an :py:meth:`__eq__` and :py:meth:`__ne__` that + gracefully ignore type mismatch problems, calling them not equal. + + See https://github.com/google/cel-spec/issues/127 + + An implied logical And means a singleton behaves in a distinct way from a non-singleton list. + """ + def __repr__(self) -> str: + return f"{self.__class__.__name__}({super().__repr__()})" + + def __lt__(self, other: Any) -> NoReturn: + raise TypeError("no such overload") + + def __le__(self, other: Any) -> NoReturn: + raise TypeError("no such overload") + + def __gt__(self, other: Any) -> NoReturn: + raise TypeError("no such overload") + + def __ge__(self, other: Any) -> NoReturn: + raise TypeError("no such overload") + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, (list, ListType)): + raise TypeError(f"no such overload: ListType == {type(other)}") + + def equal(s: Any, o: Any) -> Value: + try: + return BoolType(s == o) + except TypeError as ex: + return cast(BoolType, ex) # Instead of Union[BoolType, TypeError] + + result = ( + len(self) == len(other) + and reduce( # noqa: W503 + logical_and, # type: ignore [arg-type] + (equal(item_s, item_o) for item_s, item_o in zip(self, other)), + BoolType(True) # type: ignore [arg-type] + ) + ) + if isinstance(result, TypeError): + raise result + 
return bool(result) + + def __ne__(self, other: Any) -> bool: + if not isinstance(other, (list, ListType)): + raise TypeError(f"no such overload: ListType != {type(other)}") + + def not_equal(s: Any, o: Any) -> Value: + try: + return BoolType(s != o) + except TypeError as ex: + return cast(BoolType, ex) # Instead of Union[BoolType, TypeError] + + result = ( + len(self) != len(other) + or reduce( # noqa: W503 + logical_or, # type: ignore [arg-type] + (not_equal(item_s, item_o) for item_s, item_o in zip(self, other)), + BoolType(False) # type: ignore [arg-type] + ) + ) + if isinstance(result, TypeError): + raise result + return bool(result) + + +BaseMapTypes = Union[ + Mapping[Any, Any], + Sequence[Tuple[Any, Any]], + None +] + + +MapKeyTypes = Union[ + 'IntType', 'UintType', 'BoolType', 'StringType', str +] + + +class MapType(Dict[Value, Value]): + """ + Native Python allows mapping updates and any hashable type as a kay. + + CEL prevents mapping updates and has a limited domain of key types. + int, uint, bool, or string keys + + We provide an :py:meth:`__eq__` and :py:meth:`__ne__` that + gracefully ignore type mismatch problems for the values, calling them not equal. + + See https://github.com/google/cel-spec/issues/127 + + An implied logical And means a singleton behaves in a distinct way from a non-singleton mapping. 
+ """ + def __init__( + self, + items: BaseMapTypes = None) -> None: + super().__init__() + if items is None: + pass + elif isinstance(items, Sequence): + for name, value in items: + self[name] = value + elif isinstance(items, Mapping): + for name, value in items.items(): + self[name] = value + else: + raise TypeError(f"Invalid initial value type: {type(items)}") + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({super().__repr__()})" + + def __getitem__(self, key: Any) -> Any: + if not MapType.valid_key_type(key): + raise TypeError(f"unsupported key type: {type(key)}") + return super().__getitem__(key) + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, (Mapping, MapType)): + raise TypeError(f"no such overload: MapType == {type(other)}") + + def equal(s: Any, o: Any) -> BoolType: + try: + return BoolType(s == o) + except TypeError as ex: + return cast(BoolType, ex) # Instead of Union[BoolType, TypeError] + + keys_s = self.keys() + keys_o = other.keys() + result = ( + keys_s == keys_o + and reduce( # noqa: W503 + logical_and, # type: ignore [arg-type] + (equal(self[k], other[k]) for k in keys_s), + BoolType(True) # type: ignore [arg-type] + ) + ) + if isinstance(result, TypeError): + raise result + return bool(result) + + def __ne__(self, other: Any) -> bool: + if not isinstance(other, (Mapping, MapType)): + raise TypeError(f"no such overload: MapType != {type(other)}") + + # Singleton special case, may return no-such overload. 
+ if len(self) == 1 and len(other) == 1 and self.keys() == other.keys(): + k = next(iter(self.keys())) + return cast(bool, self[k] != other[k]) # Instead of Union[BoolType, TypeError] + + def not_equal(s: Any, o: Any) -> BoolType: + try: + return BoolType(s != o) + except TypeError as ex: + return cast(BoolType, ex) # Instead of Union[BoolType, TypeError] + + keys_s = self.keys() + keys_o = other.keys() + result = ( + keys_s != keys_o + or reduce( # noqa: W503 + logical_or, # type: ignore [arg-type] + (not_equal(self[k], other[k]) for k in keys_s), + BoolType(False) # type: ignore [arg-type] + ) + ) + if isinstance(result, TypeError): + raise result + return bool(result) + + @staticmethod + def valid_key_type(key: Any) -> bool: + """Valid CEL key types. Plus native str for tokens in the source when evaluating ``e.f``""" + return isinstance(key, (IntType, UintType, BoolType, StringType, str)) + + +class NullType: + """TBD. May not be needed. Python's None semantics appear to match CEL perfectly.""" + pass + + +class StringType(str): + """Python's str semantics are very, very close to CEL. + + We rely on the overlap between ``"/u270c"`` and ``"/U0001f431"`` in CEL and Python. + """ + def __new__( + cls: Type['StringType'], + source: Union[str, bytes, 'BytesType', 'StringType'], + *args: Any, + **kwargs: Any + ) -> 'StringType': + if isinstance(source, (bytes, BytesType)): + return super().__new__(cls, source.decode('utf')) + elif isinstance(source, (str, StringType)): + # TODO: Consider returning the original StringType object. 
+ return super().__new__(cls, source) + else: + return cast(StringType, super().__new__(cls, source)) + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({super().__repr__()})" + + @type_matched + def __eq__(self, other: Any) -> bool: + return super().__eq__(other) + + @type_matched + def __ne__(self, other: Any) -> bool: + return super().__ne__(other) + + def __hash__(self) -> int: + return super().__hash__() + + +class TimestampType(datetime.datetime): + """ + Implements google.protobuf.Timestamp + + See https://developers.google.com/protocol-buffers/docs/reference/google.protobuf + + Also see https://www.ietf.org/rfc/rfc3339.txt. + + The protobuf implementation is an ordered pair of int64 seconds and int32 nanos. + + Instead of a Tuple[int, int] we use a wrapper for :py:class:`datetime.datetime`. + + From protobuf documentation for making a Timestamp in Python:: + + now = time.time() + seconds = int(now) + nanos = int((now - seconds) * 10**9) + timestamp = Timestamp(seconds=seconds, nanos=nanos) + + Also:: + + >>> t = TimestampType("2009-02-13T23:31:30Z") + >>> repr(t) + "TimestampType('2009-02-13T23:31:30Z')" + >>> t.timestamp() + 1234567890.0 + >>> str(t) + '2009-02-13T23:31:30Z' + + :strong:`Timezones` + + Timezones are expressed in the following grammar: + + :: + + TimeZone = "UTC" | LongTZ | FixedTZ ; + LongTZ = ? list available at + http://joda-time.sourceforge.net/timezones.html ? ; + FixedTZ = ( "+" | "-" ) Digit Digit ":" Digit Digit ; + Digit = "0" | "1" | ... | "9" ; + + Fixed timezones are explicit hour and minute offsets from UTC. + Long timezone names are like Europe/Paris, CET, or US/Central. + + The Joda project (https://www.joda.org/joda-time/timezones.html) + says "Time zone data is provided by the public IANA time zone database." + + The ``dateutil`` project (https://pypi.org/project/python-dateutil/) + is used for TZ handling and timestamp parsing. 
+ + Additionally, there is a ``TZ_ALIASES`` mapping available in this class to permit additional + timezone names. By default, the mapping is empty, and the only names + available are those recognized by :mod:`dateutil.tz`. + """ + + TZ_ALIASES: Dict[str, str] = {} + + def __new__( + cls: Type['TimestampType'], + source: Union[int, str, datetime.datetime], + *args: Any, + **kwargs: Any) -> 'TimestampType': + + if isinstance(source, datetime.datetime): + # Wrap a datetime.datetime + return super().__new__( + cls, + year=source.year, + month=source.month, + day=source.day, + hour=source.hour, + minute=source.minute, + second=source.second, + microsecond=source.microsecond, + tzinfo=source.tzinfo or datetime.timezone.utc + ) + + elif isinstance(source, int) and len(args) >= 2: + # Wrap a sequence of integers that datetime.datetime might accept. + ts: TimestampType = super().__new__( + cls, source, *args, **kwargs + ) + if not ts.tzinfo: + ts = ts.replace(tzinfo=datetime.timezone.utc) + return ts + + elif isinstance(source, str): + # Use dateutil to try a variety of text formats. 
+ parsed_datetime = dateutil.parser.isoparse(source) + return super().__new__( + cls, + year=parsed_datetime.year, + month=parsed_datetime.month, + day=parsed_datetime.day, + hour=parsed_datetime.hour, + minute=parsed_datetime.minute, + second=parsed_datetime.second, + microsecond=parsed_datetime.microsecond, + tzinfo=parsed_datetime.tzinfo + ) + + else: + raise TypeError(f"Cannot create {cls} from {source!r}") + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({str(self)!r})" + + def __str__(self) -> str: + return self.strftime("%Y-%m-%dT%H:%M:%SZ") + + def __add__(self, other: Any) -> 'TimestampType': + """Timestamp + Duration -> Timestamp""" + result = super().__add__(other) + if result == NotImplemented: + return NotImplemented + return TimestampType(result) + + def __radd__(self, other: Any) -> 'TimestampType': + """Duration + Timestamp -> Timestamp""" + result = super().__radd__(other) + if result == NotImplemented: + return NotImplemented + return TimestampType(result) + + # For more information, check the typeshed definition + # https://github.com/python/typeshed/blob/master/stdlib/2and3/datetime.pyi + + @overload # type: ignore + def __sub__(self, other: 'TimestampType') -> 'DurationType': + ... # pragma: no cover + + @overload + def __sub__(self, other: 'DurationType') -> 'TimestampType': + ... # pragma: no cover + + def __sub__( + self, + other: Union['TimestampType', 'DurationType'] + ) -> Union['TimestampType', 'DurationType']: + result = super().__sub__(other) + if result == NotImplemented: + return cast(DurationType, result) + if isinstance(result, datetime.timedelta): + return DurationType(result) + return TimestampType(result) + + @classmethod + def tz_name_lookup(cls, tz_name: str) -> Optional[datetime.tzinfo]: + """ + The :py:func:`dateutil.tz.gettz` may be extended with additional aliases. + + .. TODO: Permit an extension into the timezone lookup. + Tweak ``celpy.celtypes.TimestampType.TZ_ALIASES``. 
+ """ + tz_lookup = str(tz_name) + if tz_lookup in cls.TZ_ALIASES: + tz = dateutil.tz.gettz(cls.TZ_ALIASES[tz_lookup]) + else: + tz = dateutil.tz.gettz(tz_lookup) + return tz + + @classmethod + def tz_offset_parse(cls, tz_name: str) -> Optional[datetime.tzinfo]: + tz_pat = re.compile(r"^([+-]?)(\d\d?):(\d\d)$") + tz_match = tz_pat.match(tz_name) + if not tz_match: + raise ValueError(f"Unparsable timezone: {tz_name!r}") + sign, hh, mm = tz_match.groups() + offset_min = (int(hh) * 60 + int(mm)) * (-1 if sign == '-' else +1) + offset = datetime.timedelta(seconds=offset_min * 60) + tz = datetime.timezone(offset) + return tz + + @staticmethod + def tz_parse(tz_name: Optional[str]) -> Optional[datetime.tzinfo]: + if tz_name: + tz = TimestampType.tz_name_lookup(tz_name) + if tz is None: + tz = TimestampType.tz_offset_parse(tz_name) + return tz + else: + return dateutil.tz.UTC + + def getDate(self, tz_name: Optional[StringType] = None) -> IntType: + new_tz = self.tz_parse(tz_name) + return IntType(self.astimezone(new_tz).day) + + def getDayOfMonth(self, tz_name: Optional[StringType] = None) -> IntType: + new_tz = self.tz_parse(tz_name) + return IntType(self.astimezone(new_tz).day - 1) + + def getDayOfWeek(self, tz_name: Optional[StringType] = None) -> IntType: + new_tz = self.tz_parse(tz_name) + return IntType(self.astimezone(new_tz).isoweekday() % 7) + + def getDayOfYear(self, tz_name: Optional[StringType] = None) -> IntType: + new_tz = self.tz_parse(tz_name) + working_date = self.astimezone(new_tz) + jan1 = datetime.datetime(working_date.year, 1, 1, tzinfo=new_tz) + days = working_date.toordinal() - jan1.toordinal() + return IntType(days) + + def getMonth(self, tz_name: Optional[StringType] = None) -> IntType: + new_tz = self.tz_parse(tz_name) + return IntType(self.astimezone(new_tz).month - 1) + + def getFullYear(self, tz_name: Optional[StringType] = None) -> IntType: + new_tz = self.tz_parse(tz_name) + return IntType(self.astimezone(new_tz).year) + + def getHours(self, 
tz_name: Optional[StringType] = None) -> IntType: + new_tz = self.tz_parse(tz_name) + return IntType(self.astimezone(new_tz).hour) + + def getMilliseconds(self, tz_name: Optional[StringType] = None) -> IntType: + new_tz = self.tz_parse(tz_name) + return IntType(self.astimezone(new_tz).microsecond // 1000) + + def getMinutes(self, tz_name: Optional[StringType] = None) -> IntType: + new_tz = self.tz_parse(tz_name) + return IntType(self.astimezone(new_tz).minute) + + def getSeconds(self, tz_name: Optional[StringType] = None) -> IntType: + new_tz = self.tz_parse(tz_name) + return IntType(self.astimezone(new_tz).second) + + +class DurationType(datetime.timedelta): + """ + Implements google.protobuf.Duration + + https://developers.google.com/protocol-buffers/docs/reference/google.protobuf#duration + + The protobuf implementation is an ordered pair of int64 seconds and int32 nanos. + Instead of a Tuple[int, int] we use a wrapper for :py:class:`datetime.timedelta`. + + The definition once said this:: + + "type conversion, duration should be end with "s", which stands for seconds" + + This is obsolete, however, considering the following issue. + + See https://github.com/google/cel-spec/issues/138 + + This refers to the following implementation detail + :: + + // A duration string is a possibly signed sequence of + // decimal numbers, each with optional fraction and a unit suffix, + // such as "300ms", "-1.5h" or "2h45m". + // Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". + + The real regex, then is this:: + + [-+]?([0-9]*(\\.[0-9]*)?[a-z]+)+ + + """ + MaxSeconds = 315576000000 + MinSeconds = -315576000000 + NanosecondsPerSecond = 1000000000 + + scale: Dict[str, float] = { + "ns": 1E-9, + "us": 1E-6, + "µs": 1E-6, + "ms": 1E-3, + "s": 1., + "m": 60., + "h": 60. * 60., + "d": 24. * 60. 
* 60., + } + + def __new__( + cls: Type['DurationType'], + seconds: Any, + nanos: int = 0, + **kwargs: Any) -> 'DurationType': + if isinstance(seconds, datetime.timedelta): + if not (cls.MinSeconds <= seconds.total_seconds() <= cls.MaxSeconds): + raise ValueError("range error: {seconds}") + return super().__new__( + cls, days=seconds.days, seconds=seconds.seconds, microseconds=seconds.microseconds) + elif isinstance(seconds, int): + if not (cls.MinSeconds <= seconds <= cls.MaxSeconds): + raise ValueError("range error: {seconds}") + return super().__new__( + cls, seconds=seconds, microseconds=nanos // 1000) + elif isinstance(seconds, str): + duration_pat = re.compile(r"^[-+]?([0-9]*(\.[0-9]*)?[a-z]+)+$") + + duration_match = duration_pat.match(seconds) + if not duration_match: + raise ValueError(f"Invalid duration {seconds!r}") + + # Consume the sign. + sign: float + if seconds.startswith("+"): + seconds = seconds[1:] + sign = +1 + elif seconds.startswith("-"): + seconds = seconds[1:] + sign = -1 + else: + sign = +1 + + # Sum the remaining time components: number * unit + try: + seconds = sign * fsum( + map( + lambda n_u: float(n_u.group(1)) * cls.scale[n_u.group(3)], + re.finditer(r"([0-9]*(\.[0-9]*)?)([a-z]+)", seconds) + ) + ) + except KeyError: + raise ValueError(f"Invalid duration {seconds!r}") + + if not (cls.MinSeconds <= seconds <= cls.MaxSeconds): + raise ValueError("range error: {seconds}") + return super().__new__( + cls, seconds=seconds) + else: + raise TypeError(f"Invalid initial value type: {type(seconds)}") + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({str(self)!r})" + + def __str__(self) -> str: + return "{0}s".format(int(self.total_seconds())) + + def __add__(self, other: Any) -> 'DurationType': + """ + This doesn't need to handle the rich variety of TimestampType overloadds. + This class only needs to handle results of duration + duration. 
+ A duration + timestamp is not implemented by the timedelta superclass; + it is handled by the datetime superclass that implementes timestamp + duration. + """ + result = super().__add__(other) + if result == NotImplemented: + return cast(DurationType, result) + # This is handled by TimestampType; this is here for completeness, but isn't used. + if isinstance(result, (datetime.datetime, TimestampType)): + return TimestampType(result) # pragma: no cover + return DurationType(result) + + def __radd__(self, other: Any) -> 'DurationType': # pragma: no cover + """ + This doesn't need to handle the rich variety of TimestampType overloadds. + + Most cases are handled by TimeStamp. + """ + result = super().__radd__(other) + if result == NotImplemented: + return cast(DurationType, result) + # This is handled by TimestampType; this is here for completeness, but isn't used. + if isinstance(result, (datetime.datetime, TimestampType)): + return TimestampType(result) + return DurationType(result) + + def getHours(self, tz_name: Optional[str] = None) -> IntType: + assert tz_name is None + return IntType(int(self.total_seconds() / 60 / 60)) + + def getMilliseconds(self, tz_name: Optional[str] = None) -> IntType: + assert tz_name is None + return IntType(int(self.total_seconds() * 1000)) + + def getMinutes(self, tz_name: Optional[str] = None) -> IntType: + assert tz_name is None + return IntType(int(self.total_seconds() / 60)) + + def getSeconds(self, tz_name: Optional[str] = None) -> IntType: + assert tz_name is None + return IntType(int(self.total_seconds())) + + +class FunctionType: + """ + We need a concrete Annotation object to describe callables to celpy. + We need to describe functions as well as callable objects. + The description would tend to shadow ``typing.Callable``. + + An ``__isinstance__()`` method, for example, may be helpful for run-time type-checking. + + Superclass for CEL extension functions that are defined at run-time. 
class FunctionType:
    """
    We need a concrete Annotation object to describe callables to celpy.
    We need to describe functions as well as callable objects.
    The description would tend to shadow ``typing.Callable``.

    An ``__isinstance__()`` method, for example, may be helpful for run-time type-checking.

    Superclass for CEL extension functions that are defined at run-time.
    This permits a formal annotation in the environment construction that creates
    an intended type for a given name.

    This allows for some run-time type checking to see if the actual object binding
    matches the declared type binding.

    Also used to define protobuf classes provided as an annotation.

    We *could* define this as three overloads to cover unary, binary, and tertiary cases.
    """
    def __call__(self, *args: Value, **kwargs: Value) -> Value:
        raise NotImplementedError


class PackageType(MapType):
    """
    A package of message types, usually protobuf.

    TODO: This may not be needed.
    """
    pass


class MessageType(MapType):
    """
    An individual protobuf message definition. A mapping from field name to field value.

    See Scenario: "message_literal" in the parse.feature. This is a very deeply-nested
    message (30? levels), but the navigation to "payload" field seems to create a default
    value at the top level.
    """
    def __init__(self, *args: Value, **fields: Value) -> None:
        """
        Build from either a single mapping positional argument or keyword fields.

        :raises TypeError: when more than one positional argument is supplied.
        """
        if args and len(args) == 1:
            super().__init__(cast(Mapping[Value, Value], args[0]))
        elif args and len(args) > 1:
            # BUG FIX: was a raw string (r"...") so {args!r} was never interpolated.
            raise TypeError(f"Expected dictionary or fields, not {args!r}")
        else:
            # Keyword names become StringType keys.
            super().__init__({StringType(k): v for k, v in fields.items()})

    # def get(self, field: Any, default: Optional[Value] = None) -> Value:
    #     """
    #     Alternative implementation with descent to locate a deeply-buried field.
    #     It seemed like this was the defined behavior. It turns it, it isn't.
    #     The code is here in case we're wrong and it really is the defined behavior.
    #
    #     Note. There is no default provision in CEL.
+ # """ + # if field in self: + # return super().get(field) + # + # def descend(message: MessageType, field: Value) -> MessageType: + # if field in message: + # return message + # for k in message.keys(): + # found = descend(message[k], field) + # if found is not None: + # return found + # return None + # + # sub_message = descend(self, field) + # if sub_message is None: + # return default + # return sub_message.get(field) + + +class TypeType: + """ + Annotation used to mark protobuf type objects. + We map these to CELTypes so that type name testing works. + """ + type_name_mapping = { + "google.protobuf.Duration": DurationType, + "google.protobuf.Timestamp": TimestampType, + "google.protobuf.Int32Value": IntType, + "google.protobuf.Int64Value": IntType, + "google.protobuf.UInt32Value": UintType, + "google.protobuf.UInt64Value": UintType, + "google.protobuf.FloatValue": DoubleType, + "google.protobuf.DoubleValue": DoubleType, + "google.protobuf.Value": MessageType, + "google.protubuf.Any": MessageType, # Weird. 
+ "google.protobuf.Any": MessageType, + "list_type": ListType, + "map_type": MapType, + "map": MapType, + "list": ListType, + "string": StringType, + "bytes": BytesType, + "bool": BoolType, + "int": IntType, + "uint": UintType, + "double": DoubleType, + "null_type": type(None), + "STRING": StringType, + "BOOL": BoolType, + "INT64": IntType, + "UINT64": UintType, + "INT32": IntType, + "UINT32": UintType, + "BYTES": BytesType, + "DOUBLE": DoubleType, + } + + def __init__( + self, + value: Any = "") -> None: + if isinstance(value, str) and value in self.type_name_mapping: + self.type_reference = self.type_name_mapping[value] + elif isinstance(value, str): + try: + self.type_reference = eval(value) + except (NameError, SyntaxError): + raise TypeError(f"Unknown type {value!r}") + else: + self.type_reference = value.__class__ + + def __eq__(self, other: Any) -> bool: + return ( + other == self.type_reference + or isinstance(other, self.type_reference) # noqa: W503 + ) diff --git a/.venv/lib/python3.12/site-packages/celpy/evaluation.py b/.venv/lib/python3.12/site-packages/celpy/evaluation.py new file mode 100644 index 00000000..5e97e3b3 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/celpy/evaluation.py @@ -0,0 +1,2446 @@ +# SPDX-Copyright: Copyright (c) Capital One Services, LLC +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2020 Capital One Services, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +""" +CEL Interpreter using the AST directly. 
+ +The general idea is to map CEL operators to Python operators and push the +real work off to Python objects defined by the :py:mod:`celpy.celtypes` module. + +CEL operator "+" is implemented by "_+_" function. We map this to :py:func:`operator.add`. +This will then look for `__add__()` methods in the various :py:class:`celpy.celtypes.CELType` +types. + +In order to deal gracefully with missing and incomplete data, +exceptions are turned into first-class :py:class:`Result` objects. +They're not raised directly, but instead saved as part of the evaluation so that +short-circuit operators can ignore the exceptions. + +This means that Python exceptions like :exc:`TypeError`, :exc:`IndexError`, and :exc:`KeyError` +are caught and transformed into :exc:`CELEvalError` objects. + +The :py:class:`Resut` type hint is a union of the various values that are encountered +during evaluation. It's a union of the :py:class:`celpy.celtypes.CELTypes` type and the +:exc:`CELEvalError` exception. +""" +import collections +import logging +import operator +import re +import sys +from functools import reduce, wraps +from typing import (Any, Callable, Dict, Iterable, Iterator, List, Mapping, + Match, Optional, Sequence, Sized, Tuple, Type, TypeVar, + Union, cast) + +import lark +import lark.visitors + +import celpy.celtypes +from celpy.celparser import tree_dump + +# A CEL type annotation. Used in an environment to describe objects as well as functions. +# This is a list of types, plus Callable for conversion functions. 
+Annotation = Union[ + celpy.celtypes.CELType, + Callable[..., celpy.celtypes.Value], # Conversion functions and protobuf message type + Type[celpy.celtypes.FunctionType], # Concrete class for annotations +] + + +logger = logging.getLogger("evaluation") + + +class CELSyntaxError(Exception): + """CEL Syntax error -- the AST did not have the expected structure.""" + def __init__(self, arg: Any, line: Optional[int] = None, column: Optional[int] = None) -> None: + super().__init__(arg) + self.line = line + self.column = column + + +class CELUnsupportedError(Exception): + """Feature unsupported by this implementation of CEL.""" + def __init__(self, arg: Any, line: int, column: int) -> None: + super().__init__(arg) + self.line = line + self.column = column + + +class CELEvalError(Exception): + """CEL evaluation problem. This can be saved as a temporary value for later use. + This is politely ignored by logic operators to provide commutative short-circuit. + + We provide operator-like special methods so an instance of an error + returns itself when operated on. + """ + def __init__( + self, + *args: Any, + tree: Optional[lark.Tree] = None, + token: Optional[lark.Token] = None) -> None: + super().__init__(*args) + self.tree = tree + self.token = token + self.line: Optional[int] = None + self.column: Optional[int] = None + if self.tree: + self.line = self.tree.meta.line + self.column = self.tree.meta.column + if self.token: + self.line = self.token.line + self.column = self.token.column + + def __repr__(self) -> str: + cls = self.__class__.__name__ + if self.tree and self.token: + # This is rare + return ( + f"{cls}(*{self.args}, tree={tree_dump(self.tree)!r}, token={self.token!r})" + ) # pragma: no cover + elif self.tree: + return f"{cls}(*{self.args}, tree={tree_dump(self.tree)!r})" # pragma: no cover + else: + # Some unit tests do not provide a mock tree. 
+ return f"{cls}(*{self.args})" # pragma: no cover + + def with_traceback(self, tb: Any) -> 'CELEvalError': + return super().with_traceback(tb) + + def __neg__(self) -> 'CELEvalError': + return self + + def __add__(self, other: Any) -> 'CELEvalError': + return self + + def __sub__(self, other: Any) -> 'CELEvalError': + return self + + def __mul__(self, other: Any) -> 'CELEvalError': + return self + + def __truediv__(self, other: Any) -> 'CELEvalError': + return self + + def __floordiv__(self, other: Any) -> 'CELEvalError': + return self + + def __mod__(self, other: Any) -> 'CELEvalError': + return self + + def __pow__(self, other: Any) -> 'CELEvalError': + return self + + def __radd__(self, other: Any) -> 'CELEvalError': + return self + + def __rsub__(self, other: Any) -> 'CELEvalError': + return self + + def __rmul__(self, other: Any) -> 'CELEvalError': + return self + + def __rtruediv__(self, other: Any) -> 'CELEvalError': + return self + + def __rfloordiv__(self, other: Any) -> 'CELEvalError': + return self + + def __rmod__(self, other: Any) -> 'CELEvalError': + return self + + def __rpow__(self, other: Any) -> 'CELEvalError': + return self + + def __eq__(self, other: Any) -> bool: + if isinstance(other, CELEvalError): + return self.args == other.args + return NotImplemented + + def __call__(self, *args: Any) -> 'CELEvalError': + return self + + +# The interim results extends celtypes to include itermediate CELEvalError exception objects. +# These can be deferred as part of commutative logical_and and logical_or operations. +# It includes the responses to type() queries, also. +Result = Union[ + celpy.celtypes.Value, + CELEvalError, + celpy.celtypes.CELType, +] + +# The various functions that apply to CEL data. +# The evaluator's functions expand on the CELTypes to include CELEvalError and the +# celpy.celtypes.CELType union type, also. +CELFunction = Callable[..., Result] + +# A combination of a CELType result or a function resulting from identifier evaluation. 
Result_Function = Union[
    Result,
    CELFunction,
]

Exception_Filter = Union[Type[BaseException], Sequence[Type[BaseException]]]

TargetFunc = TypeVar('TargetFunc', bound=CELFunction)


def eval_error(new_text: str, exc_class: Exception_Filter) -> Callable[[TargetFunc], TargetFunc]:
    """
    Wrap a function to transform native Python exceptions to CEL CELEvalError values.
    Any exception of the given class is replaced with the new CELEvalError object.

    :param new_text: Text of the exception, e.g., "divide by zero", "no such overload")
        this is the return value if the :exc:`CELEvalError` becomes the result.
    :param exc_class: A Python exception class to match, e.g. ZeroDivisionError,
        or a sequence of exception classes (e.g. (ZeroDivisionError, ValueError))
    :return: A decorator that can be applied to a function
        to map Python exceptions to :exc:`CELEvalError` instances.

    This is used in the ``all()`` and ``exists()`` macros to silently ignore TypeError exceptions.
    """
    def concrete_decorator(function: TargetFunc) -> TargetFunc:
        @wraps(function)
        def new_function(*args: celpy.celtypes.Value, **kwargs: celpy.celtypes.Value) -> Result:
            try:
                return function(*args, **kwargs)
            except exc_class as ex:  # type: ignore[misc]
                logger.debug("%s(*%s, **%s) --> %s", function.__name__, args, kwargs, ex)
                _, _, tb = sys.exc_info()
                value = CELEvalError(new_text, ex.__class__, ex.args).with_traceback(tb)
                # Preserve the original exception as the cause for diagnostics.
                value.__cause__ = ex
                return value
            except Exception:
                # An unexpected exception type: log loudly and let it propagate.
                logger.error("%s(*%s, **%s)", function.__name__, args, kwargs)
                raise
        return cast(TargetFunc, new_function)
    return concrete_decorator


def boolean(
        function: Callable[..., celpy.celtypes.Value]) -> Callable[..., celpy.celtypes.BoolType]:
    """
    Wraps boolean operators to create CEL BoolType results.

    :param function: One of the operator.lt, operator.gt, etc. comparison functions
    :return: Decorated function with type coercion.
    """
    @wraps(function)
    def bool_function(a: celpy.celtypes.Value, b: celpy.celtypes.Value) -> celpy.celtypes.BoolType:
        result = function(a, b)
        # BUG FIX: NotImplemented is a sentinel; compare with ``is``, not ``==``,
        # which could be intercepted by an operand's __eq__.
        if result is NotImplemented:
            return cast(celpy.celtypes.BoolType, result)
        return celpy.celtypes.BoolType(bool(result))
    return bool_function


def operator_in(item: Result, container: Result) -> Result:
    """
    CEL contains test; ignores type errors.

    During evaluation of ``'elem' in [1, 'elem', 2]``,
    CEL will raise internal exceptions for ``'elem' == 1`` and ``'elem' == 2``.
    The :exc:`TypeError` exceptions are gracefully ignored.

    During evaluation of ``'elem' in [1u, 'str', 2, b'bytes']``, however,
    CEL will raise internal exceptions every step of the way, and an exception
    value is the final result. (Not ``False`` from the one non-exceptional comparison.)

    It would be nice to make use of the following::

        eq_test = eval_error("no such overload", TypeError)(lambda x, y: x == y)

    It seems like ``next(iter(filter(lambda x: eq_test(c, x) for c in container))))``
    would do it. But. It's not quite right for the job.

    There need to be three results, something :py:func:`filter` doesn't handle.
    These are the choices:

    - True. There was a item found. Exceptions may or may not have been found.
    - False. No item found AND no exceptions.
    - CELEvalError. No item found AND at least one exception.

    To an extent this is a little like the ``exists()`` macro.
    We can think of ``container.contains(item)`` as ``container.exists(r, r == item)``.
    However, exists() tends to silence exceptions, where this can expose them.

    .. todo:: This may be better done as

        ``reduce(logical_or, (item == c for c in container), BoolType(False))``
    """
    result: Result = celpy.celtypes.BoolType(False)
    for c in cast(Iterable[Result], container):
        try:
            if c == item:
                return celpy.celtypes.BoolType(True)
        except TypeError as ex:
            # Remember the failure; a later match can still return True.
            logger.debug("operator_in(%s, %s) --> %s", item, container, ex)
            result = CELEvalError("no such overload", ex.__class__, ex.args)
    logger.debug("operator_in(%r, %r) = %r", item, container, result)
    return result


def function_size(container: Result) -> Result:
    """
    The size() function applied to a Value. Delegate to Python's :py:func:`len`.

    (string) -> int string length
    (bytes) -> int bytes length
    (list(A)) -> int list size
    (map(A, B)) -> int map size

    For other types, this will raise a Python :exc:`TypeError`.
    (This is captured and becomes an :exc:`CELEvalError` Result.)

    .. todo:: check container type for celpy.celtypes.StringType, celpy.celtypes.BytesType,
        celpy.celtypes.ListType and celpy.celtypes.MapType
    """
    if container is None:
        # size of an absent value is zero.
        return celpy.celtypes.IntType(0)
    sized_container = cast(Sized, container)
    result = celpy.celtypes.IntType(len(sized_container))
    logger.debug("function_size(%r) = %r", container, result)
    return result


# User-defined functions can override items in this mapping.
+base_functions: Mapping[str, CELFunction] = { + "!_": celpy.celtypes.logical_not, + "-_": operator.neg, + "_+_": operator.add, + "_-_": operator.sub, + "_*_": operator.mul, + "_/_": operator.truediv, + "_%_": operator.mod, + "_<_": boolean(operator.lt), + "_<=_": boolean(operator.le), + "_>=_": boolean(operator.ge), + "_>_": boolean(operator.gt), + "_==_": boolean(operator.eq), + "_!=_": boolean(operator.ne), + "_in_": operator_in, + "_||_": celpy.celtypes.logical_or, + "_&&_": celpy.celtypes.logical_and, + "_?_:_": celpy.celtypes.logical_condition, + "_[_]": operator.getitem, + "size": function_size, + # StringType methods + "endsWith": lambda s, text: celpy.celtypes.BoolType(s.endswith(text)), + "startsWith": lambda s, text: celpy.celtypes.BoolType(s.startswith(text)), + "matches": lambda s, pattern: celpy.celtypes.BoolType(re.search(pattern, s) is not None), + "contains": lambda s, text: celpy.celtypes.BoolType(text in s), + # TimestampType methods. Type details are redundant, but required because of the lambdas + "getDate": lambda ts, tz_name=None: celpy.celtypes.IntType(ts.getDate(tz_name)), + "getDayOfMonth": lambda ts, tz_name=None: celpy.celtypes.IntType(ts.getDayOfMonth(tz_name)), + "getDayOfWeek": lambda ts, tz_name=None: celpy.celtypes.IntType(ts.getDayOfWeek(tz_name)), + "getDayOfYear": lambda ts, tz_name=None: celpy.celtypes.IntType(ts.getDayOfYear(tz_name)), + "getFullYear": lambda ts, tz_name=None: celpy.celtypes.IntType(ts.getFullYear(tz_name)), + "getMonth": lambda ts, tz_name=None: celpy.celtypes.IntType(ts.getMonth(tz_name)), + # TimestampType and DurationType methods + "getHours": lambda ts, tz_name=None: celpy.celtypes.IntType(ts.getHours(tz_name)), + "getMilliseconds": lambda ts, tz_name=None: celpy.celtypes.IntType(ts.getMilliseconds(tz_name)), + "getMinutes": lambda ts, tz_name=None: celpy.celtypes.IntType(ts.getMinutes(tz_name)), + "getSeconds": lambda ts, tz_name=None: celpy.celtypes.IntType(ts.getSeconds(tz_name)), + # type conversion 
functions + "bool": celpy.celtypes.BoolType, + "bytes": celpy.celtypes.BytesType, + "double": celpy.celtypes.DoubleType, + "duration": celpy.celtypes.DurationType, + "int": celpy.celtypes.IntType, + "list": celpy.celtypes.ListType, # https://github.com/google/cel-spec/issues/123 + "map": celpy.celtypes.MapType, + "null_type": type(None), + "string": celpy.celtypes.StringType, + "timestamp": celpy.celtypes.TimestampType, + "uint": celpy.celtypes.UintType, + "type": type, +} + + +class Referent: + """ + A Name can refer to any of the following things: + + - Annotations -- initially most names are these + or a CELFunction that may implement a type. + Must be provided as part of the initialization. + + - NameContainer -- some names are these. This is true + when the name is *not* provided as part of the initialization because + we discovered the name during type or environment binding. + + - celpy.celtypes.Value -- many annotations also have values. + These are provided **after** Annotations, and require them. + + - CELEvalError -- This seems unlikely, but we include it because it's possible. + + - Functions -- All of the type conversion functions are names in a NameContainer. + + A name can be ambiguous and refer to both a nested ``NameContainer`` as well + as a ``celpy.celtypes.Value`` (usually a MapType instance.) + + Object ``b`` has two possible meanings: + + - ``b.c`` is a NameContainer for ``c``, a string. + + - ``b`` is a mapping, and ``b.c`` is syntax sugar for ``b['c']``. + + The "longest name" rule means that the useful value is the "c" object + in the nested ``NameContainer``. + The syntax sugar interpretation is done in the rare case we can't find the ``NameContainer``. 
+ + >>> nc = NameContainer("c", celpy.celtypes.StringType) + >>> b = Referent(celpy.celtypes.MapType) + >>> b.value = celpy.celtypes.MapType({"c": "oops"}) + >>> b.value == celpy.celtypes.MapType({"c": "oops"}) + True + >>> b.container = nc + >>> b.value == nc + True + + In effect, this class is + :: + + Referent = Union[ + Annotation, + celpy.celtypes.Value, + CELEvalError, + CELFunction, + ] + """ + def __init__( + self, + ref_to: Optional[Annotation] = None + # Union[ + # None, Annotation, celpy.celtypes.Value, CELEvalError, + # CELFunction, 'NameContainer' + # ] = None + ) -> None: + self.annotation: Optional[Annotation] = None + self.container: Optional['NameContainer'] = None + self._value: Union[ + None, Annotation, celpy.celtypes.Value, CELEvalError, CELFunction, + 'NameContainer'] = None + self._value_set = False + if ref_to: + self.annotation = ref_to + + def __repr__(self) -> str: + return ( + f"{self.__class__.__name__}(annotation={self.annotation!r}, " + f"container={self.container!r}, " + f"_value={self._value!r})" + ) + + @property + def value(self) -> Union[ + Annotation, celpy.celtypes.Value, CELEvalError, CELFunction, 'NameContainer']: + """ + The longest-path rule means we prefer ``NameContainer`` over any locally defined value. + Otherwise, we'll provide a value if there is one. + Finally, we'll provide the annotation if there's no value. + :return: + """ + if self.container is not None: + return self.container + elif self._value_set: + return self._value + else: + # Not part of a namespace path. Nor was a value set. 
+ return self.annotation + + @value.setter + def value( + self, + ref_to: Union[ + Annotation, celpy.celtypes.Value, CELEvalError, CELFunction, 'NameContainer'] + ) -> None: + self._value = ref_to + self._value_set = True + + def clone(self) -> "Referent": + new = Referent(self.annotation) + new.container = self.container + new._value = self._value + new._value_set = self._value_set + return new + + +# A name resolution context is a mapping from an identifer to a Value or a ``NameContainer``. +# This reflects some murkiness in the name resolution algorithm that needs to be cleaned up. +Context = Mapping[str, Union[Result, "NameContainer"]] + + +# Copied from cel.lark +IDENT = r"[_a-zA-Z][_a-zA-Z0-9]*" + + +class NameContainer(Dict[str, Referent]): + """ + A namespace that fulfills the CEL name resolution requirement. + + :: + + Scenario: "qualified_identifier_resolution_unchecked" + "namespace resolution should try to find the longest prefix for the evaluator." + + NameContainer instances can be chained (via parent) to create a sequence of searchable + locations for a name. + + - Local-most is an Activation with local variables within a macro. + These are part of a nested chain of Activations for each macro. Each local activation + is a child with a reference to the parent Activation. + + - Parent of any local Activation is the overall Activation for this CEL evaluation. + The overall Activation contains a number of NameContainers: + + - The global variable bindings. + + - Bindings of function definitions. This is the default set of functions for CEL + plus any add-on functions introduced by C7N. + + - The run-time annotations from the environment. There are two kinds: + + - Protobuf message definitions. These are types, really. + + - Annotations for global variables. The annotations tend to be hidden by the values. + They're in the lookup chain to simplify access to protobuf messages. 
+ + - The environment also provides the built-in type names and aliases for the + :mod:`celtypes` package of built-in types. + + This means name resolution marches from local-most to remote-most, searching for a binding. + The global variable bindings have a local-most value and a more remote annotation. + The annotations (i.e. protobuf message types) have only a fairly remote annotation without + a value. + + Structure. + + A NameContainer is a mapping from names to Referents. + + A Referent can be one of three things. + + - A NameContainer further down the path + - An Annotation + - An Annotation with a value. + + Loading Names. + + There are several "phases" to building the chain of ``NameContainer`` instances. + + 1. The ``Activation`` creates the initial ``name : annotation`` bindings. + Generally, the names are type names, like "int", bound to :py:class:`celtypes.IntType`. + In some cases, the name is a future variable name, "resource", + bound to :py:class:`celtypes.MapType`. + + 2. The ``Activation`` creates a second ``NameContainer`` that has variable names. + This has a reference back to the parent to resolve names that are types. + + This involves decomposing the paths of names to make a tree of nested ``NameContainers``. + Upper-level containers don't (necessarily) have types or values -- they're merely + ``NameContainer`` along the path to the target names. + + Resolving Names. + + See https://github.com/google/cel-spec/blob/master/doc/langdef.md#name-resolution + + There are three cases required in the :py:class:`Evaluator` engine. + + - Variables and Functions. These are ``Result_Function`` instances: i.e., ordinary values. + + - ``Name.Name`` can be navigation into a protobuf package, when ``Name`` is protobuf package. + The idea is to locate the longest possible match. + + If a.b is a name to be resolved in the context of a protobuf declaration with scope A.B, + then resolution is attempted, in order, as A.B.a.b, A.a.b, and finally a.b. 
+ To override this behavior, one can use .a.b; + this name will only be attempted to be resolved in the root scope, i.e. as a.b. + + - ``Name.Name`` can be syntactic sugar for indexing into a mapping when ``Name`` is a value of + ``MapType`` or a ``MessageType``. It's evaluated as if it was ``Name["Name"]``. + This is a fall-back plan if the previous resolution failed. + + The longest chain of nested packages *should* be resolved first. + This will happen when each name is a ``NameContainer`` object containing + other ``NameContainer`` objects. + + The chain of evaluations for ``IDENT . IDENT . IDENT`` is (in effect) + :: + + member_dot(member_dot(primary(IDENT), IDENT), IDENT) + + This makes the ``member_dot` processing left associative. + + The ``primary(IDENT)`` resolves to a CEL object of some kind. + Once the ``primary(IDENT)`` has been resolved, it establishes a context + for subsequent ``member_dot`` methods. + + - If this is a ``MapType`` or a ``MessageType`` with an object, + then ``member_dot`` will pluck out a field value and return this. + + - If this is a ``NameContainer`` or a ``PackageType`` then the ``member_dot`` + will pluck out a sub-package or ``EnumType`` or ``MessageType`` + and return the type object instead of a value. + At some point a ``member_object`` production will build an object from the type. + + The evaluator's :meth:`ident_value` method resolves the identifier into the ``Referent``. + + Acceptance Test Case + + We have two names + + - `a.b` -> NameContainer in which c = "yeah". (i.e., a.b.c : "yeah") + - `a.b` -> Mapping with {"c": "oops"}. + + This means any given name can have as many as three meanings: + + - Primarily as a NameContainer. This resolves name.name.name to find the longest + namespace possible. + + - Secondarily as a Mapping. This will be a fallback when name.name.name is really + syntactic sugar for name.name['name']. + + - Finally as a type annotation. 
+ + """ + ident_pat = re.compile(IDENT) + extended_name_path = re.compile(f"^\\.?{IDENT}(?:\\.{IDENT})*$") + logger = logging.getLogger("NameContainer") + + def __init__( + self, + name: Optional[str] = None, + ref_to: Optional[Referent] = None, + parent: Optional['NameContainer'] = None + ) -> None: + if name and ref_to: + super().__init__({name: ref_to}) + else: + super().__init__() + self.parent: Optional[NameContainer] = parent + + def load_annotations( + self, + names: Mapping[str, Annotation], + ) -> None: + """ + Used by an ``Activation`` to build a container used to resolve + long path names into nested NameContainers. + Sets annotations for all supplied identifiers. + + ``{"name1.name2": annotation}`` becomes two things: + + 1. nc2 = NameContainer({"name2" : Referent(annotation)}) + + 2. nc1 = NameContainer({"name1" : Referent(nc2)}) + + :param names: A dictionary of {"name1.name1....": Referent, ...} items. + """ + for name, refers_to in names.items(): + self.logger.info("load_annotations %r : %r", name, refers_to) + if not self.extended_name_path.match(name): + raise ValueError(f"Invalid name {name}") + + context = self + + # Expand "name1.name2....": refers_to into ["name1", "name2", ...]: refers_to + *path, final = self.ident_pat.findall(name) + for name in path: + ref = context.setdefault(name, Referent()) + if ref.container is None: + ref.container = NameContainer(parent=self.parent) + context = ref.container + context.setdefault(final, Referent(refers_to)) + + def load_values(self, values: Context) -> None: + """Update annotations with actual values.""" + for name, refers_to in values.items(): + self.logger.info("load_values %r : %r", name, refers_to) + if not self.extended_name_path.match(name): + raise ValueError(f"Invalid name {name}") + + context = self + + # Expand "name1.name2....": refers_to into ["name1", "name2", ...]: refers_to + # Update NameContainer("name1", NameContainer("name2", NameContainer(..., refers_to))) + *path, final = 
self.ident_pat.findall(name) + for name in path: + ref = context.setdefault(name, Referent()) + if ref.container is None: + ref.container = NameContainer(parent=self.parent) + context = ref.container + context.setdefault(final, Referent()) # No annotation. + context[final].value = refers_to + + class NotFound(Exception): + """ + Raised locally when a name is not found in the middle of package search. + We can't return ``None`` from find_name because that's a valid value. + """ + pass + + @staticmethod + def dict_find_name(some_dict: Dict[str, Referent], path: List[str]) -> Result: + """ + Extension to navgiate into mappings, messages, and packages. + + :param some_dict: An instance of a MapType, MessageType, or PackageType. + :param path: names to follow into the structure. + :returns: Value found down inside the structure. + """ + if path: + head, *tail = path + try: + return NameContainer.dict_find_name( + cast(Dict[str, Referent], some_dict[head]), + tail) + except KeyError: + NameContainer.logger.debug("%r not found in %s", head, some_dict.keys()) + raise NameContainer.NotFound(path) + else: + return cast(Result, some_dict) + + def find_name(self, path: List[str]) -> Union["NameContainer", Result]: + """ + Find the name by searching down through nested packages or raise NotFound. + This is a kind of in-order tree walk of contained packages. + """ + if path: + head, *tail = path + try: + sub_context = self[head].value + except KeyError: + self.logger.debug("%r not found in %s", head, self.keys()) + raise NameContainer.NotFound(path) + if isinstance(sub_context, NameContainer): + return sub_context.find_name(tail) + elif isinstance( + sub_context, + (celpy.celtypes.MessageType, celpy.celtypes.MapType, + celpy.celtypes.PackageType, dict) + ): + # Out of defined NameContainers, moving into Values: Messages, Mappings or Packages + # Make a fake Referent return value. 
+ item: Union["NameContainer", Result] = NameContainer.dict_find_name( + cast(Dict[str, Referent], sub_context), + tail + ) + return item + else: + # Fully matched. No more Referents with NameContainers or Referents with Mappings. + return cast(NameContainer, sub_context) + else: + # Fully matched. This NameContainer is what we were looking for. + return self + + def parent_iter(self) -> Iterator['NameContainer']: + """Yield this NameContainer and all of its parents to create a flat list.""" + yield self + if self.parent is not None: + yield from self.parent.parent_iter() + + def resolve_name( + self, + package: Optional[str], + name: str + ) -> Referent: + """ + Search with less and less package prefix until we find the thing. + + Resolution works as follows. + If a.b is a name to be resolved in the context of a protobuf declaration with scope A.B, + then resolution is attempted, in order, as + + 1. A.B.a.b. (Search for "a" in paackage "A.B"; the ".b" is handled separately.) + + 2. A.a.b. (Search for "a" in paackage "A"; the ".b" is handled separately.) + + 3. (finally) a.b. (Search for "a" in paackage None; the ".b" is handled separately.) + + To override this behavior, one can use .a.b; + this name will only be attempted to be resolved in the root scope, i.e. as a.b. + + We Start with the longest package name, a ``List[str]`` assigned to ``target``. + + Given a target, search through this ``NameContainer`` and all parents in the + :meth:`parent_iter` iterable. + The first name we find in the parent sequence is the goal. + This is because values are first, type annotations are laast. + + If we can't find the identifier with given package target, + truncate the package name from the end to create a new target and try again. + This is a bottom-up look that favors the longest name. 
+ + :param package: Prefix string "name.name.name" + :param name: The variable we're looking for + :return: Name resolution as a Rereferent, often a value, but maybe a package or an + annotation. + """ + self.logger.info( + "resolve_name(%r.%r) in %s, parent=%s", package, name, self.keys, self.parent + ) + # Longest Name + if package: + target = self.ident_pat.findall(package) + [""] + else: + target = [""] + # Pool of matches + matches: List[Tuple[List[str], Union["NameContainer", Result]]] = [] + # Target has an extra item to make the len non-zero. + while not matches and target: + target = target[:-1] + for p in self.parent_iter(): + try: + package_ident: List[str] = target + [name] + match: Union["NameContainer", Result] = p.find_name(package_ident) + matches.append((package_ident, match)) + except NameContainer.NotFound: + # No matches; move to the parent and try again. + pass + self.logger.debug("resolve_name: target=%s+[%r], matches=%s", target, name, matches) + if not matches: + raise KeyError(name) + # This feels hackish -- it should be the first referent value. + # Find the longest name match.p + path, value = max(matches, key=lambda path_value: len(path_value[0])) + return cast(Referent, value) + + def clone(self) -> 'NameContainer': + new = NameContainer(parent=self.parent) + for k, v in self.items(): + new[k] = v.clone() + return new + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({dict(self)}, parent={self.parent})" + + +class Activation: + """ + Namespace with variable bindings and type name ("annotation") bindings. + + .. rubric:: Life and Content + + An Activation is created by an Environment and contains the annotations + (and a package name) from that Environment. Variables are loaded into the + activation for evaluation. + + A nested Activation is created each time we evaluate a macro. + + An Activation contains a ``NameContainer`` instance to resolve identifers. 
    (This may be a needless distinction and the two classes could, perhaps, be combined.)

    ..  todo:: The environment's annotations are type names used for protobuf.

    ..  rubric:: Chaining/Nesting

    Activations can form a chain so locals are checked first.
    Activations can nest via macro evaluation, creating transient local variables.

    ::

        ``"[2, 4, 6].map(n, n / 2)"``

    means nested activations with ``n`` bound to 2, 4, and 6 respectively.
    The resulting objects then form a resulting list.

    This is used by an :py:class:`Evaluator` as follows::

        sub_activation: Activation = self.activation.nested_activation()
        sub_eval: Evaluator = self.sub_eval(sub_activation)
        sub_eval_partial: Callable[[Value], Value] = sub_eval.partial(
            tree_for_variable, tree_for_expression)
        push(celtypes.ListType(map(sub_eval_partial, pop()))

    The ``localized_eval()`` creates a new :py:class:`Activation`
    and an associated :py:class:`Evaluator` for this nested activation context.
    It uses the :py:class:`Evaluator.visit` method to evaluate the given expression for
    a new object bound to the given variable.

    ..  rubric:: Namespace Creation

    We expand ``{"a.b.c": 42}`` to create nested namespaces: ``{"a": {"b": {"c": 42}}}``.

    This depends on two syntax rules to define the valid names::

        member  : primary
                | member "." IDENT ["(" [exprlist] ")"]

        primary : ["."] IDENT ["(" [exprlist] ")"]

    Ignore the ``["(" [exprlist] ")"]`` options used for member functions.
    We have members and primaries, both of which depend on the following lexical rule::

        IDENT   : /[_a-zA-Z][_a-zA-Z0-9]*/

    Name expansion is handled in order of length. Here's why::

        Scenario: "qualified_identifier_resolution_unchecked"
            "namespace resolution should try to find the longest prefix for the evaluator."

    Most names start with ``IDENT``, but a primary can start with ``.``.
    """

    def __init__(
            self,
            annotations: Optional[Mapping[str, Annotation]] = None,
            package: Optional[str] = None,
            vars: Optional[Context] = None,
            parent: Optional['Activation'] = None,
    ) -> None:
        """
        Create an Activation.

        The annotations are loaded first. The variables are loaded second, and placed
        in front of the annotations in the chain of name resolutions. Values come before
        annotations.

        :param annotations: Variables and type annotations.
            Annotations are loaded first to serve as defaults to create a parent NameContainer.
        :param package: The package name to assume as a prefix for name resolution.
        :param vars: Variables and their values, loaded to update the NameContainer.
        :param parent: A parent activation in the case of macro evaluations.
        """
        logger.info(
            "Activation(annotations=%r, package=%r, vars=%r, "
            "parent=%s)", annotations, package, vars, parent
        )
        # Seed the annotation identifiers for this activation.
        self.identifiers: NameContainer = NameContainer(
            parent=parent.identifiers if parent else None
        )
        if annotations is not None:
            self.identifiers.load_annotations(annotations)

        # The name of the run-time package -- an assumed prefix for name resolution
        self.package = package

        # Create a child NameContainer with variables (if any.)
        if vars is None:
            pass
        elif isinstance(vars, Activation):  # pragma: no cover
            # Deprecated legacy feature.
            raise NotImplementedError("Use Activation.clone()")

        else:
            # Set values from a dictionary of names and values.
            self.identifiers.load_values(vars)

    def clone(self) -> "Activation":
        """
        Create a clone of this activation with a deep copy of the identifiers.
        """
        clone = Activation()
        clone.package = self.package
        clone.identifiers = self.identifiers.clone()
        return clone

    def nested_activation(
            self,
            annotations: Optional[Mapping[str, Annotation]] = None,
            vars: Optional[Context] = None
    ) -> 'Activation':
        """
        Create a nested sub-Activation that chains to the current activation.
        The sub-activations keep the same implied package context.

        :param annotations: Variable type annotations
        :param vars: Variables with literals to be converted to the desired types.
        :return: An ``Activation`` that chains to this Activation.
        """
        new = Activation(
            annotations=annotations,
            vars=vars,
            parent=self,
            package=self.package
        )
        return new

    def resolve_variable(self, name: str) -> Union[Result, NameContainer]:
        """Find the object referred to by the name.

        An Activation usually has a chain of NameContainers to be searched.

        A variable can refer to an annotation and/or a value and/or a nested
        container. Most of the time, we want the `value` attribute of the Referent.
        This can be a Result (a Union[Value, CelType])
        """
        container_or_value = self.identifiers.resolve_name(self.package, str(name))
        return cast(Union[Result, NameContainer], container_or_value)

    def __repr__(self) -> str:
        return (
            f"{self.__class__.__name__}"
            f"(annotations={self.identifiers.parent!r}, "
            f"package={self.package!r}, "
            f"vars={self.identifiers!r}, "
            f"parent={self.identifiers.parent})"
        )


class FindIdent(lark.visitors.Visitor_Recursive):
    """Locate the ident token at the bottom of an AST.

    This is needed to find the bind variable for macros.

    It works by doing a "visit" on the entire tree, but saving
    the details of the ``ident`` nodes only.
+ """ + def __init__(self) -> None: + self.ident_token: Optional[str] = None + + def ident(self, tree: lark.Tree) -> None: + ident_token = cast(lark.Token, tree.children[0]) + self.ident_token = ident_token.value + + @classmethod + def in_tree(cls: Type['FindIdent'], tree: lark.Tree) -> Optional[str]: + fi = FindIdent() + fi.visit(tree) + return fi.ident_token + + +def trace( + method: Callable[['Evaluator', lark.Tree], Any]) -> Callable[['Evaluator', lark.Tree], Any]: + """ + Decorator to create consistent evaluation trace logging. + This only works for a class with a ``level`` attribute. + This is generally applied to the methods matching rule names. + """ + @wraps(method) + def concrete_method(self: 'Evaluator', tree: lark.Tree) -> Any: + self.logger.info("%s%r", self.level * '| ', tree) + result = method(self, tree) + self.logger.info("%s%s -> %r", self.level * '| ', tree.data, result) + return result + return concrete_method + + +class Evaluator(lark.visitors.Interpreter[Result]): + """ + Evaluate an AST in the context of a specific Activation. + + See https://github.com/google/cel-go/blob/master/examples/README.md + + General Evaluation. + + An AST node must call ``self.visit_children(tree)`` explicitly + to build the values for all the children of this node. + + Exceptions. + + To handle ``2 / 0 || true``, the ``||``, ``&&``, and ``?:`` operators + do not trivially evaluate and raise exceptions. They bottle up the + exceptions and treat them as a kind of undecided value. + + Identifiers. + + Identifiers have three meanings: + + - An object. This is either a variable provided in the activation or a function provided + when building an execution. Objects also have type annotations. + + - A type annotation without an object, This is used to build protobuf messages. + + - A macro name. The ``member_dot_arg`` construct may have a macro. + Plus the ``ident_arg`` construct may also have a ``dyn()`` or ``has()`` macro. + See below for more. 
+ + Other than macros, a name maps to an ``Referent`` instance. This will have an + annotation and -- perhaps -- an associated object. + + Names have nested paths. ``a.b.c`` is a mapping, ``a``, that contains a mapping, ``b``, + that contains ``c``. + + **MACROS ARE SPECIAL**. + + The macros do not **all** simply visit their children to perform evaluation. + There are three cases: + + - ``dyn()`` does effectively nothing. + It visits it's children, but also provides progressive type resolution + through annotation of the AST. + + - ``has()`` attempts to visit the child and does a boolean transformation + on the result. + This is a macro because it doesn't raise an exception for a missing + member item reference, but instead maps an exception to False. + It doesn't return the value found, for a member item reference; instead, it maps + this to True. + + - The various ``member.macro()`` constructs do **NOT** visit children. + They create a nested evaluation environment for the child variable name and expression. + + The :py:meth:`member` method implements the macro evaluation behavior. + It does not **always** trivially descend into the children. + In the case of macros, the member evaluates one child tree in the presence + of values from another child tree using specific variable binding in a kind + of stack frame. + + """ + logger = logging.getLogger("Evaluator") + + def __init__( + self, + ast: lark.Tree, + activation: Activation, + functions: Union[Sequence[CELFunction], Mapping[str, CELFunction], None] = None + ) -> None: + """ + Create an evaluator for an AST with specific variables and functions. + + :param ast: The AST to evaluate. + :param activation: The variable bindings to use. + :param functions: The functions to use. If nothing is supplied, the default + global `base_functions` are used. Otherwise a ChainMap is created so + these local functions override the base functions. 
+ """ + self.ast = ast + self.base_activation = activation + self.activation = self.base_activation + self.functions: Mapping[str, CELFunction] + if isinstance(functions, Sequence): + local_functions = { + f.__name__: f for f in functions or [] + } + self.functions = collections.ChainMap(local_functions, base_functions) # type: ignore [arg-type] + elif isinstance(functions, Mapping): + self.functions = collections.ChainMap(functions, base_functions) # type: ignore [arg-type] + else: + self.functions = base_functions + + self.level = 0 + self.logger.info("activation: %r", self.activation) + self.logger.info("functions: %r", self.functions) + + def sub_evaluator(self, ast: lark.Tree) -> 'Evaluator': + """ + Build an evaluator for a sub-expression in a macro. + :param ast: The AST for the expression in the macro. + :return: A new `Evaluator` instance. + """ + return Evaluator(ast, activation=self.activation, functions=self.functions) + + def set_activation(self, values: Context) -> 'Evaluator': + """ + Chain a new activation using the given Context. + This is used for two things: + + 1. Bind external variables like command-line arguments or environment variables. + + 2. Build local variable(s) for macro evaluation. + """ + self.activation = self.base_activation.clone() + self.activation.identifiers.load_values(values) + self.logger.info("Activation: %r", self.activation) + return self + + def ident_value(self, name: str, root_scope: bool = False) -> Result_Function: + """Resolve names in the current activation. + This includes variables, functions, the type registry for conversions, + and protobuf packages, as well as protobuf types. + + We may be limited to root scope, which prevents searching through alternative + protobuf package definitions. 
+ """ + try: + return cast(Result, self.activation.resolve_variable(name)) + except KeyError: + return self.functions[name] + + def evaluate(self) -> celpy.celtypes.Value: + """ + Evaluate this AST and return the value or raise an exception. + + There are two variant use cases. + + - External clients want the value or the exception. + + - Internally, we sometimes want to silence CELEvalError exceptions so that + we can apply short-circuit logic and choose a non-exceptional result. + """ + value = self.visit(self.ast) + if isinstance(value, CELEvalError): + raise value + return cast(celpy.celtypes.Value, value) + + def visit_children(self, tree: lark.Tree) -> List[Result]: + """Extend the superclass to track nesting and current evaluation context. + """ + self.level += 1 + result = super().visit_children(tree) + self.level -= 1 + return result + + def function_eval( + self, + name_token: lark.Token, + exprlist: Optional[Iterable[Result]] = None) -> Result: + """ + Function evaluation. + + - Object creation and type conversions. + - Other built-in functions like size() + - Extension functions + """ + function: CELFunction + try: + # TODO: Transitive Lookup of function in all parent activation contexts. 
            function = self.functions[name_token.value]
        except KeyError as ex:
            err = (
                f"undeclared reference to '{name_token}' "
                f"(in activation '{self.activation}')"
            )
            value = CELEvalError(err, ex.__class__, ex.args, token=name_token)
            value.__cause__ = ex
            return value

        if isinstance(exprlist, CELEvalError):
            # Propagate an error argument without calling the function.
            return exprlist

        try:
            list_exprlist = cast(List[Result], exprlist or [])
            return function(*list_exprlist)
        except ValueError as ex:
            value = CELEvalError(
                "return error for overflow", ex.__class__, ex.args, token=name_token)
            value.__cause__ = ex
            return value
        except (TypeError, AttributeError) as ex:
            self.logger.debug("function_eval(%r, %s) --> %s", name_token, exprlist, ex)
            value = CELEvalError(
                "no such overload", ex.__class__, ex.args, token=name_token)
            value.__cause__ = ex
            return value

    def method_eval(
            self,
            object: Result,
            method_ident: lark.Token,
            exprlist: Optional[Iterable[Result]] = None) -> Result:
        """
        Method evaluation. While methods are (nominally) attached to an object, the only
        thing actually special is that the object is the first parameter to a function.
        """
        function: CELFunction
        try:
            # TODO: Transitive Lookup of function in all parent activation contexts.
            function = self.functions[method_ident.value]
        except KeyError as ex:
            self.logger.debug("method_eval(%r, %r, %s) --> %r", object, method_ident, exprlist, ex)
            self.logger.debug("functions: %s", self.functions)
            err = (
                f"undeclared reference to {method_ident.value!r} "
                f"(in activation '{self.activation}')"
            )
            value = CELEvalError(err, ex.__class__, ex.args, token=method_ident)
            value.__cause__ = ex
            return value

        if isinstance(object, CELEvalError):
            # Propagate an error target without calling the function.
            return object
        elif isinstance(exprlist, CELEvalError):
            return exprlist

        try:
            list_exprlist = cast(List[Result], exprlist or [])
            return function(object, *list_exprlist)
        except ValueError as ex:
            value = CELEvalError(
                "return error for overflow", ex.__class__, ex.args, token=method_ident)
            value.__cause__ = ex
            return value
        except (TypeError, AttributeError) as ex:
            self.logger.debug("method_eval(%r, %r, %s) --> %r", object, method_ident, exprlist, ex)
            value = CELEvalError("no such overload", ex.__class__, ex.args, token=method_ident)
            value.__cause__ = ex
            return value

    def macro_has_eval(self, exprlist: lark.Tree) -> celpy.celtypes.BoolType:
        """
        The has(e.f) macro.

        https://github.com/google/cel-spec/blob/master/doc/langdef.md#field-selection

        1.  If e evaluates to a map, then has(e.f) indicates whether the string f is a
            key in the map (note that f must syntactically be an identifier).

        2.  If e evaluates to a message and f is not a declared field for the message,
            has(e.f) raises a no_such_field error.

        3.  If e evaluates to a protocol buffers version 2 message and f is a defined field:

            -   If f is a repeated field or map field, has(e.f) indicates whether the field is
                non-empty.

            -   If f is a singular or oneof field, has(e.f) indicates whether the field is set.

        4.  If e evaluates to a protocol buffers version 3 message and f is a defined field:

            -   If f is a repeated field or map field, has(e.f) indicates whether the field is
                non-empty.

            -   If f is a oneof or singular message field, has(e.f) indicates whether the field
                is set.

            -   If f is some other singular field, has(e.f) indicates whether the field's value
                is its default value (zero for numeric fields, false for booleans,
                empty for strings and bytes).

        5.  In all other cases, has(e.f) evaluates to an error.

        """
        # Any CELEvalError from the sub-expression is mapped to False; a value maps to True.
        has_values = self.visit_children(exprlist)
        return celpy.celtypes.BoolType(not isinstance(has_values[0], CELEvalError))

    @trace
    def expr(self, tree: lark.Tree) -> Result:
        """
        expr : conditionalor ["?" conditionalor ":" expr]

        The default implementation short-circuits
        and can ignore a CELEvalError in a sub-expression.

        See https://github.com/google/cel-spec/blob/master/doc/langdef.md#logical-operators

        >   To get traditional left-to-right short-circuiting evaluation of logical operators,
            as in C or other languages (also called "McCarthy Evaluation"),
            the expression e1 && e2 can be rewritten `e1 ? e2 : false`.
            Similarly, `e1 || e2` can be rewritten `e1 ? true : e2`.
        """
        if len(tree.children) == 1:
            # expr is a single conditionalor.
            values = self.visit_children(tree)
            return values[0]
        elif len(tree.children) == 3:
            # full conditionalor "?" conditionalor ":" expr.
+ func = self.functions["_?_:_"] + cond_value = self.visit(cast(lark.Tree, tree.children[0])) + left = right = cast(Result, celpy.celtypes.BoolType(False)) + try: + if cond_value: + left = self.visit(cast(lark.Tree, tree.children[1])) + else: + right = self.visit(cast(lark.Tree, tree.children[2])) + return func(cond_value, left, right) + except TypeError as ex: + self.logger.debug("%s(%s, %s) --> %s", func.__name__, left, right, ex) + err = ( + f"found no matching overload for _?_:_ " + f"applied to '({type(cond_value)}, {type(left)}, {type(right)})'" + ) + value = CELEvalError(err, ex.__class__, ex.args, tree=tree) + value.__cause__ = ex + return value + else: + raise CELSyntaxError( + f"{tree.data} {tree.children}: bad expr node", + line=tree.meta.line, + column=tree.meta.column, + ) + + @trace + def conditionalor(self, tree: lark.Tree) -> Result: + """ + conditionalor : [conditionalor "||"] conditionaland + + The default implementation short-circuits + and can ignore an CELEvalError in a sub-expression. + """ + if len(tree.children) == 1: + # conditionaland with no preceding conditionalor. 
+ values = self.visit_children(tree) + return values[0] + elif len(tree.children) == 2: + func = self.functions["_||_"] + left, right = cast(Tuple[Result, Result], self.visit_children(tree)) + try: + return func(left, right) + except TypeError as ex: + self.logger.debug("%s(%s, %s) --> %s", func.__name__, left, right, ex) + err = ( + f"found no matching overload for _||_ " + f"applied to '({type(left)}, {type(right)})'" + ) + value = CELEvalError(err, ex.__class__, ex.args, tree=tree) + value.__cause__ = ex + return value + else: + raise CELSyntaxError( + f"{tree.data} {tree.children}: bad conditionalor node", + line=tree.meta.line, + column=tree.meta.column, + ) + + @trace + def conditionaland(self, tree: lark.Tree) -> Result: + """ + conditionaland : [conditionaland "&&"] relation + + The default implementation short-circuits + and can ignore an CELEvalError in a sub-expression. + """ + if len(tree.children) == 1: + # relation with no preceding conditionaland. + values = self.visit_children(tree) + return values[0] + elif len(tree.children) == 2: + func = self.functions["_&&_"] + left, right = cast(Tuple[Result, Result], self.visit_children(tree)) + try: + return func(left, right) + except TypeError as ex: + self.logger.debug("%s(%s, %s) --> %s", func.__name__, left, right, ex) + err = ( + f"found no matching overload for _&&_ " + f"applied to '({type(left)}, {type(right)})'" + ) + value = CELEvalError(err, ex.__class__, ex.args, tree=tree) + value.__cause__ = ex + return value + else: + raise CELSyntaxError( + f"{tree.data} {tree.children}: bad conditionalor node", + line=tree.meta.line, + column=tree.meta.column, + ) + + @trace + def relation(self, tree: lark.Tree) -> Result: + """ + relation : [relation_lt | relation_le | relation_ge | relation_gt + | relation_eq | relation_ne | relation_in] addition + + relation_lt : relation "<" + relation_le : relation "<=" + relation_gt : relation ">" + relation_ge : relation ">=" + relation_eq : relation "==" + 
relation_ne : relation "!=" + relation_in : relation "in" + + This could be refactored into separate methods to skip the lookup. + + Ideally:: + + values = self.visit_children(tree) + func = functions[op_name_map[tree.data]] + result = func(*values) + + The AST doesn't provide a flat list of values, however. + """ + if len(tree.children) == 1: + # addition with no preceding relation. + values = self.visit_children(tree) + return values[0] + + elif len(tree.children) == 2: + left_op, right_tree = cast(Tuple[lark.Tree, lark.Tree], tree.children) + op_name = { + "relation_lt": "_<_", + "relation_le": "_<=_", + "relation_ge": "_>=_", + "relation_gt": "_>_", + "relation_eq": "_==_", + "relation_ne": "_!=_", + "relation_in": "_in_", + }[left_op.data] + func = self.functions[op_name] + # NOTE: values have the structure [[left], right] + (left, *_), right = cast(Tuple[List[Result], Result], self.visit_children(tree)) + self.logger.debug("relation %r %s %r", left, op_name, right) + try: + return func(left, right) + except TypeError as ex: + self.logger.debug("%s(%s, %s) --> %s", func.__name__, left, right, ex) + err = ( + f"found no matching overload for {left_op.data!r} " + f"applied to '({type(left)}, {type(right)})'" + ) + value = CELEvalError(err, ex.__class__, ex.args, tree=tree) + value.__cause__ = ex + return value + + else: + raise CELSyntaxError( + f"{tree.data} {tree.children}: bad relation node", + line=tree.meta.line, + column=tree.meta.column, + ) + + @trace + def addition(self, tree: lark.Tree) -> Result: + """ + addition : [addition_add | addition_sub] multiplication + + addition_add : addition "+" + addition_sub : addition "-" + + This could be refactored into separate methods to skip the lookup. + + Ideally:: + + values = self.visit_children(tree) + func = functions[op_name_map[tree.data]] + result = func(*values) + + The AST doesn't provide a flat list of values, however. + """ + if len(tree.children) == 1: + # multiplication with no preceding addition. 
+ values = self.visit_children(tree) + return values[0] + + elif len(tree.children) == 2: + left_op, right_tree = cast(Tuple[lark.Tree, lark.Tree], tree.children) + op_name = { + "addition_add": "_+_", + "addition_sub": "_-_", + }[left_op.data] + func = self.functions[op_name] + # NOTE: values have the structure [[left], right] + (left, *_), right = cast(Tuple[List[Result], Result], self.visit_children(tree)) + self.logger.debug("addition %r %s %r", left, op_name, right) + try: + return func(left, right) + except TypeError as ex: + self.logger.debug("%s(%s, %s) --> %s", func.__name__, left, right, ex) + err = ( + f"found no matching overload for {left_op.data!r} " + f"applied to '({type(left)}, {type(right)})'" + ) + value = CELEvalError(err, ex.__class__, ex.args, tree=tree) + value.__cause__ = ex + return value + except (ValueError, OverflowError) as ex: + self.logger.debug("%s(%s, %s) --> %s", func.__name__, left, right, ex) + value = CELEvalError("return error for overflow", ex.__class__, ex.args, tree=tree) + value.__cause__ = ex + return value + + else: + raise CELSyntaxError( + f"{tree.data} {tree.children}: bad addition node", + line=tree.meta.line, + column=tree.meta.column, + ) + + @trace + def multiplication(self, tree: lark.Tree) -> Result: + """ + multiplication : [multiplication_mul | multiplication_div | multiplication_mod] unary + + multiplication_mul : multiplication "*" + multiplication_div : multiplication "/" + multiplication_mod : multiplication "%" + + This could be refactored into separate methods to skip the lookup. + + Ideally:: + + values = self.visit_children(tree) + func = functions[op_name_map[tree.data]] + result = func(*values) + + The AST doesn't provide a flat list of values, however. + """ + if len(tree.children) == 1: + # unary with no preceding multiplication. 
+ values = self.visit_children(tree) + return values[0] + + elif len(tree.children) == 2: + left_op, right_tree = cast(Tuple[lark.Tree, lark.Tree], tree.children) + op_name = { + "multiplication_div": "_/_", + "multiplication_mul": "_*_", + "multiplication_mod": "_%_", + }[left_op.data] + func = self.functions[op_name] + # NOTE: values have the structure [[left], right] + (left, *_), right = cast(Tuple[List[Result], Result], self.visit_children(tree)) + self.logger.debug("multiplication %r %s %r", left, op_name, right) + try: + return func(left, right) + except TypeError as ex: + self.logger.debug("%s(%s, %s) --> %s", func.__name__, left, right, ex) + err = ( + f"found no matching overload for {left_op.data!r} " + f"applied to '({type(left)}, {type(right)})'" + ) + value = CELEvalError(err, ex.__class__, ex.args, tree=tree) + value.__cause__ = ex + return value + except ZeroDivisionError as ex: + self.logger.debug("%s(%s, %s) --> %s", func.__name__, left, right, ex) + value = CELEvalError("modulus or divide by zero", ex.__class__, ex.args, tree=tree) + value.__cause__ = ex + return value + except (ValueError, OverflowError) as ex: + self.logger.debug("%s(%s, %s) --> %s", func.__name__, left, right, ex) + value = CELEvalError("return error for overflow", ex.__class__, ex.args, tree=tree) + value.__cause__ = ex + return value + + else: + raise CELSyntaxError( + f"{tree.data} {tree.children}: bad multiplication node", + line=tree.meta.line, + column=tree.meta.column, + + ) + + @trace + def unary(self, tree: lark.Tree) -> Result: + """ + unary : [unary_not | unary_neg] member + + unary_not : "!" + unary_neg : "-" + + This should be refactored into separate methods to skip the lookup. 
+ + ideally:: + + values = self.visit_children(tree) + func = functions[op_name_map[tree.data]] + result = func(*values) + + But, values has the structure ``[[], right]`` + """ + if len(tree.children) == 1: + # member with no preceeding unary_not or unary_neg + # TODO: If there are two possible values (namespace v. mapping) chose the namespace. + values = self.visit_children(tree) + return values[0] + + elif len(tree.children) == 2: + op_tree, right_tree = cast(Tuple[lark.Tree, lark.Tree], tree.children) + op_name = { + "unary_not": "!_", + "unary_neg": "-_", + }[op_tree.data] + func = self.functions[op_name] + # NOTE: values has the structure [[], right] + left, right = cast(Tuple[List[Result], Result], self.visit_children(tree)) + self.logger.debug("unary %s %r", op_name, right) + try: + return func(right) + except TypeError as ex: + self.logger.debug("%s(%s) --> %s", func.__name__, right, ex) + err = ( + f"found no matching overload for {op_tree.data!r} " + f"applied to '({type(right)})'" + ) + value = CELEvalError(err, ex.__class__, ex.args, tree=tree) + value.__cause__ = ex + return value + except ValueError as ex: + self.logger.debug("%s(%s) --> %s", func.__name__, right, ex) + value = CELEvalError("return error for overflow", ex.__class__, ex.args, tree=tree) + value.__cause__ = ex + return value + + else: + raise CELSyntaxError( + f"{tree.data} {tree.children}: bad unary node", + line=tree.meta.line, + column=tree.meta.column, + + ) + + def build_macro_eval(self, child: lark.Tree) -> Callable[[celpy.celtypes.Value], Any]: + """ + Builds macro function. + + For example + + ``[1, 2, 3].map(n, n/2)`` + + Builds the function = ``lambda n: n/2``. + + The function will expose exceptions, disabling short-circuit ``||`` and ``&&``. + + The `child` is a `member_dot_arg` construct: + + - [0] is the expression to the left of the '.' + + - [1] is the function, `map`, to the right of the `.` + + - [2] is the arguments in ()'s. 
    def build_macro_eval(self, child: lark.Tree) -> Callable[[celpy.celtypes.Value], Any]:
        """
        Builds a macro function from the expression inside a macro call.

        For example

        ``[1, 2, 3].map(n, n/2)``

        Builds the function = ``lambda n: n/2``.

        The function will expose exceptions, disabling short-circuit ``||`` and ``&&``.

        The `child` is a `member_dot_arg` construct:

        - [0] is the expression to the left of the '.'

        - [1] is the function, `map`, to the right of the `.`

        - [2] is the arguments in ()'s.
          Within this, there are two children: a variable and an expression.
        """
        # args.children is the (variable tree, expression tree) pair.
        args = cast(lark.Tree, child.children[2])
        var_tree, expr_tree = cast(Tuple[lark.Tree, lark.Tree], args.children)
        identifier = FindIdent.in_tree(var_tree)
        if identifier is None:  # pragma: no cover
            # This seems almost impossible: the grammar requires an identifier here.
            raise CELSyntaxError(
                f"{child.data} {child.children}: bad macro node",
                line=child.meta.line,
                column=child.meta.column,
            )
        # nested_eval = Evaluator(ast=expr_tree, activation=self.activation)
        nested_eval = self.sub_evaluator(ast=expr_tree)

        def sub_expr(v: celpy.celtypes.Value) -> Any:
            # Bind the macro variable to each candidate value, then evaluate.
            return nested_eval.set_activation({identifier: v}).evaluate()

        return sub_expr

    def build_ss_macro_eval(self, child: lark.Tree) -> Callable[[celpy.celtypes.Value], Any]:
        """
        Builds macro function for short-circuit logical evaluation ignoring exception values.

        For example

        ``[1, 2, 'hello'].exists(n, n >= 2)``

        Builds the function = ``lambda n: n >= 2``.

        The function will swallow exceptions, enabling short-circuit ``||`` and ``&&``.
        A swallowed :py:class:`CELEvalError` is returned as a value so the caller's
        logical operator can decide whether it matters.
        """
        args = cast(lark.Tree, child.children[2])
        var_tree, expr_tree = cast(Tuple[lark.Tree, lark.Tree], args.children)
        identifier = FindIdent.in_tree(var_tree)
        if identifier is None:  # pragma: no cover
            # This seems almost impossible: the grammar requires an identifier here.
            raise CELSyntaxError(
                f"{child.data} {child.children}: bad macro node",
                line=child.meta.line,
                column=child.meta.column,
            )
        # nested_eval = Evaluator(ast=expr_tree, activation=self.activation)
        nested_eval = self.sub_evaluator(ast=expr_tree)

        def sub_expr(v: celpy.celtypes.Value) -> Any:
            try:
                return nested_eval.set_activation({identifier: v}).evaluate()
            except CELEvalError as ex:
                # Return, not raise: the logical operator decides its relevance.
                return ex

        return sub_expr

    def build_reduce_macro_eval(
        self, child: lark.Tree
    ) -> Tuple[Callable[[Result, Result], Result], lark.Tree]:
        """
        Builds macro function and initial expression for reduce().

        For example

        ``[0, 1, 2].reduce(r, i, 0, r + 2*i+1)``

        Builds the function = ``lambda r, i: r + 2*i+1`` and initial value = 0.

        The `child` is a `member_dot_arg` construct:

        - [0] is the expression to the left of the '.'

        - [1] is the function, `reduce`, to the right of the `.`

        - [2] is the arguments in ()'s.
          Within this, there are four children: two variables and two expressions.
        """
        args = cast(lark.Tree, child.children[2])
        reduce_var_tree, iter_var_tree, init_expr_tree, expr_tree = (
            cast(Tuple[lark.Tree, lark.Tree, lark.Tree, lark.Tree], args.children)
        )
        reduce_ident = FindIdent.in_tree(reduce_var_tree)
        iter_ident = FindIdent.in_tree(iter_var_tree)
        if reduce_ident is None or iter_ident is None:  # pragma: no cover
            # This seems almost impossible: the grammar requires identifiers here.
            raise CELSyntaxError(
                f"{child.data} {child.children}: bad macro node",
                line=child.meta.line,
                column=child.meta.column,
            )
        # nested_eval = Evaluator(ast=expr_tree, activation=self.activation)
        nested_eval = self.sub_evaluator(ast=expr_tree)

        def sub_expr(r: Result, i: Result) -> Result:
            # Bind both the accumulator and the iteration variable.
            return nested_eval.set_activation(
                {reduce_ident: r, iter_ident: i}).evaluate()

        return sub_expr, init_expr_tree

    @trace
    def member(self, tree: lark.Tree) -> Result:
        """
        member : member_dot | member_dot_arg | member_item | member_object | primary

        member_dot : member "." IDENT
        member_dot_arg : member "." IDENT "(" [exprlist] ")"
        member_item : member "[" expr "]"
        member_object : member "{" [fieldinits] "}"

        https://github.com/google/cel-spec/blob/master/doc/langdef.md#field-selection

        Pure delegation: the single child is one of the member variants above.
        """
        values = self.visit_children(tree)
        return values[0]
IDENT "(" [exprlist] ")" + member_item : member "[" expr "]" + member_object : member "{" [fieldinits] "}" + + https://github.com/google/cel-spec/blob/master/doc/langdef.md#name-resolution + + - ``primary``: Variables and Functions: some simple names refer to variables in the + execution context, standard functions, or other name bindings provided by the CEL + application. + + - ``member_dot``: Field selection: appending a period and identifier to an expression + could indicate that we're accessing a field within a protocol buffer or map. + See below for **Field Selection**. + + - ``member_dot``: Protocol buffer package names: a simple or qualified name could + represent an absolute or relative name in the protocol buffer package namespace. + Package names must be followed by a message type, enum type, or enum constant. + + - ``member_dot``: Protocol buffer message types, enum types, and enum constants: + following an optional protocol buffer package name, a simple or qualified name + could refer to a message type, and enum type, or an enum constant in the package's + namespace. + + Field Selection. There are four cases. + + https://github.com/google/cel-spec/blob/master/doc/langdef.md#field-selection + + - If e evaluates to a message + and f is not declared in this message, the runtime error no_such_field is raised. + + - If e evaluates to a message + and f is declared, but the field is not set, + the default value of the field's type will be produced. + + - If e evaluates to a map, then e.f is equivalent to e['f']. + + - In all other cases, e.f evaluates to an error. + + TODO: implement member "." IDENT for messages. 
+ """ + member_tree, property_name_token = cast(Tuple[lark.Tree, lark.Token], tree.children) + member = self.visit(member_tree) + property_name = property_name_token.value + result: Result + if isinstance(member, CELEvalError): + result = member + elif isinstance(member, NameContainer): + # Navigation through names provided as external run-time bindings. + # The dict is the value of a Referent that was part of a namespace path. + if property_name in member: + result = member[property_name].value + else: + err = f"No {property_name!r} in bindings {sorted(member.keys())}" + result = CELEvalError(err, KeyError, None, tree=tree) + # TODO: Not sure this is needed... + elif isinstance(member, celpy.celtypes.MessageType): + self.logger.info("member_dot(%r, %r)", member, property_name) + result = member.get(property_name) + # TODO: Future Expansion, handle Protobuf message package... + # elif isinstance(member, celpy.celtypes.PackageType): + # if property_name in member: + # result = member[property_name] + # else: + # err = f"no such message {property_name!r} in package {member}" + # result = CELEvalError(err, KeyError, None, tree=tree) + elif isinstance(member, celpy.celtypes.MapType): + # Syntactic sugar: a.b is a["b"] when a is a mapping. + try: + result = member[property_name] + except KeyError: + err = f"no such member in mapping: {property_name!r}" + result = CELEvalError(err, KeyError, None, tree=tree) + else: + err = f"{member!r} with type: '{type(member)}' does not support field selection" + result = CELEvalError(err, TypeError, None, tree=tree) + return result + + @trace + def member_dot_arg(self, tree: lark.Tree) -> Result: + """ + member : member_dot | member_dot_arg | member_item | member_object | primary + + member_dot : member "." IDENT + member_dot_arg : member "." 
    @trace
    def member_dot_arg(self, tree: lark.Tree) -> Result:
        """
        member : member_dot | member_dot_arg | member_item | member_object | primary

        member_dot : member "." IDENT
        member_dot_arg : member "." IDENT "(" [exprlist] ")"
        member_item : member "[" expr "]"
        member_object : member "{" [fieldinits] "}"

        https://github.com/google/cel-spec/blob/master/doc/langdef.md#field-selection

        Method or macro? We distinguish between these three similar cases.

        - Macros: https://github.com/google/cel-spec/blob/master/doc/langdef.md#macros

        - member "." IDENT "(" [exprlist] ")" -- used for string operations

        - member "." IDENT "(" ")" -- used for a several timestamp operations.

        The macro names (``map``, ``filter``, ``all``, ``exists``,
        ``exists_one``, ``reduce``, ``min``) are handled inline below; any
        other name is an ordinary method call delegated to
        :py:meth:`method_eval`.
        """
        sub_expr: CELFunction
        result: Result
        reduction: Result
        # Shape of the logical operators used to fold all()/exists() results.
        CELBoolFunction = Callable[[celpy.celtypes.BoolType, Result], celpy.celtypes.BoolType]

        member_tree, method_name_token = cast(Tuple[lark.Tree, lark.Token], tree.children[:2])

        if method_name_token.value == "map":
            # Apply the macro expression to every member; keep all results.
            member_list = cast(celpy.celtypes.ListType, self.visit(member_tree))
            sub_expr = self.build_macro_eval(tree)
            mapping = cast(Iterable[celpy.celtypes.Value], map(sub_expr, member_list))
            result = celpy.celtypes.ListType(mapping)
            return result

        elif method_name_token.value == "filter":
            # Keep only members for which the macro expression is truthy.
            member_list = cast(celpy.celtypes.ListType, self.visit(member_tree))
            sub_expr = self.build_macro_eval(tree)
            result = celpy.celtypes.ListType(filter(sub_expr, member_list))
            return result

        elif method_name_token.value == "all":
            # Logical-and fold.  The short-circuit sub-expression returns (not
            # raises) CELEvalError values; logical_and decides their relevance.
            member_list = cast(celpy.celtypes.ListType, self.visit(member_tree))
            and_oper = cast(
                CELBoolFunction,
                eval_error("no such overload", TypeError)(
                    celpy.celtypes.logical_and)
            )
            sub_expr = self.build_ss_macro_eval(tree)
            reduction = reduce(and_oper, map(sub_expr, member_list), celpy.celtypes.BoolType(True))
            return reduction

        elif method_name_token.value == "exists":
            # Logical-or fold, mirror image of all().
            member_list = cast(celpy.celtypes.ListType, self.visit(member_tree))
            or_oper = cast(
                CELBoolFunction,
                eval_error("no such overload", TypeError)(
                    celpy.celtypes.logical_or)
            )
            sub_expr = self.build_ss_macro_eval(tree)
            reduction = reduce(or_oper, map(sub_expr, member_list), celpy.celtypes.BoolType(False))
            return reduction

        elif method_name_token.value == "exists_one":
            # Is there exactly 1?
            member_list = cast(celpy.celtypes.ListType, self.visit(member_tree))
            sub_expr = self.build_macro_eval(tree)
            count = sum(1 for value in member_list if bool(sub_expr(value)))
            return celpy.celtypes.BoolType(count == 1)

        elif method_name_token.value == "reduce":
            # Apply a function to reduce the list to a single value.
            # The `tree` is a `member_dot_arg` construct with (member, method_name, args)
            # The args have two variables and two expressions.
            member_list = cast(celpy.celtypes.ListType, self.visit(member_tree))
            reduce_expr, init_expr_tree = self.build_reduce_macro_eval(tree)
            initial_value = self.visit(init_expr_tree)
            reduction = reduce(reduce_expr, member_list, initial_value)
            return reduction

        elif method_name_token.value == "min":
            # Special case of "reduce()"
            # with <member>.min() -> <member>.reduce(r, i, int_max, r < i ? r : i)
            member_list = cast(celpy.celtypes.ListType, self.visit(member_tree))
            try:
                # Note. The Result type includes None, which will raise an exception.
                reduction = min(member_list)  # type: ignore [type-var]
            except ValueError as ex:
                err = "Attempt to reduce an empty sequence or a sequence with a None value"
                reduction = CELEvalError(err, ex.__class__, ex.args, tree=tree)
            return reduction

        else:
            # Not a macro: a method evaluation.
            # Evaluate member, method IDENT and (if present) exprlist and apply.
            if len(tree.children) == 2:
                # No argument list: member "." IDENT "(" ")"
                member, ident = cast(
                    Tuple[Result, lark.Token],
                    self.visit_children(tree)
                )
                result = self.method_eval(member, ident)
            else:
                # assert len(tree.children) == 3
                member, ident, expr_iter = cast(
                    Tuple[Result, lark.Token, Iterable[Result]],
                    self.visit_children(tree)
                )
                result = self.method_eval(member, ident, expr_iter)
            return result

    @trace
    def member_index(self, tree: lark.Tree) -> Result:
        """
        member : member_dot | member_dot_arg | member_item | member_object | primary

        member_dot : member "." IDENT
        member_dot_arg : member "." IDENT "(" [exprlist] ")"
        member_item : member "[" expr "]"
        member_object : member "{" [fieldinits] "}"

        https://github.com/google/cel-spec/blob/master/doc/langdef.md#field-selection

        Locating an item in a Mapping or List.  Delegates to the ``_[_]``
        function binding; lookup failures become CELEvalError result values.
        """
        func = self.functions["_[_]"]
        values = self.visit_children(tree)
        member, index = values
        try:
            return func(member, index)
        except TypeError as ex:
            # Neither a mapping nor a sequence supports this index type.
            self.logger.debug("%s(%s, %s) --> %s", func.__name__, member, index, ex)
            err = (
                f"found no matching overload for _[_] "
                f"applied to '({type(member)}, {type(index)})'"
            )
            value = CELEvalError(err, ex.__class__, ex.args, tree=tree)
            value.__cause__ = ex
            return value
        except KeyError as ex:
            # Mapping lookup with a missing key.
            self.logger.debug("%s(%s, %s) --> %s", func.__name__, member, index, ex)
            value = CELEvalError("no such key", ex.__class__, ex.args, tree=tree)
            value.__cause__ = ex
            return value
        except IndexError as ex:
            # Sequence lookup with an out-of-range index.
            self.logger.debug("%s(%s, %s) --> %s", func.__name__, member, index, ex)
            value = CELEvalError("invalid_argument", ex.__class__, ex.args, tree=tree)
            value.__cause__ = ex
            return value
    @trace
    def member_object(self, tree: lark.Tree) -> Result:
        """
        member : member_dot | member_dot_arg | member_item | member_object | primary

        member_dot : member "." IDENT
        member_dot_arg : member "." IDENT "(" [exprlist] ")"
        member_item : member "[" expr "]"
        member_object : member "{" [fieldinits] "}"

        https://github.com/google/cel-spec/blob/master/doc/langdef.md#field-selection

        An object constructor requires a protobuf type, not an object as the "member".
        """
        values = self.visit_children(tree)

        if len(values) == 1:
            # primary | member "{" "}"
            if cast(lark.Tree, tree.children[0]).data == "primary":
                value = values[0]
            else:
                # Build a default protobuf message.
                protobuf_class = cast(
                    celpy.celtypes.FunctionType,
                    values[0]
                )
                self.logger.debug("Creating %s()", protobuf_class)
                try:
                    value = protobuf_class(None)
                except (TypeError, ValueError) as ex:  # pragma: no cover
                    value = CELEvalError(ex.args[0], ex.__class__, ex.args, tree=tree)
                self.logger.debug("Created %s", value)
            return value

        elif len(values) == 2:
            # protobuf feature: member "{" fieldinits "}"
            member, fieldinits = values
            if isinstance(member, CELEvalError):
                return member
            # Apply fieldinits as the constructor for an instance of the referenced type.
            protobuf_class = cast(
                celpy.celtypes.FunctionType,
                member
            )
            # NOTE: protobuf MessageType conversions are the responsibility of the target type.
            # We can't -- easily -- generalize this.
            self.logger.info("Creating %s(%r)", protobuf_class, fieldinits)
            try:
                value = protobuf_class(cast(celpy.celtypes.Value, fieldinits))
            except (TypeError, ValueError) as ex:  # pragma: no cover
                value = CELEvalError(ex.args[0], ex.__class__, ex.args, tree=tree)
            self.logger.info("Created %r", value)
            return value

        else:
            raise CELSyntaxError(
                f"{tree.data} {tree.children}: bad member_object node",
                line=tree.meta.line,
                column=tree.meta.column,
            )

    @trace
    def primary(self, tree: lark.Tree) -> Result:
        """
        primary : dot_ident_arg | dot_ident | ident_arg | ident
                | paren_expr | list_lit | map_lit | literal

        dot_ident_arg : "." IDENT "(" [exprlist] ")"
        dot_ident : "." IDENT
        ident_arg : IDENT "(" [exprlist] ")"
        ident : IDENT
        paren_expr : "(" expr ")"
        list_lit : "[" [exprlist] "]"
        map_lit : "{" [mapinits] "}"

        TODO: Refactor into separate methods to skip this complex elif chain.
        top-level :py:meth:`primary` is similar to :py:meth:`method`.
        Each of the individual rules then works with a tree instead of a child of the
        primary tree.

        This includes function-like macros: has() and dyn().
        These are special cases and cannot be overridden.
        """
        result: Result
        name_token: lark.Token
        if len(tree.children) != 1:
            raise CELSyntaxError(
                f"{tree.data} {tree.children}: bad primary node",
                line=tree.meta.line,
                column=tree.meta.column,
            )

        child = cast(lark.Tree, tree.children[0])
        if child.data == "literal":
            # A literal value
            values = self.visit_children(tree)
            return values[0]

        elif child.data == "paren_expr":
            # A "(" expr ")"
            values = self.visit_children(child)
            return values[0]

        elif child.data == "list_lit":
            if len(child.children) == 0:
                # Empty list
                # TODO: Refactor into type_eval()
                result = celpy.celtypes.ListType()
            else:
                # exprlist to be packaged as List.
                values = self.visit_children(child)
                result = values[0]
            return result

        elif child.data == "map_lit":
            if len(child.children) == 0:
                # Empty mapping
                # TODO: Refactor into type_eval()
                result = celpy.celtypes.MapType()
            else:
                # mapinits (a sequence of key-value tuples) to be packaged as a dict.
                # OR. An CELEvalError in case of ValueError caused by duplicate keys.
                # OR. An CELEvalError in case of TypeError cause by invalid key types.
                # TODO: Refactor into type_eval()
                try:
                    values = self.visit_children(child)
                    result = values[0]
                except ValueError as ex:
                    result = CELEvalError(ex.args[0], ex.__class__, ex.args, tree=tree)
                except TypeError as ex:
                    result = CELEvalError(ex.args[0], ex.__class__, ex.args, tree=tree)
            return result

        elif child.data in ("dot_ident", "dot_ident_arg"):
            # "." IDENT ["(" [exprlist] ")"]
            # Leading "." means the name is resolved in the root scope **only**.
            # No searching through alternative packages.
            # len(child) == 1 -- "." IDENT
            # len(child) == 2 -- "." IDENT "(" exprlist ")" -- TODO: Implement dot_ident_arg.
            values = self.visit_children(child)
            name_token = cast(lark.Token, values[0])
            # Should not be a Function, should only be a Result
            # TODO: implement dot_ident_arg uses function_eval().
            try:
                result = cast(Result, self.ident_value(name_token.value, root_scope=True))
            except KeyError as ex:
                result = CELEvalError(ex.args[0], ex.__class__, ex.args, tree=tree)
            return result

        elif child.data == "ident_arg":
            # IDENT ["(" [exprlist] ")"]
            # Can be a proper function or one of the function-like macros: "has()", "dyn()".
            exprlist: lark.Tree
            if len(child.children) == 1:
                # Zero-argument call: synthesize an empty exprlist.
                name_token = cast(lark.Token, child.children[0])
                exprlist = lark.Tree(data="exprlist", children=[])
            elif len(child.children) == 2:
                name_token, exprlist = cast(Tuple[lark.Token, lark.Tree], child.children)
            else:
                raise CELSyntaxError(  # pragma: no cover
                    f"{tree.data} {tree.children}: bad primary node",
                    line=tree.meta.line,
                    column=tree.meta.column,
                )

            if name_token.value == "has":
                # has() macro. True if the child expression is a member expression that evaluates.
                # False if the child expression is a member expression that cannot be evaluated.
                return self.macro_has_eval(exprlist)
            elif name_token.value == "dyn":
                # dyn() macro does nothing; it's for run-time type-checking.
                dyn_values = self.visit_children(exprlist)
                return dyn_values[0]
            else:
                # Ordinary function() evaluation.
                values = self.visit_children(exprlist)
                return self.function_eval(name_token, cast(Iterable[celpy.celtypes.Value], values))

        elif child.data == "ident":
            # IDENT -- simple identifier from the current activation.
            name_token = cast(lark.Token, child.children[0])
            try:
                # Should not be a Function.
                # Generally Result object (i.e., a variable)
                # Could be an Annotation object (i.e., a type) for protobuf messages
                result = cast(Result, self.ident_value(name_token.value))
            except KeyError as ex:
                err = (
                    f"undeclared reference to '{name_token}' "
                    f"(in activation '{self.activation}')"
                )
                result = CELEvalError(err, ex.__class__, ex.args, tree=tree)
            return result

        else:
            raise CELSyntaxError(
                f"{tree.data} {tree.children}: bad primary node",
                line=tree.meta.line,
                column=tree.meta.column,
            )

    @trace
    def literal(self, tree: lark.Tree) -> Result:
        """
        Create a literal from the token at the top of the parse tree.

        Maps each lexer token type to the corresponding CEL type constructor.
        A ValueError from a constructor becomes a CELEvalError result value.

        .. todo:: Use type provider conversions from string to CEL type objects.
        """
        if len(tree.children) != 1:
            raise CELSyntaxError(
                f"{tree.data} {tree.children}: bad literal node",
                line=tree.meta.line,
                column=tree.meta.column,
            )
        value_token = cast(lark.Token, tree.children[0])
        try:
            result: Result
            if value_token.type == "FLOAT_LIT":
                result = celpy.celtypes.DoubleType(value_token.value)
            elif value_token.type == "INT_LIT":
                result = celpy.celtypes.IntType(value_token.value)
            elif value_token.type == "UINT_LIT":
                # Unsigned literals carry a mandatory trailing "u"/"U" suffix.
                if not value_token.value[-1].lower() == 'u':
                    raise CELSyntaxError(
                        f"invalid unsigned int literal {value_token!r}",
                        line=tree.meta.line,
                        column=tree.meta.column,
                    )
                result = celpy.celtypes.UintType(value_token.value[:-1])
            elif value_token.type in ("MLSTRING_LIT", "STRING_LIT"):
                result = celstr(value_token)
            elif value_token.type == "BYTES_LIT":
                result = celbytes(value_token)
            elif value_token.type == "BOOL_LIT":
                result = (
                    celpy.celtypes.BoolType(value_token.value.lower() == "true")
                )
            elif value_token.type == "NULL_LIT":
                result = None
            else:
                raise CELUnsupportedError(
                    f"{tree.data} {tree.children}: type not implemented",
                    line=value_token.line,
                    column=value_token.column,
                )
        except ValueError as ex:
            result = CELEvalError(ex.args[0], ex.__class__, ex.args, tree=tree)

        return result
ex.args, tree=tree) + + return result + + @trace + def exprlist(self, tree: lark.Tree) -> Result: + """ + exprlist : expr ("," expr)* + """ + values = self.visit_children(tree) + errors = (v for v in values if isinstance(v, CELEvalError)) + try: + return next(errors) + except StopIteration: + pass + # There are no CELEvalError values in the result, so we can narrow the domain. + result = celpy.celtypes.ListType(cast(List[celpy.celtypes.Value], values)) + return result + + @trace + def fieldinits(self, tree: lark.Tree) -> Result: + """ + fieldinits : IDENT ":" expr ("," IDENT ":" expr)* + + The even items, children[0::2] are identifiers, nothing to evaluate. + The odd items, childnre[1::2] are expressions. + + This creates a mapping, used by the :meth:`member_object` method to create + and populate a protobuf object. Duplicate names are an error. + """ + fields: Dict[str, Any] = {} + pairs = cast( + Iterable[Tuple[lark.Token, lark.Tree]], + zip(tree.children[0::2], tree.children[1::2]) + ) + for ident_node, expr_node in pairs: + ident = ident_node.value + expr = cast(celpy.celtypes.Value, self.visit_children(expr_node)[0]) + if ident in fields: + raise ValueError(f"Duplicate field label {ident!r}") + fields[ident] = expr + return celpy.celtypes.MessageType(**fields) + + @trace + def mapinits(self, tree: lark.Tree) -> Result: + """ + mapinits : expr ":" expr ("," expr ":" expr)* + + Extract the key expr's and value expr's to a list of pairs. + This raises an exception on a duplicate key. + + TODO: Is ``{'a': 1, 'b': 2/0}['a']`` a meaningful result in CEL? + Or is this an error because the entire member is erroneous? + + """ + result = celpy.celtypes.MapType() + + # Not sure if this cast is sensible. Should a CELEvalError propagate up from the + # sub-expressions? See the error check in :py:func:`exprlist`. 
    @trace
    def mapinits(self, tree: lark.Tree) -> Result:
        """
        mapinits : expr ":" expr ("," expr ":" expr)*

        Extract the key expr's and value expr's to a list of pairs.
        This raises an exception on a duplicate key.

        TODO: Is ``{'a': 1, 'b': 2/0}['a']`` a meaningful result in CEL?
        Or is this an error because the entire member is erroneous?

        :raises ValueError: on a duplicate key.
        """
        result = celpy.celtypes.MapType()

        # Not sure if this cast is sensible. Should a CELEvalError propagate up from the
        # sub-expressions? See the error check in :py:func:`exprlist`.
        keys_values = cast(List[celpy.celtypes.Value], self.visit_children(tree))
        # Even positions are keys; odd positions are the matching values.
        pairs = zip(keys_values[0::2], keys_values[1::2])
        for key, value in pairs:
            if key in result:
                raise ValueError(f"Duplicate key {key!r}")
            result[key] = value

        return result


# Matches a single escape sequence (simple \n-style, octal \nnn, hex \xhh,
# \uhhhh, or \Uhhhhhhhh) or, failing that, any single character.
CEL_ESCAPES_PAT = re.compile(
    "\\\\[abfnrtv\"'\\\\]|\\\\\\d{3}|\\\\x[0-9a-fA-F]{2}|\\\\u[0-9a-fA-F]{4}|\\\\U[0-9a-fA-F]{8}|."
)


# Mapping from the simple backslash escapes to the characters they denote.
CEL_ESCAPES = {
    '\\a': '\a', '\\b': '\b', '\\f': '\f', '\\n': '\n',
    '\\r': '\r', '\\t': '\t', '\\v': '\v',
    '\\"': '"', "\\'": "'", '\\\\': '\\'
}


def celstr(token: lark.Token) -> celpy.celtypes.StringType:
    """
    Evaluate a CEL string literal, expanding escapes to create a Python string.

    It may be that built-in ``eval()`` might work for some of this, but
    the octal escapes aren't really viable.

    :param token: CEL token value
    :return: str

    .. todo:: This can be refactored into celpy.celtypes.StringType.
    """
    def expand(match_iter: Iterable[Match[str]]) -> Iterator[str]:
        # Translate each matched escape (or plain character) to its value.
        for match in (m.group() for m in match_iter):
            if len(match) == 1:
                # A plain character, not an escape.
                expanded = match
            elif match[:2] == r'\x':
                # Hex escape: \xhh
                expanded = chr(int(match[2:], 16))
            elif match[:2] in {r'\u', r'\U'}:
                # Unicode escape: \uhhhh or \Uhhhhhhhh
                expanded = chr(int(match[2:], 16))
            elif match[:1] == '\\' and len(match) == 4:
                # Octal escape: \nnn
                expanded = chr(int(match[1:], 8))
            else:
                # Simple escape like \n; unrecognized escapes pass through.
                expanded = CEL_ESCAPES.get(match, match)
            yield expanded

    text = token.value
    if text[:1] in ("R", "r"):
        # Raw; ignore ``\`` escapes
        if text[1:4] == '"""' or text[1:4] == "'''":
            # Long
            expanded = text[4:-3]
        else:
            # Short
            expanded = text[2:-1]
    else:
        # Cooked; expand ``\`` escapes
        if text[0:3] == '"""' or text[0:3] == "'''":
            # Long
            match_iter = CEL_ESCAPES_PAT.finditer(text[3:-3])
        else:
            # Short
            match_iter = CEL_ESCAPES_PAT.finditer(text[1:-1])
        expanded = ''.join(expand(match_iter))
    return celpy.celtypes.StringType(expanded)


def celbytes(token: lark.Token) -> celpy.celtypes.BytesType:
    """
    Evaluate a CEL bytes literal, expanding escapes to create a Python bytes object.

    :param token: CEL token value
    :return: bytes
    :raises ValueError: if the literal lacks the required ``b``/``br`` prefix.

    .. todo:: This can be refactored into celpy.celtypes.BytesType.
    """
    def expand(match_iter: Iterable[Match[str]]) -> Iterator[int]:
        # Translate each matched escape (or plain character) to byte value(s).
        for match in (m.group() for m in match_iter):
            if len(match) == 1:
                # A plain character may expand to several UTF-8 bytes.
                yield from match.encode('utf-8')
            elif match[:2] == r'\x':
                # Hex escape: \xhh -> one byte.
                yield int(match[2:], 16)
            elif match[:2] == r'\u':
                # Unicode escape treated as a single code-point value.
                yield int(match[2:], 16)
            elif match[:1] == '\\' and len(match) == 4:
                # Octal escape: \nnn -> one byte.
                yield int(match[1:], 8)
            else:
                # Simple escape like \n; unrecognized escapes pass through.
                yield ord(CEL_ESCAPES.get(match, match))

    text = token.value
    if text[:2].lower() == "br":
        # Raw; ignore ``\`` escapes
        if text[2:5] == '"""' or text[2:5] == "'''":
            # Long
            expanded = celpy.celtypes.BytesType(ord(c) for c in text[5:-3])
        else:
            # Short
            expanded = celpy.celtypes.BytesType(ord(c) for c in text[3:-1])
    elif text[:1].lower() == "b":
        # Cooked; expand ``\`` escapes
        if text[1:4] == '"""' or text[1:4] == "'''":
            # Long
            match_iter = CEL_ESCAPES_PAT.finditer(text[4:-3])
        else:
            # Short
            match_iter = CEL_ESCAPES_PAT.finditer(text[2:-1])
        expanded = celpy.celtypes.BytesType(expand(match_iter))
    else:
        raise ValueError(f"Invalid bytes literal {token.value!r}")
    return expanded