aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/celpy
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/celpy
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/celpy')
-rw-r--r--.venv/lib/python3.12/site-packages/celpy/__init__.py293
-rw-r--r--.venv/lib/python3.12/site-packages/celpy/__main__.py465
-rw-r--r--.venv/lib/python3.12/site-packages/celpy/adapter.py137
-rw-r--r--.venv/lib/python3.12/site-packages/celpy/c7nlib.py1582
-rw-r--r--.venv/lib/python3.12/site-packages/celpy/cel.lark179
-rw-r--r--.venv/lib/python3.12/site-packages/celpy/celparser.py402
-rw-r--r--.venv/lib/python3.12/site-packages/celpy/celtypes.py1495
-rw-r--r--.venv/lib/python3.12/site-packages/celpy/evaluation.py2446
-rw-r--r--.venv/lib/python3.12/site-packages/celpy/py.typed0
9 files changed, 6999 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/celpy/__init__.py b/.venv/lib/python3.12/site-packages/celpy/__init__.py
new file mode 100644
index 00000000..0306530f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/celpy/__init__.py
@@ -0,0 +1,293 @@
+# SPDX-Copyright: Copyright (c) Capital One Services, LLC
+# SPDX-License-Identifier: Apache-2.0
+# Copyright 2020 Capital One Services, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and limitations under the License.
+
+"""
+Pure Python implementation of CEL.
+
+.. todo:: Consolidate __init__ and parser into one module?
+
+Visible interface to CEL. This exposes the :py:class:`Environment`,
+the :py:class:`Evaluator` run-time, and the :py:mod:`celtypes` module
+with Python types wrapped to be CEL compatible.
+
+Example
+=======
+
+Here's an example with some details::
+
+ >>> import celpy
+
+ # A list of type names and class bindings used to create an environment.
+ >>> types = []
+ >>> env = celpy.Environment(types)
+
+ # Parse the code to create the CEL AST.
+ >>> ast = env.compile("355. / 113.")
+
+ # Use the AST and any overriding functions to create an executable program.
+ >>> functions = {}
+ >>> prgm = env.program(ast, functions)
+
+ # Variable bindings.
+ >>> activation = {}
+
+ # Final evaluation.
+ >>> try:
+ ... result = prgm.evaluate(activation)
+ ... error = None
+ ... except CELEvalError as ex:
+ ... result = None
+ ... error = ex.args[0]
+
+ >>> result # doctest: +ELLIPSIS
+ DoubleType(3.14159...)
+
+Another Example
+===============
+
+See https://github.com/google/cel-go/blob/master/examples/simple_test.go
+
+The model Go we're sticking close to::
+
+ d := cel.Declarations(decls.NewVar("name", decls.String))
+ env, err := cel.NewEnv(d)
+ if err != nil {
+ log.Fatalf("environment creation error: %v\\n", err)
+ }
+ ast, iss := env.Compile(`"Hello world! I'm " + name + "."`)
+ // Check iss for compilation errors.
+ if iss.Err() != nil {
+ log.Fatalln(iss.Err())
+ }
+ prg, err := env.Program(ast)
+ if err != nil {
+ log.Fatalln(err)
+ }
+ out, _, err := prg.Eval(map[string]interface{}{
+ "name": "CEL",
+ })
+ if err != nil {
+ log.Fatalln(err)
+ }
+ fmt.Println(out)
+ // Output:Hello world! I'm CEL.
+
+Here's the Pythonic approach, using concept patterned after the Go implementation::
+
+ >>> from celpy import *
+ >>> decls = {"name": celtypes.StringType}
+ >>> env = Environment(annotations=decls)
+ >>> ast = env.compile('"Hello world! I\\'m " + name + "."')
+ >>> out = env.program(ast).evaluate({"name": "CEL"})
+ >>> print(out)
+ Hello world! I'm CEL.
+
+"""
+import json # noqa: F401
+import logging
+import sys
+from typing import Any, Dict, Optional, Type, cast
+
+import lark
+
+import celpy.celtypes
+from celpy.adapter import (CELJSONDecoder, CELJSONEncoder, # noqa: F401
+ json_to_cel)
+from celpy.celparser import CELParseError, CELParser # noqa: F401
+from celpy.evaluation import (Activation, Annotation, # noqa: F401
+ CELEvalError, CELFunction, Context, Evaluator,
+ Result, base_functions)
+
# A parsed AST: type alias for the tree produced by Environment.compile().
Expression = lark.Tree
+
+
class Runner:
    """Abstract base class for CEL program runners.

    A runner holds a compiled AST together with the environment that
    produced it.  Subclasses evaluate that AST against a specific
    activation (variable bindings), optionally with overriding function
    definitions.

    .. todo:: add type adapter and type provider registries.
    """
    def __init__(
        self,
        environment: 'Environment',
        ast: lark.Tree,
        functions: Optional[Dict[str, CELFunction]] = None
    ) -> None:
        self.environment = environment
        self.ast = ast
        self.functions = functions
        self.logger = logging.getLogger(self.__class__.__name__)

    def new_activation(self, context: Context) -> Activation:
        """
        Build the working activation: the environment's defaults with
        the given context nested inside.
        """
        base = self.environment.activation()
        return base.nested_activation(vars=context)

    def evaluate(self, activation: Context) -> celpy.celtypes.Value:  # pragma: no cover
        raise NotImplementedError
+
+
class InterpretedRunner(Runner):
    """
    Pure AST expression evaluator, backed by the
    :py:class:`evaluation.Evaluator` class.

    Given an AST, this evaluates the AST in the context of a specific
    activation.  The returned value will be a celtypes type.

    Generally, this should raise a :exc:`CELEvalError` for most kinds of
    ordinary problems.  It may raise a :exc:`CELUnsupportedError` for
    future features.

    .. todo:: Refactor the Evaluator constructor from evaluation.
    """
    def evaluate(self, context: Context) -> celpy.celtypes.Value:
        # Build a fresh Evaluator for each evaluation; the activation
        # wraps the per-call context around the environment defaults.
        interpreter = Evaluator(
            ast=self.ast,
            activation=self.new_activation(context),
            functions=self.functions
        )
        return interpreter.evaluate()
+
+
class CompiledRunner(Runner):
    """
    Python compiled expression evaluator, intended to use byte code and
    :py:func:`eval`.

    Given an AST, this evaluates the AST in the context of a specific
    activation.  The plan is to transform the AST into Python source, use
    :py:func:`compile` to create a code object, and :py:func:`eval` to
    evaluate it.  Currently this is a placeholder that delegates to the
    superclass.
    """
    def __init__(
        self,
        environment: 'Environment',
        ast: lark.Tree,
        functions: Optional[Dict[str, CELFunction]] = None
    ) -> None:
        super().__init__(environment, ast, functions)
        # Future work:
        # - transform the AST into Python source,
        # - compile() it,
        # - cache the executable code object.

    def evaluate(self, activation: Context) -> celpy.celtypes.Value:
        # Future: eval() the cached code object with the activation as
        # locals and built-ins as globals.
        return super().evaluate(activation)
+
+
# TODO: Refactor classes into a separate "cel_protobuf" module.
# TODO: Becomes cel_protobuf.Int32Value
class Int32Value(celpy.celtypes.IntType):
    def __new__(
        cls: Type['Int32Value'],
        value: Any = 0,
    ) -> 'Int32Value':
        """TODO: Check range. This seems to matter for protobuf."""
        # Already a CEL int: wrap it directly.
        if isinstance(value, celpy.celtypes.IntType):
            return cast(Int32Value, super().__new__(cls, value))
        # TODO: elif other type conversions...
        # Otherwise, run the value through the range-checked int converter.
        convert = celpy.celtypes.int64(int)
        return cast(Int32Value, super().__new__(cls, convert(value)))
+
+
# The "well-known" types in a google.protobuf package.
# We map these to CEL types instead of defining additional Protobuf Types.
# This approach bypasses some of the range constraints that are part of these types.
# It may also cause values to compare as equal when they were originally distinct types.
googleapis = {
    'google.protobuf.Int32Value': celpy.celtypes.IntType,
    'google.protobuf.UInt32Value': celpy.celtypes.UintType,
    'google.protobuf.Int64Value': celpy.celtypes.IntType,
    'google.protobuf.UInt64Value': celpy.celtypes.UintType,
    'google.protobuf.FloatValue': celpy.celtypes.DoubleType,
    'google.protobuf.DoubleValue': celpy.celtypes.DoubleType,
    'google.protobuf.BoolValue': celpy.celtypes.BoolType,
    'google.protobuf.BytesValue': celpy.celtypes.BytesType,
    'google.protobuf.StringValue': celpy.celtypes.StringType,
    'google.protobuf.ListValue': celpy.celtypes.ListType,
    'google.protobuf.Struct': celpy.celtypes.MessageType,
}
+
+
class Environment:
    """Compiles CEL text to create an Expression object.

    From the Go implementation, there are things to work with the type annotations:

    - type adapters registry make other native types available for CEL.

    - type providers registry make ProtoBuf types available for CEL.

    .. todo:: Add adapter and provider registries to the Environment.
    """
    def __init__(
        self,
        package: Optional[str] = None,
        annotations: Optional[Dict[str, Annotation]] = None,
        runner_class: Optional[Type[Runner]] = None
    ) -> None:
        """
        Create a new environment.

        This also increases the default recursion limit to handle the defined minimums for CEL.

        :param package: An optional package name used to resolve names in an Activation
        :param annotations: Names with type annotations.
            There are two flavors of names provided here.

            - Variable names based on :py:mod:``celtypes``

            - Function names, using ``typing.Callable``.
        :param runner_class: the class of Runner to use, either InterpretedRunner or CompiledRunner
        """
        # Raise -- never lower -- the recursion limit.  The original
        # unconditional setrecursionlimit(2500) could *reduce* a limit the
        # host process had already raised.
        sys.setrecursionlimit(max(2500, sys.getrecursionlimit()))
        self.logger = logging.getLogger(self.__class__.__name__)
        self.package: Optional[str] = package
        # Copy the caller's mapping: the googleapis update below must not
        # mutate a dictionary the caller owns and may reuse.
        self.annotations: Dict[str, Annotation] = dict(annotations) if annotations else {}
        self.logger.info("Type Annotations %r", self.annotations)
        self.runner_class: Type[Runner] = runner_class or InterpretedRunner
        self.cel_parser = CELParser()
        self.runnable: Runner

        # Fold in standard annotations. These (generally) define well-known protobuf types.
        self.annotations.update(googleapis)
        # We'd like to add 'type.googleapis.com/google' directly, but it seems to be an alias
        # for 'google', the path after the '/' in the uri.

    def compile(self, text: str) -> Expression:
        """Compile the CEL source. This can raise syntax error exceptions."""
        return self.cel_parser.parse(text)

    def program(
        self,
        expr: lark.Tree,
        functions: Optional[Dict[str, CELFunction]] = None
    ) -> Runner:
        """Transforms the AST into an executable runner."""
        self.logger.info("Package %r", self.package)
        self.runnable = self.runner_class(self, expr, functions)
        return self.runnable

    def activation(self) -> Activation:
        """Returns a base activation built from the package and annotations."""
        return Activation(package=self.package, annotations=self.annotations)
diff --git a/.venv/lib/python3.12/site-packages/celpy/__main__.py b/.venv/lib/python3.12/site-packages/celpy/__main__.py
new file mode 100644
index 00000000..c9483173
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/celpy/__main__.py
@@ -0,0 +1,465 @@
+# SPDX-Copyright: Copyright (c) Capital One Services, LLC
+# SPDX-License-Identifier: Apache-2.0
+# Copyright 2020 Capital One Services, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and limitations under the License.
+
+"""
+Pure Python implementation of CEL.
+
+This provides a few jq-like, bc-like, and shell expr-like features.
+
+- ``jq`` uses ``.`` to refer the current document. By setting a package
+ name of ``"jq"`` and placing the JSON object in the package, we achieve
+ similar syntax.
+
+- ``bc`` offers complex function definitions and other programming support.
+ CEL can only evaluate a few bc-like expressions.
+
+- This does everything ``expr`` does, but the syntax is slightly different.
+ The output of comparisons -- by default -- is boolean, where ``expr`` is an integer 1 or 0.
+ Use ``-f 'd'`` to see decimal output instead of Boolean text values.
+
+- This does some of what ``test`` does, without a lot of the sophisticated
+ file system data gathering.
+ Use ``-b`` to set the exit status code from a Boolean result.
+
+TODO: This can also have a REPL, as well as process CSV files.
+
+SYNOPSIS
+========
+
+::
+
+ python -m celpy [--arg name:type=value ...] [--null-input] expr
+
+Options:
+
+:--arg:
+ Provides argument names, types and optional values.
+ If the value is not provided, the name is expected to be an environment
+ variable, and the value of the environment variable is converted and used.
+
+:--null-input:
+ Normally, JSON documents are read from stdin in ndjson format. If no JSON documents are
+ provided, the ``--null-input`` option skips trying to read from stdin.
+
+:expr:
+ A CEL expression to evaluate.
+
+JSON documents are read from stdin in NDJSON format (http://jsonlines.org/, http://ndjson.org/).
+For each JSON document, the expression is evaluated with the document in a default
+package. This allows `.name` to pick items from the document.
+
+By default, the output is JSON serialized. This means strings will be JSON-ified and have quotes.
+
+If a ``--format`` option is provided, this is applied to the resulting object; this can be
+used to strip quotes, or limit precision on double objects, or convert numbers to hexadecimal.
+
+Arguments, Types, and Namespaces
+================================
+
+CEL objects rely on the celtypes definitions.
+
+Because of the close association between CEL and protobuf, some well-known protobuf types
+are also supported.
+
+.. todo:: CLI type environment
+
+ Permit name.name:type=value to create namespace bindings.
+
+Further, type providers can be bound to CEL. This means an extended CEL
+may have additional types beyond those defined by the :py:class:`Activation` class.
+
+"""
+
+import argparse
+import ast
+import cmd
+import json
+import logging
+import os
+import re
+import sys
+from typing import Any, Callable, Dict, List, Optional, Tuple, cast
+
+from celpy import Environment, Runner, celtypes
+from celpy.adapter import CELJSONDecoder, CELJSONEncoder
+from celpy.celparser import CELParseError
+from celpy.evaluation import Annotation, CELEvalError, Result
+
# Module-level logger for the CLI.
logger = logging.getLogger("celpy")


# For argument parsing purposes.
# Note the reliance on `ast.literal_eval` for ListType and MapType conversions.
# Other types convert strings directly. These types need some help.
# Maps a type name (as given in ``--arg name:type=value``) to either a
# celtypes class or a conversion callable producing a celtypes.Value.
CLI_ARG_TYPES: Dict[str, Annotation] = {
    "int":
        celtypes.IntType,
    "uint":
        celtypes.UintType,
    "double":
        celtypes.DoubleType,
    "bool":
        celtypes.BoolType,
    "string":
        celtypes.StringType,
    "bytes":
        celtypes.BytesType,
    "list":
        cast(Callable[..., celtypes.Value], lambda arg: celtypes.ListType(ast.literal_eval(arg))),
    "map":
        cast(Callable[..., celtypes.Value], lambda arg: celtypes.MapType(ast.literal_eval(arg))),
    "null_type":
        cast(Callable[..., celtypes.Value], lambda arg: None),
    "single_duration":
        celtypes.DurationType,
    "single_timestamp":
        celtypes.TimestampType,

    "int64_value": celtypes.IntType,
    "uint64_value": celtypes.UintType,
    "double_value": celtypes.DoubleType,
    "bool_value": celtypes.BoolType,
    "string_value": celtypes.StringType,
    "bytes_value": celtypes.BytesType,
    "number_value": celtypes.DoubleType,  # Ambiguous; can sometimes be integer.
    "null_value": cast(Callable[..., celtypes.Value], lambda arg: None),
}
+
+
def arg_type_value(text: str) -> Tuple[str, Annotation, celtypes.Value]:
    """
    Decompose ``-a name:type=value`` argument into a useful triple.

    Also accept ``-a name:type``. This will find ``name`` in the environment and convert to the
    requested type.

    Also accepts ``-a name``. This will find ``name`` in the environment and treat it as a string.

    Currently, names do not reflect package naming. An environment can be a package,
    and the activation can include variables that are also part of the package.
    This is not supported via the CLI.

    Types can be celtypes class names or TYPE_NAME or PROTOBUF_TYPE

    ::

        TYPE_NAME : "int64_value" | "null_value" | "uint64_value" | "double_value"
            | "bool_value" | "string_value" | "bytes_value" | "number_value"

        PROTOBUF_TYPE : "single_int64" | "single_int32" | "single_uint64" | "single_uint32"
            | "single_sint64" | "single_sint32" | "single_fixed64" | "single_fixed32"
            | "single_sfixed32" | "single_sfixed64" | "single_float" | "single_double"
            | "single_bool" | "single_string" | "single_bytes"
            | "single_duration" | "single_timestamp"

    .. todo:: type names can include `.` to support namespacing for protobuf support.

    :param text: Argument value
    :return: Tuple with name, annotation, and resulting object.
    :raises argparse.ArgumentTypeError: for an unparseable argument, an
        unknown type name, or a value invalid for the requested type.
    """
    arg_pattern = re.compile(
        r"^([_a-zA-Z][_a-zA-Z0-9]*)(?::([_a-zA-Z][_a-zA-Z0-9]*))?(?:=(.*))?$"
    )
    match = arg_pattern.match(text)
    if match is None:
        # Fixed: the original message had an unbalanced backtick around 'var:type'.
        raise argparse.ArgumentTypeError(
            f"arg {text} not 'var=string', 'var:type=value', or 'var:type'")
    name, type_name, value_text = match.groups()
    if value_text is None:
        # No "=value" clause: fall back to an environment variable of the same name.
        value_text = os.environ.get(name)
    type_definition: Annotation  # CELType or a conversion function
    value: celtypes.Value  # Specific value.
    if type_name:
        try:
            type_definition = CLI_ARG_TYPES[type_name]
            value = cast(
                celtypes.Value,
                type_definition(value_text)  # type: ignore[arg-type, call-arg]
            )
        except KeyError as ex:
            raise argparse.ArgumentTypeError(
                f"arg {text} type name not in {list(CLI_ARG_TYPES)}") from ex
        except ValueError as ex:
            raise argparse.ArgumentTypeError(
                f"arg {text} value invalid for the supplied type") from ex
    else:
        # No type given: treat as a string (empty if the env var is also absent).
        value = celtypes.StringType(value_text or "")
        type_definition = celtypes.StringType
    return name, type_definition, value
+
+
def get_options(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the celpy command-line arguments into a Namespace.

    Also enforces the rules argparse cannot express directly:
    package vs. document naming, and interactive mode vs. an expression.
    """
    cli = argparse.ArgumentParser(prog="celpy", description="Pure Python CEL")
    cli.add_argument(
        "-v", "--verbose", default=0, action='count')

    # Inputs: variable bindings and stdin handling.
    cli.add_argument(
        "-a", "--arg", action='append', type=arg_type_value,
        help="Variables to set; -a name:type=value, or -a name=value for strings, "
        "or -a name to read an environment variable"
    )
    cli.add_argument(
        "-n", "--null-input", dest='null_input', default=False, action='store_true',
        help="Avoid reading Newline-Delimited JSON documents from stdin"
    )
    cli.add_argument(
        "-s", "--slurp", default=False, action="store_true",
        help="Slurp a single, multiple JSON document from stdin"
    )
    cli.add_argument(
        "-i", "--interactive", default=False, action="store_true",
        help="Interactive REPL"
    )

    # JSON handling: how each input document is named.
    cli.add_argument(
        "--json-package", "-p", metavar="NAME", dest="package", default=None, action="store",
        help="Each JSON input is a CEL package, allowing .name to work"
    )
    cli.add_argument(
        "--json-document", "-d", metavar="NAME", dest="document", default=None, action="store",
        help="Each JSON input is a variable, allowing name.map(x, x*2) to work"
    )

    # Outputs and exit status.
    cli.add_argument(
        "-b", "--boolean", default=False, action='store_true',
        help="If the result is True, the exit status is 0, for False, it's 1, otherwise 2"
    )
    cli.add_argument(
        "-f", "--format", default=None, action='store',
        help=(
            "Use Python formating instead of JSON conversion of results; "
            "Example '.6f' to format a DoubleType result"
        )
    )

    # The expression itself (optional only for interactive mode).
    cli.add_argument("expr", nargs='?')

    options = cli.parse_args(argv)

    # --json-package and --json-document are alternatives, not companions.
    if options.package and options.document:
        cli.error("Either use --json-package or --json-document, not both")
    if not (options.package or options.document):
        options.package = "jq"

    # Exactly one of REPL mode or an expression is required.
    if options.interactive and options.expr:
        cli.error("Interactive mode and an expression provided")
    if not (options.interactive or options.expr):
        cli.error("No expression provided")
    return options
+
+
class CEL_REPL(cmd.Cmd):
    # Interactive read-eval-print loop for CEL expressions.
    # State set via ``set name expr`` persists across evaluations.
    prompt = "CEL> "
    intro = "Enter an expression to have it evaluated."
    logger = logging.getLogger("REPL")

    def cel_eval(self, text: str) -> celtypes.Value:
        # Compile and run one expression against the accumulated state.
        try:
            program = self.env.program(self.env.compile(text))
            return program.evaluate(self.state)
        except CELParseError as ex:
            print(self.env.cel_parser.error_text(ex.args[0], ex.line, ex.column), file=sys.stderr)
            raise

    def preloop(self) -> None:
        # Fresh environment and empty variable state for this session.
        self.env = Environment()
        self.state: Dict[str, celtypes.Value] = {}

    def do_set(self, args: str) -> bool:
        """Set variable expression

        Evaluates the expression, saves the result as the given variable in the current activation.
        """
        name, _, expression = args.partition(' ')
        try:
            result: celtypes.Value = self.cel_eval(expression)
            print(result)
            self.state[name] = result
        except Exception as ex:
            self.logger.error(ex)
        return False

    def do_show(self, args: str) -> bool:
        """Shows all variables in the current activation."""
        print(self.state)
        return False

    def do_quit(self, args: str) -> bool:
        """Quits from the REPL."""
        return True

    do_exit = do_quit
    do_bye = do_quit

    def default(self, args: str) -> None:
        """Evaluate an expression."""
        try:
            print(self.cel_eval(args))
        except Exception as ex:
            self.logger.error(ex)
+
+
def process_json_doc(
    display: Callable[[Result], None],
    prgm: Runner,
    activation: Dict[str, Any],
    variable: str,
    document: str,
    boolean_to_status: bool = False) -> int:
    """
    Evaluate the program against one JSON document: a single line of an
    NDJSON stream, or the whole input in slurp mode.  The document is bound
    to ``variable`` in the activation; that name can be the package name
    (so ``.name`` works) or an ordinary variable (so ``jq`` and
    ``jq.map(x, x*2)`` work).

    Returns status code 0 for success, 3 for a JSON parsing failure.
    With ``boolean_to_status``, a boolean result maps to status 0/1.
    """
    try:
        activation[variable] = json.loads(document, cls=CELJSONDecoder)
        outcome = prgm.evaluate(activation)
        display(outcome)
        if boolean_to_status and isinstance(outcome, (celtypes.BoolType, bool)):
            return int(not outcome)
        return 0
    except CELEvalError as err:
        # ``jq`` KeyError problems result in ``None``.
        # Other errors should, perhaps, be more visible.
        logger.debug("Encountered %s on document %r", err, document)
        display(None)
        return 0
    except json.decoder.JSONDecodeError as err:
        logger.error("%s on document %r", err.args[0], document)
        return 3
+
+
def main(argv: Optional[List[str]] = None) -> int:
    """
    Given options from the command-line, execute the CEL expression.

    With --null-input option, only --arg and expr matter.

    Without --null-input, JSON documents are read from STDIN, following ndjson format.

    With the --slurp option, it reads one JSON from stdin, spread over multiple lines.

    If "--json-package" is used, each JSON document becomes a package, and
    top-level dictionary keys become valid ``.name`` expressions.
    Otherwise, "--json-document" is the default, and each JSON document
    is assigned to a variable. The default name is "jq" to allow expressions
    that are similar to ``jq`` but with a "jq" prefix.

    :param argv: Command-line arguments; defaults to ``sys.argv[1:]``.
    :return: Exit status: 0 success, 1 parse error, 2 evaluation error
        (or non-boolean result with ``-b``), 3 JSON decode error.
    """
    options = get_options(argv)
    # -v sets the root logger to INFO; -vv (or more) sets DEBUG.
    if options.verbose == 1:
        logging.getLogger().setLevel(logging.INFO)
    elif options.verbose > 1:
        logging.getLogger().setLevel(logging.DEBUG)
    logger.debug(options)

    if options.interactive:
        # REPL mode ignores expr and stdin entirely.
        repl = CEL_REPL()
        repl.cmdloop()
        return 0

    # Choose the output style: Python format() text, or JSON serialization.
    if options.format:
        def output_display(result: Result) -> None:
            print('{0:{format}}'.format(result, format=options.format))
    else:
        def output_display(result: Result) -> None:
            print(json.dumps(result, cls=CELJSONEncoder))

    logger.info("Expr: %r", options.expr)

    if options.arg:
        logger.info("Args: %r", options.arg)

    # Build type annotations from the -a name:type declarations.
    annotations: Optional[Dict[str, Annotation]]
    if options.arg:
        annotations = {
            name: type for name, type, value in options.arg
        }
    else:
        annotations = None

    # If we're creating a named JSON document, we don't provide a default package.
    # If we're using a JSON document to populate a package, we provide the given name.
    env = Environment(
        package=None if options.null_input else options.package,
        annotations=annotations,
    )
    try:
        expr = env.compile(options.expr)
        prgm = env.program(expr)
    except CELParseError as ex:
        print(env.cel_parser.error_text(ex.args[0], ex.line, ex.column), file=sys.stderr)
        return 1

    # Initial activation: any -a name:type=value bindings.
    if options.arg:
        activation = {
            name: value for name, type, value in options.arg
        }
    else:
        activation = {}

    if options.null_input:
        # Don't read stdin, evaluate with only the activation context.
        try:
            result = prgm.evaluate(activation)
            if options.boolean:
                if isinstance(result, (celtypes.BoolType, bool)):
                    summary = 0 if result else 1
                else:
                    logger.warning("Expected celtypes.BoolType, got %s = %r", type(result), result)
                    summary = 2
            else:
                output_display(result)
                summary = 0
        except CELEvalError as ex:
            print(env.cel_parser.error_text(ex.args[0], ex.line, ex.column), file=sys.stderr)
            summary = 2

    elif options.slurp:
        # If slurp, one big document, part of the "jq" package in the activation context.
        document = sys.stdin.read()
        summary = process_json_doc(
            output_display, prgm, activation, options.document or options.package, document,
            options.boolean
        )

    else:
        # NDJSON format: each line is a JSON doc. We repackage the doc into celtypes objects.
        # Each document is in the "jq" package in the activation context.
        # Overall status is the worst (max) status across all documents.
        summary = 0
        for document in sys.stdin:
            summary = max(
                summary,
                process_json_doc(
                    output_display, prgm, activation, options.document or options.package, document,
                    options.boolean
                )
            )

    return summary
+
+
+if __name__ == "__main__": # pragma: no cover
+ logging.basicConfig(level=logging.WARNING)
+ exit = main(sys.argv[1:])
+ logging.shutdown()
+ sys.exit(exit)
diff --git a/.venv/lib/python3.12/site-packages/celpy/adapter.py b/.venv/lib/python3.12/site-packages/celpy/adapter.py
new file mode 100644
index 00000000..572b65ce
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/celpy/adapter.py
@@ -0,0 +1,137 @@
+# SPDX-Copyright: Copyright (c) Capital One Services, LLC
+# SPDX-License-Identifier: Apache-2.0
+# Copyright 2020 Capital One Services, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and limitations under the License.
+"""
+Type Adapter to convert Python-native types into CEL structures.
+
+Currently, Atomic Python objects have direct use of types in :mod:`celpy.celtypes`.
+
+Non-Atomic Python objects are characterized by JSON and Protobuf
+objects. This module has functions to convert JSON objects to CEL.
+
+The protobuf decoder is TBD.
+
+A more sophisticated type injection capability may be needed to permit
+additional types or extensions to :mod:`celpy.celtypes`.
+"""
+import base64
+import datetime
+import json
+from typing import Any, Dict, List, Union, cast
+
+from celpy import celtypes
+
+JSON = Union[Dict[str, Any], List[Any], bool, float, int, str, None]
+
+
class CELJSONEncoder(json.JSONEncoder):
    """
    An Encoder to export CEL objects as JSON text.

    This is **not** a reversible transformation. Some things are coerced
    to strings without any more detailed type marker --
    specifically timestamps, durations, and bytes.
    """
    @staticmethod
    def to_python(
        cel_object: celtypes.Value) -> Union[celtypes.Value, List[Any], Dict[Any, Any], bool]:
        """Recursively replace CEL containers and booleans with Python natives.

        BoolType becomes ``bool`` so the :py:mod:`json` module renders
        ``true``/``false`` correctly; ListType and MapType become native
        ``list`` and ``dict``.  All other CEL objects are left intact,
        producing a hybrid structure that is no longer strictly a
        :py:class:`celtypes.Value`.
        """
        if isinstance(cel_object, celtypes.BoolType):
            return bool(cel_object)
        if isinstance(cel_object, celtypes.ListType):
            return [CELJSONEncoder.to_python(entry) for entry in cel_object]
        if isinstance(cel_object, celtypes.MapType):
            return {
                CELJSONEncoder.to_python(k): CELJSONEncoder.to_python(v)
                for k, v in cel_object.items()
            }
        return cel_object

    def encode(self, cel_object: celtypes.Value) -> str:
        """
        Override built-in encode to create proper Python :py:class:`bool` objects.
        """
        return super().encode(CELJSONEncoder.to_python(cel_object))

    def default(self, cel_object: celtypes.Value) -> JSON:
        # Timestamps and durations serialize as their string form;
        # bytes serialize as base64 text.
        if isinstance(cel_object, (celtypes.TimestampType, celtypes.DurationType)):
            return str(cel_object)
        if isinstance(cel_object, celtypes.BytesType):
            return base64.b64encode(cel_object).decode("ASCII")
        return cast(JSON, super().default(cel_object))
+
+
class CELJSONDecoder(json.JSONDecoder):
    """
    A Decoder to import CEL objects from JSON to the extent possible.

    This does not handle non-JSON types in any form. Coercion from string
    to TimestampType or DurationType or BytesType is handled by celtype
    constructors.
    """
    def decode(self, source: str, _w: Any = None) -> Any:
        # Parse with the stock decoder, then rebuild as celtypes values.
        return json_to_cel(super().decode(source))
+
+
def json_to_cel(document: JSON) -> celtypes.Value:
    """Convert parsed JSON object from Python to CEL to the extent possible.

    It's difficult to distinguish strings which should be timestamps or durations.

    :param document: A native Python object, as produced by :py:func:`json.loads`
        (possibly including datetime objects from a custom decoder).
    :return: The equivalent :py:mod:`celpy.celtypes` value.
    :raises ValueError: for a Python type with no CEL equivalent.

    ::

        >>> from pprint import pprint
        >>> from celpy.adapter import json_to_cel
        >>> doc = json.loads('["str", 42, 3.14, null, true, {"hello": "world"}]')
        >>> cel = json_to_cel(doc)
        >>> pprint(cel)
        ListType([StringType('str'), IntType(42), DoubleType(3.14), None, BoolType(True), \
MapType({StringType('hello'): StringType('world')})])
    """
    # NOTE: bool must be tested before int, since bool is a subclass of int.
    if isinstance(document, bool):
        return celtypes.BoolType(document)
    elif isinstance(document, float):
        return celtypes.DoubleType(document)
    elif isinstance(document, int):
        return celtypes.IntType(document)
    elif isinstance(document, str):
        return celtypes.StringType(document)
    elif document is None:
        return None
    # Use builtin classes here: isinstance() with typing.List/typing.Dict
    # aliases is deprecated.
    elif isinstance(document, (tuple, list)):
        return celtypes.ListType(
            [json_to_cel(item) for item in document]
        )
    elif isinstance(document, dict):
        return celtypes.MapType(
            {json_to_cel(key): json_to_cel(value) for key, value in document.items()}
        )
    elif isinstance(document, datetime.datetime):
        return celtypes.TimestampType(document)
    elif isinstance(document, datetime.timedelta):
        return celtypes.DurationType(document)
    else:
        raise ValueError(f"unexpected type {type(document)} in JSON structure {document!r}")
diff --git a/.venv/lib/python3.12/site-packages/celpy/c7nlib.py b/.venv/lib/python3.12/site-packages/celpy/c7nlib.py
new file mode 100644
index 00000000..4ec9075e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/celpy/c7nlib.py
@@ -0,0 +1,1582 @@
+# SPDX-Copyright: Copyright (c) Capital One Services, LLC
+# SPDX-License-Identifier: Apache-2.0
+# Copyright 2020 Capital One Services, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and limitations under the License.
+
+"""
+Functions for C7N features when evaluating CEL expressions.
+
+These functions provide a mapping between C7N features and CEL.
+
+These functions are exposed by the global ``FUNCTIONS`` dictionary that is provided
+to the CEL evaluation run-time to provide necessary C7N features.
+
+The functions rely on implementation details in the ``CELFilter`` class.
+
+The API
+=======
+
+C7N uses CEL and the :py:mod:`c7nlib` module as follows::
+
+ class CELFilter(c7n.filters.core.Filter): # See below for the long list of mixins.
+ decls = {
+ "resource": celpy.celtypes.MapType,
+ "now": celpy.celtypes.TimestampType,
+ "event": celpy.celtypes.MapType,
+ }
+ decls.update(celpy.c7nlib.DECLARATIONS)
+
+ def __init__(self, expr: str) -> None:
+ self.expr = expr
+
+ def validate(self) -> None:
+ cel_env = celpy.Environment(
+ annotations=self.decls,
+ runner_class=c7nlib.C7N_Interpreted_Runner)
+ cel_ast = cel_env.compile(self.expr)
+ self.pgm = cel_env.program(cel_ast, functions=celpy.c7nlib.FUNCTIONS)
+
+ def process(self,
+ resources: Iterable[celpy.celtypes.MapType]) -> Iterator[celpy.celtypes.MapType]:
+ now = datetime.datetime.utcnow()
+ for resource in resources:
+ with C7NContext(filter=the_filter):
+ cel_activation = {
+ "resource": celpy.json_to_cel(resource),
+ "now": celpy.celtypes.TimestampType(now),
+ }
+ if self.pgm.evaluate(cel_activation):
+ yield resource
+
+This isn't the whole story, this is the starting point.
+
+This library of functions is bound into the program that's built from the AST.
+
+Several objects are required in activation for use by the CEL expression
+
+- ``resource``. The JSON document describing the cloud resource.
+
+- ``now.`` The current timestamp.
+
+- Optionally, ``event`` may have an AWS CloudWatch Event.
+
+
+The type: value Features
+========================
+
+The core value features of C7N require a number of CEL extensions.
+
+- :func:`glob(string, pattern)` uses Python fnmatch rules. This implements ``op: glob``.
+
+- :func:`difference(list, list)` creates intermediate sets and computes the difference
+ as a boolean value. Any difference is True. This implements ``op: difference``.
+
+- :func:`intersect(list, list)` creates intermediate sets and computes the intersection
+  as a boolean value. Any intersection is True. This implements ``op: intersect``.
+
+- :func:`normalize(string)` supports normalized comparison between strings.
+ In this case, it means lower cased and trimmed. This implements ``value_type: normalize``.
+
+- :func:`net.cidr_contains` checks to see if a given CIDR block contains a specific
+ address. See https://www.openpolicyagent.org/docs/latest/policy-reference/#net.
+
+- :func:`net.cidr_size` extracts the prefix length of a parsed CIDR block.
+
+- :func:`version` uses ``packaging.version.Version`` to compare version strings.
+
+- :func:`resource_count` function. This is TBD.
+
+The type: value_from features
+==============================
+
+This relies on ``value_from()`` and ``jmes_path_map()`` functions
+
+In context, it looks like this::
+
+ value_from("s3://c7n-resources/exemptions.json", "json")
+ .jmes_path_map('exemptions.ec2.rehydration.["IamInstanceProfile.Arn"][].*[].*[]')
+ .contains(resource["IamInstanceProfile"]["Arn"])
+
+The ``value_from()`` function reads values from a given URI.
+
+- A full URI for an S3 bucket.
+
+- A full URI for a server that supports HTTPS GET requests.
+
+If a format is given, this is used, otherwise it's based on the
+suffix of the path.
+
+The ``jmes_path_map()`` function compiles and applies a JMESPath
+expression against each item in the collection to create a
+new collection. To an extent, this repeats functionality
+from the ``map()`` macro.
+
+Additional Functions
+====================
+
+A number of C7N subclasses of ``Filter`` provide additional features. There are
+at least 70-odd functions that are expressed or implied by these filters.
+
+Because the CEL expressions are always part of a ``CELFilter``, all of these
+additional C7N features need to be transformed into "mixins" that are implemented
+in two places. The function is part of the legacy subclass of ``Filter``,
+and the function is also part of ``CELFilter``.
+
+::
+
+ class InstanceImageMixin:
+ # from :py:class:`InstanceImageBase` refactoring
+ def get_instance_image(self):
+ pass
+
+ class RelatedResourceMixin:
+ # from :py:class:`RelatedResourceFilter` mixin
+ def get_related_ids(self):
+ pass
+
+ def get_related(self):
+ pass
+
+ class CredentialReportMixin:
+ # from :py:class:`c7n.resources.iam.CredentialReport` filter.
+ def get_credential_report(self):
+ pass
+
+ class ResourceKmsKeyAliasMixin:
+ # from :py:class:`c7n.resources.kms.ResourceKmsKeyAlias`
+ def get_matching_aliases(self, resource):
+ pass
+
+ class CrossAccountAccessMixin:
+ # from :py:class:`c7n.filters.iamaccessfilter.CrossAccountAccessFilter`
+ def get_accounts(self, resource):
+ pass
+ def get_vpcs(self, resource):
+ pass
+ def get_vpces(self, resource):
+ pass
+ def get_orgids(self, resource):
+ pass
+ # from :py:class:`c7n.resources.secretsmanager.CrossAccountAccessFilter`
+ def get_resource_policy(self, resource):
+ pass
+
+ class SNSCrossAccountMixin:
+ # from :py:class:`c7n.resources.sns.SNSCrossAccount`
+ def get_endpoints(self, resource):
+ pass
+ def get_protocols(self, resource):
+ pass
+
+ class ImagesUnusedMixin:
+ # from :py:class:`c7n.resources.ami.ImageUnusedFilter`
+ def _pull_ec2_images(self, resource):
+ pass
+ def _pull_asg_images(self, resource):
+ pass
+
+ class SnapshotUnusedMixin:
+ # from :py:class:`c7n.resources.ebs.SnapshotUnusedFilter`
+ def _pull_asg_snapshots(self, resource):
+ pass
+ def _pull_ami_snapshots(self, resource):
+ pass
+
+ class IamRoleUsageMixin:
+ # from :py:class:`c7n.resources.iam.IamRoleUsage`
+ def service_role_usage(self, resource):
+ pass
+ def instance_profile_usage(self, resource):
+ pass
+
+ class SGUsageMixin:
+ # from :py:class:`c7n.resources.vpc.SGUsage`
+ def scan_groups(self, resource):
+ pass
+
+ class IsShieldProtectedMixin:
+ # from :py:mod:`c7n.resources.shield`
+ def get_type_protections(self, resource):
+ pass
+
+ class ShieldEnabledMixin:
+ # from :py:class:`c7n.resources.account.ShieldEnabled`
+ def account_shield_subscriptions(self, resource):
+ pass
+
+ class CELFilter(
+ InstanceImageMixin, RelatedResourceMixin, CredentialReportMixin,
+ ResourceKmsKeyAliasMixin, CrossAccountAccessMixin, SNSCrossAccountMixin,
+ ImagesUnusedMixin, SnapshotUnusedMixin, IamRoleUsageMixin, SGUsageMixin,
+ Filter,
+ ):
+ '''Container for functions used by c7nlib to expose data to CEL'''
+ def __init__(self, data, manager) -> None:
+ super().__init__(data, manager)
+ assert data["type"].lower() == "cel"
+ self.expr = data["expr"]
+ self.parser = c7n.filters.offhours.ScheduleParser()
+
+ def validate(self):
+ pass # See above example
+
+ def process(self, resources):
+ pass # See above example
+
+This is not the complete list. See the ``tests/test_c7nlib.py`` for the ``celfilter_instance``
+fixture which contains **all** of the functions required.
+
+C7N Context Object
+==================
+
+A number of the functions require access to C7N features that are not simply part
+of the resource being filtered. There are two alternative ways to handle this dependency:
+
+- A global C7N context object that has the current ``CELFilter`` providing
+ access to C7N internals.
+
+- A ``C7N`` argument to the functions that need C7N access.
+ This would be provided in the activation context for CEL.
+
+To keep the library functions looking simple, the module global ``C7N`` is used.
+This avoids introducing a non-CEL parameter to the c7nlib functions.
+
+The ``C7N`` context object contains the following attributes:
+
+- ``filter``. The original C7N ``Filter`` object. This provides access to the
+ resource manager. It can be used to manage supplemental
+ queries using C7N caches and other resource management.
+
+This is set by the :py:class:`C7NContext` prior to CEL evaluation.
+
+Name Resolution
+===============
+
+Note that names are **not** resolved via a lookup in the program object,
+an instance of the :py:class:`celpy.Runner` class. To keep these functions
+simple, the runner is not part of the run-time, and name resolution
+will appear to be "hard-wired" among these functions.
+
+This is rarely an issue, since most of these functions are independent.
+The :func:`value_from` function relies on :func:`text_from` and :func:`parse_text`.
+Changing either of these functions with an override won't modify the behavior
+of :func:`value_from`.
+"""
+import csv
+import fnmatch
+import io
+import ipaddress
+import json
+import logging
+import os.path
+import sys
+import urllib.request
+import zlib
+from contextlib import closing
+from packaging.version import Version
+from types import TracebackType
+from typing import (Any, Callable, Dict, Iterator, List, Optional, Type, Union,
+ cast)
+
+import dateutil
+import jmespath # type: ignore [import-untyped]
+
+from celpy import InterpretedRunner, celtypes
+from celpy.adapter import json_to_cel
+from celpy.evaluation import Annotation, Context, Evaluator
+
+logger = logging.getLogger(__name__)
+
+
class C7NContext:
    """
    Context manager that publishes the active C7N filter to this module.

    The functions in :py:mod:`c7nlib` read the module-global ``C7N`` to reach
    C7N internals; this manager sets that global for the duration of a CEL
    evaluation and clears it on exit.

    ::

        with C7NContext(filter):
            cel_prgm.evaluate(cel_activation)
    """

    def __init__(self, filter: Any) -> None:
        # The current CELFilter instance; exposed as ``C7N.filter``.
        self.filter = filter

    def __repr__(self) -> str:  # pragma: no cover
        return f"{self.__class__.__name__}(filter={self.filter!r})"

    def __enter__(self) -> None:
        # Publish this context through the module global.
        global C7N
        C7N = self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        # Reset the module global; the None is cast to keep the declared type.
        global C7N
        C7N = cast("C7NContext", None)
        return
+
+
# An object used for access to the C7N filter.
# A module global makes the interface functions much simpler.
# They can rely on `C7N.filter` providing the current `CELFilter` instance.
# Starts as None (cast for type-checking); set/cleared by C7NContext.
C7N = cast("C7NContext", None)
+
+
def key(source: celtypes.ListType, target: celtypes.StringType) -> celtypes.Value:
    """
    Locate a particular ``{"Key": x, "Value": y}`` item in a list of such
    mappings and return the ``y`` value where ``x == target``.

    The C7N shorthand ``tag:Name`` doesn't translate well to CEL; this
    function provides the equivalent extraction.

    In effect, the ``key()`` function::

        resource["Tags"].key("Name")["Value"]

    is somewhat like::

        resource["Tags"].filter(x, x["Key"] == "Name")[0]

    But the ``key()`` function doesn't raise an exception if the key is not found,
    instead it returns None.

    We might want to generalize this into a ``first()`` reduction macro.
    ``resource["Tags"].first(x, x["Key"] == "Name" ? x["Value"] : null, null)``
    This macro returns the first non-null value or the default (which can be ``null``.)
    """
    key_label = celtypes.StringType("Key")
    value_label = celtypes.StringType("Value")
    # Return the first matching item's "Value"; None when nothing matches.
    for item in source:
        mapping = cast(celtypes.MapType, item)
        if cast(celtypes.StringType, mapping.get(key_label)) == target:
            return mapping.get(value_label)
    return None
+
+
def glob(text: celtypes.StringType, pattern: celtypes.StringType) -> celtypes.BoolType:
    """Compare a string with a shell-style pattern using Python fnmatch rules.

    While ``"*.py".glob(some_string)`` seems logical because the pattern is the
    more persistent object, this seems to cause confusion.

    We use ``some_string.glob("*.py")`` to express a regex-like rule. This parallels the CEL
    `.matches()` method.

    We also support ``glob(some_string, "*.py")``.
    """
    matched = fnmatch.fnmatch(text, pattern)
    return celtypes.BoolType(matched)
+
+
def difference(left: celtypes.ListType, right: celtypes.ListType) -> celtypes.BoolType:
    """
    Ordered set difference, ``left - right``, reduced to a boolean.
    True when some item of ``left`` is absent from ``right``.
    False when every item of ``left`` also appears in ``right``.
    """
    remainder = set(left) - set(right)
    return celtypes.BoolType(bool(remainder))
+
+
def intersect(left: celtypes.ListType, right: celtypes.ListType) -> celtypes.BoolType:
    """
    Set intersection of two lists, reduced to a boolean.
    True when the lists share at least one item.
    False when they have no item in common.
    """
    common = set(left) & set(right)
    return celtypes.BoolType(bool(common))
+
+
def normalize(string: celtypes.StringType) -> celtypes.StringType:
    """
    Normalize a string for comparison: lower-cased, then stripped of
    leading and trailing whitespace.
    """
    lowered = string.lower()
    return celtypes.StringType(lowered.strip())
+
+
def unique_size(collection: celtypes.ListType) -> celtypes.IntType:
    """
    Count of distinct items in a list.
    """
    distinct = set(collection)
    return celtypes.IntType(len(distinct))
+
+
class IPv4Network(ipaddress.IPv4Network):
    """
    An :py:class:`ipaddress.IPv4Network` whose ``in`` operator also accepts
    another network: a network is "contained" when this network is a supernet
    of it. ``None`` is never contained; addresses behave as in the superclass.
    """

    # Override for net 2 net containment comparison
    def __contains__(self, other):  # type: ignore[no-untyped-def]
        if other is None:
            return False
        if isinstance(other, ipaddress._BaseNetwork):
            return self.supernet_of(other)  # type: ignore[no-untyped-call]
        return super().__contains__(other)

    if sys.version_info.major == 3 and sys.version_info.minor <= 6:  # pragma: no cover
        # Backports for Python <= 3.6, which lacks supernet_of().

        @staticmethod
        def _is_subnet_of(a, b):  # type: ignore[no-untyped-def]
            try:
                # Always false if one is v4 and the other is v6.
                if a._version != b._version:
                    raise TypeError(f"{a} and {b} are not of the same version")
                return (
                    b.network_address <= a.network_address
                    and b.broadcast_address >= a.broadcast_address  # noqa: W503
                )
            except AttributeError:
                raise TypeError(
                    f"Unable to test subnet containment between {a} and {b}"
                )

        def supernet_of(self, other):  # type: ignore[no-untyped-def]
            """Return True if this network is a supernet of other."""
            return self._is_subnet_of(other, self)  # type: ignore[no-untyped-call]
+
+
CIDR = Union[None, IPv4Network, ipaddress.IPv4Address]
CIDR_Class = Union[Type[IPv4Network], Callable[..., ipaddress.IPv4Address]]


def parse_cidr(value: str) -> CIDR:
    """
    Process cidr ranges.

    Strings containing "/" parse as an :py:class:`IPv4Network`; all others
    parse as an address. Unparseable input yields ``None``.

    This is a union of types outside CEL:
    Union[None, IPv4Network, ipaddress.IPv4Address]
    """
    parser: CIDR_Class
    if "/" in value:
        parser = IPv4Network
    else:
        parser = ipaddress.ip_address  # type: ignore[assignment]
    result: CIDR
    try:
        result = parser(value)
    except (ipaddress.AddressValueError, ValueError):
        result = None
    return result
+
+
def size_parse_cidr(value: celtypes.StringType,) -> Optional[celtypes.IntType]:
    """CIDR prefixlen value; None for bare addresses or unparseable input."""
    network = parse_cidr(value)
    if network and isinstance(network, IPv4Network):
        return celtypes.IntType(network.prefixlen)
    return None
+
+
class ComparableVersion(Version):
    """
    A :py:class:`packaging.version.Version` whose equality test never raises.

    The old LooseVersion could fail on comparing present strings, used
    in the value as shorthand for certain options.

    The new Version doesn't fail as easily.
    """

    def __eq__(self, other: object) -> bool:
        # Treat an incomparable operand as simply unequal.
        try:
            return super().__eq__(other)
        except TypeError:  # pragma: no cover
            return False
+
+
def version(
    value: celtypes.StringType,
) -> celtypes.Value:  # actually, a ComparableVersion
    """Parse a version string into a comparable version object."""
    parsed = ComparableVersion(value)
    return cast(celtypes.Value, parsed)
+
+
def present(value: celtypes.StringType,) -> celtypes.Value:
    """True when the value is non-empty (truthy)."""
    truthy = bool(value)
    return cast(celtypes.Value, truthy)
+
+
def absent(value: celtypes.StringType,) -> celtypes.Value:
    """True when the value is empty (falsy)."""
    falsy = not value
    return cast(celtypes.Value, falsy)
+
+
def text_from(url: celtypes.StringType,) -> celtypes.Value:
    """
    Read raw text from a URL. This can be expanded to accept S3 or other URL's.

    Transparently inflates a gzip-encoded response body.
    """
    request = urllib.request.Request(url, headers={"Accept-Encoding": "gzip"})
    with closing(urllib.request.urlopen(request)) as response:
        body = response.read()
        if response.info().get("Content-Encoding") == "gzip":
            # wbits = MAX_WBITS | 32 auto-detects the gzip/zlib header.
            text = zlib.decompress(body, zlib.MAX_WBITS | 32).decode("utf8")
        else:
            text = body.decode("utf-8")
    return celtypes.StringType(text)
+
+
def parse_text(
    source_text: celtypes.StringType, format: celtypes.StringType
) -> celtypes.Value:
    """
    Parse raw text using a given format: ``json``, ``txt``,
    ``ldjson``/``ndjson``/``jsonl``, ``csv``, or ``csv2dict``.
    """
    if format == "json":
        return json_to_cel(json.loads(source_text))
    if format == "txt":
        lines = source_text.splitlines()
        return celtypes.ListType(
            [celtypes.StringType(line.rstrip()) for line in lines]
        )
    if format in ("ldjson", "ndjson", "jsonl"):
        # One JSON document per line.
        return celtypes.ListType(
            [json_to_cel(json.loads(line)) for line in source_text.splitlines()]
        )
    if format == "csv":
        reader = csv.reader(io.StringIO(source_text))
        return celtypes.ListType([json_to_cel(row) for row in reader])
    if format == "csv2dict":
        reader = csv.DictReader(io.StringIO(source_text))
        return celtypes.ListType([json_to_cel(row) for row in reader])
    raise ValueError(f"Unsupported format: {format!r}")  # pragma: no cover
+
+
def value_from(
    url: celtypes.StringType, format: Optional[celtypes.StringType] = None,
) -> celtypes.Value:
    """
    Read values from a URL.

    First, do :func:`text_from` to read the source.
    Then, do :func:`parse_text` to parse the source, if needed.

    This makes the format optional, and deduces it from the URL's path information.

    C7N will generally replace this with a function
    that leverages a more sophisticated :class:`c7n.resolver.ValuesFrom`.
    """
    supported_formats = ("json", "ndjson", "ldjson", "jsonl", "txt", "csv", "csv2dict")

    # 1. Deduce the format from the URL's suffix when not given explicitly.
    if not format:
        suffix = os.path.splitext(url)[1]
        format = celtypes.StringType(suffix[1:])
    if format not in supported_formats:
        raise ValueError(f"Unsupported format: {format!r}")

    # 2. Read the raw data. Note this is directly bound to text_from()
    # and does not go though the environment or other CEL indirection.
    raw_data = cast(celtypes.StringType, text_from(url))

    # 3. Parse the physical format.
    return parse_text(raw_data, format)
+
+
def jmes_path(
    source_data: celtypes.Value, path_source: celtypes.StringType
) -> celtypes.Value:
    """
    Apply a JMESPath expression to an object read from from a URL.
    """
    searcher = jmespath.compile(path_source)
    found = searcher.search(source_data)
    return json_to_cel(found)
+
+
def jmes_path_map(
    source_data: celtypes.ListType, path_source: celtypes.StringType
) -> celtypes.ListType:
    """
    Apply a JMESPath expression to each object read from from a URL,
    producing a new collection. This is for ndjson, nljson and jsonl files.
    """
    searcher = jmespath.compile(path_source)
    results = [json_to_cel(searcher.search(item)) for item in source_data]
    return celtypes.ListType(results)
+
+
def marked_key(
    source: celtypes.ListType, target: celtypes.StringType
) -> celtypes.Value:
    """
    Examines a list of {"Key": text, "Value": text} mappings
    looking for the given Key value.

    Parses a ``message:action@action_date`` value into a mapping
    {"message": message, "action": action, "action_date": action_date}

    If no Key or no Value or the Value isn't the right structure,
    the result is a null.
    """
    raw_value = key(source, target)
    if raw_value is None:
        return None
    try:
        message, marker = cast(celtypes.StringType, raw_value).rsplit(":", 1)
        action, action_date = marker.strip().split("@", 1)
    except ValueError:
        # The Value wasn't in message:action@action_date form.
        return None
    return celtypes.MapType(
        {
            celtypes.StringType("message"): celtypes.StringType(message),
            celtypes.StringType("action"): celtypes.StringType(action),
            celtypes.StringType("action_date"): celtypes.TimestampType(action_date),
        }
    )
+
+
def image(resource: celtypes.MapType) -> celtypes.Value:
    """
    Reach into C7N to get the image details for this EC2 or ASG resource.

    Minimally, the creation date is transformed into a CEL timestamp.
    We may want to slightly generalize this to json_to_cel() the entire Image object.

    The following may be usable, but it seems too complex:

    ::

        C7N.filter.prefetch_instance_images(C7N.policy.resources)
        image = C7N.filter.get_instance_image(resource["ImageId"])
        return json_to_cel(image)

    .. todo:: Refactor C7N

        Provide the :py:class:`InstanceImageBase` mixin in a :py:class:`CELFilter` class.
        We want to have the image details in the new :py:class:`CELFilter` instance.
    """
    # Assumes the CELFilter exposes get_instance_image() (extracted from the
    # legacy filter) and that the policy already populated the cache via
    # C7N.filter.prefetch_instance_images([resource]).
    details = C7N.filter.get_instance_image(resource)

    if details:
        creation_date = details["CreationDate"]
        image_name = details["Name"]
    else:
        # Fallbacks when no image details are available.
        creation_date = "2000-01-01T01:01:01.000Z"
        image_name = ""

    return json_to_cel(
        {"CreationDate": dateutil.parser.isoparse(creation_date), "Name": image_name}
    )
+
+
def get_raw_metrics(request: celtypes.MapType) -> celtypes.Value:
    """
    Reach into C7N and make a statistics request using the current C7N filter object.

    The ``request`` parameter is the request object that is passed through to AWS via
    the current C7N filter's manager. The request is a Mapping with the following keys and values:

    ::

        get_raw_metrics({
            "Namespace": "AWS/EC2",
            "MetricName": "CPUUtilization",
            "Dimensions": {"Name": "InstanceId", "Value": resource.InstanceId},
            "Statistics": ["Average"],
            "StartTime": now - duration("4d"),
            "EndTime": now,
            "Period": duration("86400s")
        })

    The request is passed through to AWS more-or-less directly. The result is a CEL
    list of values for then requested statistic. A ``.map()`` macro
    can be used to compute additional details. An ``.exists()`` macro can filter the
    data to look for actionable values.

    We would prefer to refactor C7N and implement this with code something like this:

    ::

        C7N.filter.prepare_query(C7N.policy.resources)
        data = C7N.filter.get_resource_statistics(client, resource)
        return json_to_cel(data)

    .. todo:: Refactor C7N

        Provide a :py:class:`MetricsAccess` mixin in a :py:class:`CELFilter` class.
        We want to have the metrics processing in the new :py:class:`CELFilter` instance.

    """
    cloudwatch = C7N.filter.manager.session_factory().client("cloudwatch")
    response = cloudwatch.get_metric_statistics(
        Namespace=request["Namespace"],
        MetricName=request["MetricName"],
        Statistics=request["Statistics"],
        StartTime=request["StartTime"],
        EndTime=request["EndTime"],
        Period=request["Period"],
        Dimensions=request["Dimensions"],
    )
    return json_to_cel(response["Datapoints"])
+
+
def get_metrics(
    resource: celtypes.MapType, request: celtypes.MapType
) -> celtypes.Value:
    """
    Reach into C7N and make a statistics request using the current C7N filter.

    This builds a request object that is passed through to AWS via the :func:`get_raw_metrics`
    function.

    The ``request`` parameter is a Mapping with the following keys and values:

    ::

        resource.get_metrics({"MetricName": "CPUUtilization", "Statistic": "Average",
            "StartTime": now - duration("4d"), "EndTime": now, "Period": duration("86400s")}
            ).exists(m, m < 30)

    The namespace is derived from the ``C7N.policy``. The dimensions are derived from
    the ``C7N.filter.model``.

    .. todo:: Refactor C7N

        Provide a :py:class:`MetricsAccess` mixin in a :py:class:`CELFilter` class.
        We want to have the metrics processing in the new :py:class:`CELFilter` instance.

    """
    statistic = request["Statistic"]
    dimension_name = C7N.filter.manager.get_model().dimension
    namespace = C7N.filter.manager.resource_type
    # TODO: Varies by resource/policy type. Each policy's model may have different dimensions.
    dimensions = [{"Name": dimension_name, "Value": resource.get(dimension_name)}]
    raw_request = json_to_cel(
        {
            "Namespace": namespace,
            "MetricName": request["MetricName"],
            "Dimensions": dimensions,
            "Statistics": [statistic],
            "StartTime": request["StartTime"],
            "EndTime": request["EndTime"],
            "Period": request["Period"],
        }
    )
    raw_metrics = get_raw_metrics(cast(celtypes.MapType, raw_request))
    # Project just the requested statistic out of each datapoint.
    return json_to_cel(
        [
            cast(Dict[str, celtypes.Value], datapoint).get(statistic)
            for datapoint in cast(List[celtypes.Value], raw_metrics)
        ]
    )
+
+
def get_raw_health_events(request: celtypes.MapType) -> celtypes.Value:
    """
    Reach into C7N and make a health-events request using the current C7N filter.

    The ``request`` parameter is the filter object that is passed through to AWS via
    the current C7N filter's manager. The request is a List of AWS health events.

    ::

        get_raw_health_events({
            "services": ["ELASTICFILESYSTEM"],
            "regions": ["us-east-1", "global"],
            "eventStatusCodes": ['open', 'upcoming'],
        })
    """
    health = C7N.filter.manager.session_factory().client(
        'health', region_name='us-east-1')
    events = health.describe_events(filter=request)['events']
    return json_to_cel(events)
+
+
def get_health_events(
    resource: celtypes.MapType,
    statuses: Optional[List[celtypes.Value]] = None
) -> celtypes.Value:
    """
    Reach into C7N and make a health-event request using the current C7N filter.

    This builds a request object that is passed through to AWS via the :func:`get_raw_health_events`
    function.

    .. todo:: Handle optional list of event types.
    """
    if not statuses:
        statuses = [celtypes.StringType('open'), celtypes.StringType('upcoming')]
    # Resource types whose Personal Health Dashboard service name differs
    # from the upper-cased model service name.
    phd_svc_name_map = {
        'app-elb': 'ELASTICLOADBALANCING',
        'ebs': 'EBS',
        'efs': 'ELASTICFILESYSTEM',
        'elb': 'ELASTICLOADBALANCING',
        'emr': 'ELASTICMAPREDUCE'
    }
    manager = C7N.filter.manager
    service = phd_svc_name_map.get(
        manager.data['resource'], manager.get_model().service.upper()
    )
    raw_request = json_to_cel(
        {
            "services": [service],
            "regions": [manager.config.region, 'global'],
            "eventStatusCodes": statuses,
        }
    )
    return get_raw_health_events(cast(celtypes.MapType, raw_request))
+
+
def get_related_ids(resource: celtypes.MapType,) -> celtypes.Value:
    """
    Reach into C7N and make a get_related_ids() request using the current C7N filter.

    .. todo:: Refactor C7N

        Provide the :py:class:`RelatedResourceFilter` mixin in a :py:class:`CELFilter` class.
        We want to have the related id's details in the new :py:class:`CELFilter` instance.
    """
    # Relies on the CELFilter exposing get_related_ids() from the legacy filter.
    return json_to_cel(C7N.filter.get_related_ids(resource))
+
+
def get_related_sgs(resource: celtypes.MapType,) -> celtypes.Value:
    """
    Reach into C7N and make a get_related_sgs() request using the current C7N filter.
    """
    return json_to_cel(C7N.filter.get_related_sgs(resource))
+
+
def get_related_subnets(resource: celtypes.MapType,) -> celtypes.Value:
    """
    Reach into C7N and make a get_related_subnets() request using the current C7N filter.
    """
    return json_to_cel(C7N.filter.get_related_subnets(resource))
+
+
def get_related_nat_gateways(resource: celtypes.MapType,) -> celtypes.Value:
    """
    Reach into C7N and make a get_related_nat_gateways() request using the current C7N filter.
    """
    return json_to_cel(C7N.filter.get_related_nat_gateways(resource))
+
+
def get_related_igws(resource: celtypes.MapType,) -> celtypes.Value:
    """
    Reach into C7N and make a get_related_igws() request using the current C7N filter.
    """
    return json_to_cel(C7N.filter.get_related_igws(resource))
+
+
def get_related_security_configs(resource: celtypes.MapType,) -> celtypes.Value:
    """
    Reach into C7N and make a get_related_security_configs() request using the current C7N filter.
    """
    return json_to_cel(C7N.filter.get_related_security_configs(resource))
+
+
def get_related_vpc(resource: celtypes.MapType,) -> celtypes.Value:
    """
    Reach into C7N and make a get_related_vpc() request using the current C7N filter.
    """
    return json_to_cel(C7N.filter.get_related_vpc(resource))
+
+
def get_related_kms_keys(resource: celtypes.MapType,) -> celtypes.Value:
    """
    Reach into C7N and make a get_related_kms_keys() request using the current C7N filter.
    """
    kms_keys = C7N.filter.get_related_kms_keys(resource)
    return json_to_cel(kms_keys)
+
+
def security_group(security_group_id: celtypes.MapType,) -> celtypes.Value:
    """
    Reach into C7N and make a get_related() request using the current C7N filter to get
    the security group.

    .. todo:: Refactor C7N

        Provide the :py:class:`RelatedResourceFilter` mixin in a :py:class:`CELFilter` class.
        We want to have the related id's details in the new :py:class:`CELFilter` instance.
        See :py:class:`VpcSecurityGroupFilter` subclass of :py:class:`RelatedResourceFilter`.
    """
    # Relies on the CELFilter exposing RelatedResourceFilter.get_related().
    return json_to_cel(C7N.filter.get_related([security_group_id]))
+
+
def subnet(subnet_id: celtypes.Value,) -> celtypes.Value:
    """
    Reach into C7N and make a get_related() request using the current C7N filter to get
    the subnet.

    .. todo:: Refactor C7N

        Provide the :py:class:`RelatedResourceFilter` mixin in a :py:class:`CELFilter` class.
        We want to have the related id's details in the new :py:class:`CELFilter` instance.
        See :py:class:`VpcSubnetFilter` subclass of :py:class:`RelatedResourceFilter`.
    """
    # get_related() resolves the related ID's, then fetches their items.
    return json_to_cel(C7N.filter.get_related([subnet_id]))
+
+
def flow_logs(resource: celtypes.MapType,) -> celtypes.Value:
    """
    Reach into C7N and locate the flow logs using the current C7N filter.

    .. todo:: Refactor C7N

        Provide a separate function to get the flow logs, separate from the
        the filter processing.

    .. todo:: Refactor :func:`c7nlib.flow_logs` -- it exposes too much implementation detail.

    """
    # TODO: Refactor into a function in ``CELFilter``. Should not be here.
    ec2 = C7N.filter.manager.session_factory().client("ec2")
    all_logs = ec2.describe_flow_logs().get("FlowLogs", ())
    model = C7N.filter.manager.get_model()
    # Group the flow-log descriptions by their ResourceId.
    by_resource: Dict[str, List[Dict[str, Any]]] = {}
    for log_entry in all_logs:
        by_resource.setdefault(log_entry["ResourceId"], []).append(log_entry)
    resource_id = cast(str, resource.get(model.id))
    return json_to_cel(by_resource.get(resource_id, []))
+
+
def vpc(vpc_id: celtypes.Value,) -> celtypes.Value:
    """
    Reach into C7N and make a ``get_related()`` request using the current C7N filter to get
    the VPC details.

    .. todo:: Refactor C7N

        Provide the :py:class:`RelatedResourceFilter` mixin in a :py:class:`CELFilter` class.
        We want to have the related id's details in the new :py:class:`CELFilter` instance.
        See :py:class:`VpcFilter` subclass of :py:class:`RelatedResourceFilter`.
    """
    # Relies on the CELFilter exposing RelatedResourceFilter.get_related().
    return json_to_cel(C7N.filter.get_related([vpc_id]))
+
+
def subst(jmes_path: celtypes.StringType,) -> celtypes.StringType:
    """
    Reach into C7N and build a set of substitutions to replace text in a JMES path.

    This is based on how :py:class:`c7n.resolver.ValuesFrom` works. There are
    two possible substitution values:

    - account_id
    - region

    :param jmes_path: the source
    :return: A JMES with values replaced.
    """
    replacements = {
        "account_id": C7N.filter.manager.config.account_id,
        "region": C7N.filter.manager.config.region,
    }
    return celtypes.StringType(jmes_path.format(**replacements))
+
+
def credentials(resource: celtypes.MapType) -> celtypes.Value:
    """
    Reach into C7N and fetch the IAM credential-report details for *resource*.

    See :py:class:`c7n.resources.iam.CredentialReport` filter.
    The ``get_credential_report()`` function does what we need.

    .. todo:: Refactor C7N

        Refactor the :py:class:`c7n.resources.iam.CredentialReport` filter into a
        ``CredentialReportMixin`` mixin to the :py:class:`CELFilter` class.
    """
    report = C7N.filter.get_credential_report(resource)
    return json_to_cel(report)
+
+
def kms_alias(vpc_id: celtypes.Value,) -> celtypes.Value:
    """
    Reach into C7N and fetch the matching KMS aliases via the current C7N filter.

    See :py:class:`c7n.resources.kms.ResourceKmsKeyAlias`.
    The ``get_matching_aliases()`` function does what we need.

    .. todo:: Refactor C7N

        Refactor the :py:class:`c7n.resources.kms.ResourceKmsKeyAlias` filter into a
        ``ResourceKmsKeyAliasMixin`` mixin to the :py:class:`CELFilter` class.
    """
    aliases = C7N.filter.get_matching_aliases()
    return json_to_cel(aliases)
+
+
def kms_key(key_id: celtypes.Value,) -> celtypes.Value:
    """
    Reach into C7N and resolve ``key_id`` to the full key details with a
    ``get_related()`` request; the related resource manager is
    ``c7n.resources.kms.Key``.

    .. todo:: Refactor C7N

        Provide the :py:class:`RelatedResourceFilter` mixin in a :py:class:`CELFilter` class.
    """
    details = C7N.filter.get_related([key_id])
    return json_to_cel(details)
+
+
def resource_schedule(
    tag_value: celtypes.Value,
) -> celtypes.Value:
    """
    Reach into C7N and parse an off-hours schedule tag with the
    :py:class:`c7n.filters.offhours.ScheduleParser` instance available as
    ``C7N.filter.parser``, reshaping the result for CEL use.

    ``filter.parser.parse(value)`` turns tag text like
    ::

        off=[(M-F,21),(U,18)];on=[(M-F,6),(U,10)];tz=pt

    into a mapping of ``off``/``on`` day-and-hour entries plus one
    top-level ``tz`` key.  For CEL we remove the top-level ``tz`` key and
    copy the zone into every entry, yielding
    ::

        {
            off: [
                { days: [1, 2, 3, 4, 5], hour: 21, tz: "pt" },
                { days: [0], hour: 18, tz: "pt" }
            ],
            on: [
                { days: [1, 2, 3, 4, 5], hour: 6, tz: "pt" },
                { days: [0], hour: 10, tz: "pt" }
            ],
        }

    This lets a CEL expression use
    ::

        key("maid_offhours").resource_schedule().off.exists(s,
            now.getDayOfWeek(s.tz) in s.days && now.getHour(s.tz) == s.hour)
    """
    parsed = C7N.filter.parser.parse(tag_value)
    # The zone applies to every entry; "et" is the default when absent.
    zone = parsed.pop("tz", "et")
    reshaped: Dict[str, Any] = {}
    for state, entries in parsed.items():
        reshaped[state] = [
            {"days": entry["days"], "hour": entry["hour"], "tz": zone}
            for entry in entries
        ]
    return json_to_cel(reshaped)
+
+
def get_accounts(resource: celtypes.MapType,) -> celtypes.Value:
    """
    Reach into the C7N filter and return the accounts for a given resource.
    Used by resources like AMI's, log-groups, ebs-snapshot, etc.

    .. todo:: Refactor C7N

        Provide the :py:class:`c7n.filters.iamaccessfilter.CrossAccountAccessFilter`
        as a mixin to ``CELFilter``.
    """
    accounts = C7N.filter.get_accounts()
    return json_to_cel(accounts)
+
+
def get_vpcs(resource: celtypes.MapType,) -> celtypes.Value:
    """
    Reach into the C7N filter and return the vpcs for a given resource.
    Used by resources like AMI's, log-groups, ebs-snapshot, etc.

    .. todo:: Refactor C7N

        Provide the :py:class:`c7n.filters.iamaccessfilter.CrossAccountAccessFilter`
        as a mixin to ``CELFilter``.
    """
    vpcs = C7N.filter.get_vpcs()
    return json_to_cel(vpcs)
+
+
def get_vpces(resource: celtypes.MapType,) -> celtypes.Value:
    """
    Reach into the C7N filter and return the vpces for a given resource.
    Used by resources like AMI's, log-groups, ebs-snapshot, etc.

    .. todo:: Refactor C7N

        Provide the :py:class:`c7n.filters.iamaccessfilter.CrossAccountAccessFilter`
        as a mixin to ``CELFilter``.
    """
    vpces = C7N.filter.get_vpces()
    return json_to_cel(vpces)
+
+
def get_orgids(resource: celtypes.MapType,) -> celtypes.Value:
    """
    Reach into the C7N filter and return the orgids for a given resource.
    Used by resources like AMI's, log-groups, ebs-snapshot, etc.

    .. todo:: Refactor C7N

        Provide the :py:class:`c7n.filters.iamaccessfilter.CrossAccountAccessFilter`
        as a mixin to ``CELFilter``.
    """
    orgids = C7N.filter.get_orgids()
    return json_to_cel(orgids)
+
+
def get_endpoints(resource: celtypes.MapType,) -> celtypes.Value:
    """
    Reach into the C7N filter and return the endpoints for an SNS resource.

    .. todo:: Refactor C7N

        Provide the :py:class:`c7n.filters.iamaccessfilter.CrossAccountAccessFilter`
        as a mixin to ``CELFilter``.
    """
    endpoints = C7N.filter.get_endpoints()
    return json_to_cel(endpoints)
+
+
def get_protocols(resource: celtypes.MapType,) -> celtypes.Value:
    """
    Reach into the C7N filter and return the protocols for an SNS resource.

    .. todo:: Refactor C7N
    """
    protocols = C7N.filter.get_protocols()
    return json_to_cel(protocols)
+
+
def get_key_policy(resource: celtypes.MapType,) -> celtypes.Value:
    """
    Fetch the ``default`` key policy document for a KMS resource.

    Prefers the ``TargetKeyId`` field and falls back to ``KeyId``.

    .. todo:: Refactor C7N
    """
    key_id = resource.get(
        celtypes.StringType("TargetKeyId"),
        resource.get(celtypes.StringType("KeyId")))
    kms = C7N.filter.manager.session_factory().client("kms")
    policy = kms.get_key_policy(KeyId=key_id, PolicyName='default')['Policy']
    return json_to_cel(policy)
+
+
def get_resource_policy(resource: celtypes.MapType,) -> celtypes.Value:
    """
    Reach into the C7N filter and return the resource policy for a given resource.
    Used by resources like AMI's, log-groups, ebs-snapshot, etc.

    .. todo:: Refactor C7N
    """
    policy = C7N.filter.get_resource_policy()
    return json_to_cel(policy)
+
+
def describe_subscription_filters(resource: celtypes.MapType,) -> celtypes.Value:
    """
    Subscription filters for a log-group resource, via a retried
    ``logs.describe_subscription_filters`` call.

    .. todo:: Refactor C7N

        this should be directly available in CELFilter.
    """
    logs_client = C7N.filter.manager.session_factory().client("logs")
    response = C7N.filter.manager.retry(
        logs_client.describe_subscription_filters,
        logGroupName=resource['logGroupName']
    )
    return json_to_cel(response.get('subscriptionFilters', ()))
+
+
def describe_db_snapshot_attributes(resource: celtypes.MapType,) -> celtypes.Value:
    """
    The ``createVolumePermission`` attribute for an rds-snapshot or
    ebs-snapshot resource, via a retried ``ec2.describe_snapshot_attribute``
    call.

    .. todo:: Refactor C7N

        this should be directly available in CELFilter.
    """
    ec2 = C7N.filter.manager.session_factory().client("ec2")
    attributes = C7N.filter.manager.retry(
        ec2.describe_snapshot_attribute,
        SnapshotId=resource['SnapshotId'],
        Attribute='createVolumePermission'
    )
    return json_to_cel(attributes)
+
+
def arn_split(arn: celtypes.StringType, field: celtypes.StringType) -> celtypes.Value:
    """
    Parse an ARN, extracting a particular field.
    The field name must be one of
    "partition", "service", "region", "account-id", "resource-type", "resource-id"
    In the case of a ``resource-type/resource-id`` path, this will be a "resource-id" value,
    and there will be no "resource-type".

    Example formats

    ``arn:partition:service:region:account-id:resource-id``

    ``arn:partition:service:region:account-id:resource-type/resource-id``

    ``arn:partition:service:region:account-id:resource-type:resource-id``

    :raises ValueError: if the string does not begin with ``arn:``.

    .. note:: (review) An ARN whose field count is neither 5 nor 6 raises
       ``KeyError`` from the ``field_names`` lookup, as does an unknown
       ``field`` name -- confirm callers expect ``KeyError`` here.
    """
    # Map the count of ":"-separated fields to the matching tuple of names.
    field_names = {
        len(names): names
        for names in [
            ("partition", "service", "region", "account-id", "resource-id"),
            ("partition", "service", "region", "account-id", "resource-type", "resource-id"),
        ]
    }
    prefix, *fields = arn.split(":")
    if prefix != "arn":
        raise ValueError(f"Not an ARN: {arn}")
    # Pair each field value with its positional name, then pick the one asked for.
    mapping = dict(zip(field_names[len(fields)], fields))
    return json_to_cel(mapping[field])
+
+
def all_images() -> celtypes.Value:
    """
    Every image id referenced by EC2 instances or ASG configurations.

    Depends on :py:meth:`CELFilter._pull_ec2_images` and :py:meth:`CELFilter._pull_asg_images`

    See :py:class:`c7n.resources.ami.ImageUnusedFilter`
    """
    in_use = C7N.filter._pull_ec2_images() | C7N.filter._pull_asg_images()
    return json_to_cel(list(in_use))
+
+
def all_snapshots() -> celtypes.Value:
    """
    Every snapshot id referenced by ASGs or AMIs.

    Depends on :py:meth:`CELFilter._pull_asg_snapshots`
    and :py:meth:`CELFilter._pull_ami_snapshots`

    See :py:class:`c7n.resources.ebs.SnapshotUnusedFilter`
    """
    in_use = C7N.filter._pull_asg_snapshots() | C7N.filter._pull_ami_snapshots()
    return json_to_cel(list(in_use))
+
+
def all_launch_configuration_names() -> celtypes.Value:
    """
    Launch-configuration names currently referenced by any ASG
    (falling back to the ASG name when no configuration name is set,
    and skipping ASGs that use a launch template).

    Depends on :py:meth:`CELFilter.manager.get_launch_configuration_names`

    See :py:class:`c7n.resources.asg.UnusedLaunchConfig`
    """
    asgs = C7N.filter.manager.get_resource_manager('asg').resources()
    # Set comprehension instead of set([...]): same result without the
    # intermediate list (flake8-comprehensions C403).
    used = {
        a.get('LaunchConfigurationName', a['AutoScalingGroupName'])
        for a in asgs if not a.get('LaunchTemplate')
    }
    return json_to_cel(list(used))
+
+
def all_service_roles() -> celtypes.Value:
    """
    Service-role usage reported by the current C7N filter.

    Depends on :py:meth:`CELFilter.service_role_usage`

    See :py:class:`c7n.resources.iam.UnusedIamRole`
    """
    usage = C7N.filter.service_role_usage()
    return json_to_cel(usage)
+
+
def all_instance_profiles() -> celtypes.Value:
    """
    Instance-profile usage reported by the current C7N filter.

    Depends on :py:meth:`CELFilter.instance_profile_usage`

    See :py:class:`c7n.resources.iam.UnusedInstanceProfiles`
    """
    usage = C7N.filter.instance_profile_usage()
    return json_to_cel(usage)
+
+
def all_dbsubenet_groups() -> celtypes.Value:
    """
    DB subnet group names currently referenced by any RDS instance
    (falling back to the instance identifier when no group name is set).

    Depends on :py:meth:`CELFilter.get_dbsubnet_group_used`

    See :py:class:`c7n.resources.rds.UnusedRDSSubnetGroup`

    .. note:: The misspelled public name (``dbsubenet``) is kept for
       backward compatibility with existing CEL expressions.
    """
    rds = C7N.filter.manager.get_resource_manager('rds').resources()
    # Set comprehension instead of set([...]): same result without the
    # intermediate list (flake8-comprehensions C403).
    used = {
        r.get('DBSubnetGroupName', r['DBInstanceIdentifier'])
        for r in rds
    }
    return json_to_cel(list(used))
+
+
def all_scan_groups() -> celtypes.Value:
    """
    Security groups discovered by the current C7N filter's scan.

    Depends on :py:meth:`CELFilter.scan_groups`

    See :py:class:`c7n.resources.vpc.UnusedSecurityGroup`
    """
    groups = C7N.filter.scan_groups()
    return json_to_cel(groups)
+
+
def get_access_log(resource: celtypes.MapType) -> celtypes.Value:
    """
    Classic ELB attributes for *resource*, including the access-log settings.

    Depends on :py:meth:`CELFilter.resources`

    See :py:class:`c7n.resources.elb.IsNotLoggingFilter` and
    :py:class:`c7n.resources.elb.IsLoggingFilter`.
    """
    elb = C7N.filter.manager.session_factory().client('elb')
    response = elb.describe_load_balancer_attributes(
        LoadBalancerName=resource['LoadBalancerName'])
    return json_to_cel(response['LoadBalancerAttributes'])
+
+
def get_load_balancer(resource: celtypes.MapType) -> celtypes.Value:
    """
    ELBv2 attributes for *resource*, as a CEL mapping of attribute key to a
    native-typed value (ints and booleans decoded from their string forms).

    Depends on :py:meth:`CELFilter.resources`

    See :py:class:`c7n.resources.appelb.IsNotLoggingFilter` and
    :py:class:`c7n.resources.appelb.IsLoggingFilter`.
    """
    def parse_attribute_value(v: str) -> Union[int, bool, str]:
        """Lightweight JSON atomic value conversion to native Python."""
        if v.isdigit():
            return int(v)
        elif v == 'true':
            return True
        elif v == 'false':
            return False
        return v

    client = C7N.filter.manager.session_factory().client('elbv2')
    results = client.describe_load_balancer_attributes(
        LoadBalancerArn=resource['LoadBalancerArn'])
    # Fix: removed a stray debug ``print(results)`` that leaked raw API
    # responses to stdout on every evaluation.
    return json_to_cel(
        {
            item["Key"]: parse_attribute_value(item["Value"])
            for item in results['Attributes']
        }
    )
+
+
def shield_protection(resource: celtypes.MapType) -> celtypes.Value:
    """
    ARNs of the resources protected by AWS Shield; the client is created
    in ``us-east-1``.

    Depends on the :py:meth:`c7n.resources.shield.IsShieldProtected.process` method.
    This needs to be refactored and renamed to avoid collisions with other ``process()`` variants.

    Applies to most resource types.
    """
    shield = C7N.filter.manager.session_factory().client(
        'shield', region_name='us-east-1')
    protections = C7N.filter.get_type_protections(
        shield, C7N.filter.manager.get_model())
    return json_to_cel([p['ResourceArn'] for p in protections])
+
+
def shield_subscription(resource: celtypes.MapType) -> celtypes.Value:
    """
    Shield subscription details for an account resource.

    Depends on :py:meth:`c7n.resources.account.ShieldEnabled.process` method.
    This needs to be refactored and renamed to avoid collisions with other ``process()`` variants.

    Applies to account resources only.
    """
    return json_to_cel(C7N.filter.account_shield_subscriptions(resource))
+
+
def web_acls(resource: celtypes.MapType) -> celtypes.Value:
    """
    Mapping from WAF web-ACL name to ``WebACLId`` for all known WAFs.

    Depends on :py:meth:`c7n.resources.cloudfront.IsWafEnabled.process` method.
    This needs to be refactored and renamed to avoid collisions with other ``process()`` variants.
    """
    name_to_id = {
        waf['Name']: waf['WebACLId']
        for waf in C7N.filter.manager.get_resource_manager('waf').resources()
    }
    return json_to_cel(name_to_id)
+
+
# CEL type declarations for every extension function in this module.  The
# CEL environment uses these annotations to treat each name as a callable.
DECLARATIONS: Dict[str, Annotation] = {
    "glob": celtypes.FunctionType,
    "difference": celtypes.FunctionType,
    "intersect": celtypes.FunctionType,
    "normalize": celtypes.FunctionType,
    "parse_cidr": celtypes.FunctionType,  # Callable[..., CIDR],
    "size_parse_cidr": celtypes.FunctionType,
    "unique_size": celtypes.FunctionType,
    "version": celtypes.FunctionType,  # Callable[..., ComparableVersion],
    "present": celtypes.FunctionType,
    "absent": celtypes.FunctionType,
    "text_from": celtypes.FunctionType,
    "value_from": celtypes.FunctionType,
    "jmes_path": celtypes.FunctionType,
    "jmes_path_map": celtypes.FunctionType,
    "key": celtypes.FunctionType,
    "marked_key": celtypes.FunctionType,
    "image": celtypes.FunctionType,
    "get_metrics": celtypes.FunctionType,
    "get_related_ids": celtypes.FunctionType,
    "security_group": celtypes.FunctionType,
    "subnet": celtypes.FunctionType,
    "flow_logs": celtypes.FunctionType,
    "vpc": celtypes.FunctionType,
    "subst": celtypes.FunctionType,
    "credentials": celtypes.FunctionType,
    "kms_alias": celtypes.FunctionType,
    "kms_key": celtypes.FunctionType,
    "resource_schedule": celtypes.FunctionType,
    "get_accounts": celtypes.FunctionType,
    "get_related_sgs": celtypes.FunctionType,
    "get_related_subnets": celtypes.FunctionType,
    "get_related_nat_gateways": celtypes.FunctionType,
    "get_related_igws": celtypes.FunctionType,
    "get_related_security_configs": celtypes.FunctionType,
    "get_related_vpc": celtypes.FunctionType,
    "get_related_kms_keys": celtypes.FunctionType,
    "get_vpcs": celtypes.FunctionType,
    "get_vpces": celtypes.FunctionType,
    "get_orgids": celtypes.FunctionType,
    "get_endpoints": celtypes.FunctionType,
    "get_protocols": celtypes.FunctionType,
    "get_key_policy": celtypes.FunctionType,
    "get_resource_policy": celtypes.FunctionType,
    "describe_subscription_filters": celtypes.FunctionType,
    "describe_db_snapshot_attributes": celtypes.FunctionType,
    "arn_split": celtypes.FunctionType,
    "all_images": celtypes.FunctionType,
    "all_snapshots": celtypes.FunctionType,
    "all_launch_configuration_names": celtypes.FunctionType,
    "all_service_roles": celtypes.FunctionType,
    "all_instance_profiles": celtypes.FunctionType,
    "all_dbsubenet_groups": celtypes.FunctionType,
    "all_scan_groups": celtypes.FunctionType,
    "get_access_log": celtypes.FunctionType,
    "get_load_balancer": celtypes.FunctionType,
    "shield_protection": celtypes.FunctionType,
    "shield_subscription": celtypes.FunctionType,
    "web_acls": celtypes.FunctionType,
    # "etc.": celtypes.FunctionType,
}

# Signature shared by every extension function registered below.
ExtFunction = Callable[..., celtypes.Value]

# Name-to-implementation mapping handed to the CEL evaluator.  Keys are the
# functions' ``__name__`` values, which keeps them aligned with the keys in
# ``DECLARATIONS`` above.
FUNCTIONS: Dict[str, ExtFunction] = {
    f.__name__: cast(ExtFunction, f) for f in [
        glob,
        difference,
        intersect,
        normalize,
        parse_cidr,
        size_parse_cidr,
        unique_size,
        version,
        present,
        absent,
        text_from,
        value_from,
        jmes_path,
        jmes_path_map,
        key,
        marked_key,
        image,
        get_metrics,
        get_related_ids,
        security_group,
        subnet,
        flow_logs,
        vpc,
        subst,
        credentials,
        kms_alias,
        kms_key,
        resource_schedule,
        get_accounts,
        get_related_sgs,
        get_related_subnets,
        get_related_nat_gateways,
        get_related_igws,
        get_related_security_configs,
        get_related_vpc,
        get_related_kms_keys,
        get_vpcs,
        get_vpces,
        get_orgids,
        get_endpoints,
        get_protocols,
        get_key_policy,
        get_resource_policy,
        describe_subscription_filters,
        describe_db_snapshot_attributes,
        arn_split,
        all_images,
        all_snapshots,
        all_launch_configuration_names,
        all_service_roles,
        all_instance_profiles,
        all_dbsubenet_groups,
        all_scan_groups,
        get_access_log,
        get_load_balancer,
        shield_protection,
        shield_subscription,
        web_acls,
        # etc.
    ]
}
+
+
class C7N_Interpreted_Runner(InterpretedRunner):
    """
    Extends the Evaluation to introduce the C7N CELFilter instance into the evaluation.

    The variable is global to allow the functions to have the simple-looking argument
    values that CEL expects. This allows a function in this module to reach outside CEL for
    access to C7N's caches.

    .. todo: Refactor to be a mixin to the Runner class hierarchy.
    """

    def evaluate(self, context: Context, filter: Optional[Any] = None) -> celtypes.Value:
        """
        Evaluate the compiled expression against ``context`` with the given
        C7N filter installed globally for the duration of the evaluation.

        :param context: the activation variables for the CEL expression.
        :param filter: the C7N ``CELFilter`` instance that the extension
            functions in this module reach into.
        :return: the CEL result value.
        """
        e = Evaluator(
            ast=self.ast,
            activation=self.new_activation(context),
            functions=self.functions,
        )
        # C7NContext publishes ``filter`` for the extension functions and
        # restores the previous value when evaluation finishes.
        with C7NContext(filter=filter):
            value = e.evaluate()
        return value
diff --git a/.venv/lib/python3.12/site-packages/celpy/cel.lark b/.venv/lib/python3.12/site-packages/celpy/cel.lark
new file mode 100644
index 00000000..3b06bf87
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/celpy/cel.lark
@@ -0,0 +1,179 @@
// SPDX-Copyright: Copyright (c) Capital One Services, LLC
// SPDX-License-Identifier: Apache-2.0
// Copyright 2020 Capital One Services, LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and limitations under the License.


// From https://github.com/google/cel-spec
// Modified in several ways:
// - EBNF to Lark: Rules are all lower case, No terminating ; on rules, Regexes wrapped in //, Replace ::= with :.
// - Strings expanded to include escapes and rewritten to be pure regex.
// - FLOAT_LIT expanded
// - Terminals reordered to reflect priorities better.

// A number of rules used ! annotation to capture tokens.
// These were rewritten to expand into specific, named rules and avoid a need for the Lark "!" construct.

expr : conditionalor ["?" conditionalor ":" expr]

conditionalor : [conditionalor "||"] conditionaland

conditionaland : [conditionaland "&&"] relation

// Original...
// !relation : [relation relop] addition
// !relop : "<" | "<=" | ">=" | ">" | "==" | "!=" | "in"

// Expose operators in the AST.
relation : [relation_lt | relation_le | relation_ge | relation_gt | relation_eq | relation_ne | relation_in] addition
relation_lt : relation "<"
relation_le : relation "<="
relation_gt : relation ">"
relation_ge : relation ">="
relation_eq : relation "=="
relation_ne : relation "!="
relation_in : relation "in"

// Original...
// !addition : [addition ("+" | "-")] multiplication

// Expose operators in the AST.
addition : [addition_add | addition_sub] multiplication
addition_add : addition "+"
addition_sub : addition "-"

// Original...
// !multiplication: [multiplication ("*" | "/" | "%")] unary

// Expose operators in the AST.
multiplication : [multiplication_mul | multiplication_div | multiplication_mod] unary
multiplication_mul : multiplication "*"
multiplication_div : multiplication "/"
multiplication_mod : multiplication "%"

// Original...
// !unary : member
// | "!" "!"* member
// | "-" "-"* member

// Expose operators in the AST
// Option 1: zero or more token nodes; requires some care to handle sequence of operations.
//unary : [unary_not | unary_neg]* member

// Option 2: separate expressions; doesn't maintain type safetly, allows ~!~!~!~!~x syntax which isn't really ideal.
unary : member
    | unary_not unary
    | unary_neg unary

unary_not : "!"
unary_neg : "-"


// Original...
// !member : primary
// | member "." IDENT ["(" [exprlist] ")"]
// | member "[" expr "]"
// | member "{" [fieldinits] "}"

// Expose constructs in the AST.
member : member_dot | member_dot_arg | member_index | member_object | primary
member_dot : member "." IDENT
member_dot_arg : member "." IDENT "(" [exprlist] ")"
member_index : member "[" expr "]"
member_object : member "{" [fieldinits] "}"


// Original...
// !primary : ["."] IDENT ["(" [exprlist] ")"]
// | "(" expr ")"
// | "[" [exprlist] "]"
// | "{" [mapinits] "}"
// | literal

// Expose constructs in the AST.
primary : literal | dot_ident_arg | dot_ident | ident_arg
    | paren_expr | list_lit | map_lit | ident
dot_ident_arg : "." IDENT "(" [exprlist] ")"
dot_ident : "." IDENT
ident_arg : IDENT "(" [exprlist] ")"
ident : IDENT
paren_expr : "(" expr ")"
list_lit : "[" [exprlist] "]"
map_lit : "{" [mapinits] "}"

exprlist : expr ("," expr)*

fieldinits : IDENT ":" expr ("," IDENT ":" expr)*

mapinits : expr ":" expr ("," expr ":" expr)*

// Elevated from Terminals to expose the type name in the AST.
literal : UINT_LIT | FLOAT_LIT | INT_LIT | MLSTRING_LIT | STRING_LIT | BYTES_LIT
    | BOOL_LIT | NULL_LIT

// Terminals. Order of some definitions altered to help lark.

// Signs added (see https://github.com/google/cel-spec/issues/126)
INT_LIT : /-?/ /0x/ HEXDIGIT+ | /-?/ DIGIT+

UINT_LIT : INT_LIT /[uU]/

// Original...
// FLOAT_LIT : DIGIT* . DIGIT+ EXPONENT? | DIGIT+ EXPONENT

// Expanded and signs added (see https://github.com/google/cel-spec/issues/126)
FLOAT_LIT : /-?/ DIGIT+ "." DIGIT* EXPONENT? | /-?/ DIGIT* "." DIGIT+ EXPONENT? | /-?/ DIGIT+ EXPONENT

DIGIT : /[0-9]/

HEXDIGIT : /[0-9abcdefABCDEF]/

EXPONENT : /[eE]/ /[+-]?/ DIGIT+

// Rewritten into REGEX; explicitly list the escapes

// NOTE(review): the "{4-8}" quantifier in the \u escape of the
// double-quoted alternatives below is not a valid regex quantifier
// (regex treats "{4-8}" literally); it looks like "{4,8}" or the
// "{4}"/"{8}" split used in the single-quoted alternatives was
// intended -- confirm against the upstream celpy grammar.

STRING_LIT : /[rR]?"(?:\\[abfnrtv"'\\]|\\\d{3}|\\x[0-9a-fA-F]{2}|\\u[0-9a-fA-F]{4-8}|.)*?"/
    | /[rR]?'(?:\\[abfnrtv"'\\]|\\\d{3}|\\x[0-9a-fA-F]{2}|\\u[0-9a-fA-F]{4}|\\U[0-9a-fA-F]{8}|.)*?'/

MLSTRING_LIT : /[rR]?"""(?:\\[abfnrtv"'\\]|\\\d{3}|\\x[0-9a-fA-F]{2}|\\u[0-9a-fA-F]{4-8}|\r\n|\r|\n|.)*?"""/
    | /[rR]?'''(?:\\[abfnrtv"'\\]|\\\d{3}|\\x[0-9a-fA-F]{2}|\\u[0-9a-fA-F]{4}|\\U[0-9a-fA-F]{8}|\r\n|\r|\n|.)*?'''/

BYTES_LIT : /[bB]/ MLSTRING_LIT | /[bB]/ STRING_LIT

// Moved into STRING_LIT and MLSTRING_LIT, no longer needed.

// ESCAPE : /\\[bfnrt"'\\]/
// | /\\x/ HEXDIGIT HEXDIGIT
// | /\\u/ HEXDIGIT HEXDIGIT HEXDIGIT HEXDIGIT
// | /\\[0-3][0-7][0-7]/

// NEWLINE : /\\r\\n/ | /\\r/ | /\\n/

BOOL_LIT : "true" | "false"

NULL_LIT : "null"

IDENT : /[_a-zA-Z][_a-zA-Z0-9]*/

RESERVED.0 : "as" | "break" | "const" | "continue" | "else"
    | "for" | "function" | "if" | "import" | "let"
    | "loop" | "package" | "namespace" | "return"
    | "var" | "void" | "while"


WHITESPACE : /[\t\n\f\r ]+/

COMMENT : /\/\/.*/

%ignore WHITESPACE
%ignore COMMENT
diff --git a/.venv/lib/python3.12/site-packages/celpy/celparser.py b/.venv/lib/python3.12/site-packages/celpy/celparser.py
new file mode 100644
index 00000000..020621e6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/celpy/celparser.py
@@ -0,0 +1,402 @@
+# SPDX-Copyright: Copyright (c) Capital One Services, LLC
+# SPDX-License-Identifier: Apache-2.0
+# Copyright 2020 Capital One Services, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and limitations under the License.
+
+"""
+CEL Parser.
+
+See https://github.com/google/cel-spec/blob/master/doc/langdef.md
+
+https://github.com/google/cel-cpp/blob/master/parser/Cel.g4
+
+https://github.com/google/cel-go/blob/master/parser/gen/CEL.g4
+
+Builds a parser from the supplied cel.lark grammar.
+
+.. todo:: Consider embedding the ``cel.lark`` file as a triple-quoted literal.
+
+ This means fixing a LOT of \\'s. But it also eliminates a data file from the installation.
+
+Example::
+
+ >>> from celpy.celparser import CELParser
+ >>> p = CELParser()
+ >>> text2 = 'type(null)'
+ >>> ast2 = p.parse(text2)
+ >>> print(ast2.pretty().replace("\t"," ")) # doctest: +NORMALIZE_WHITESPACE
+ expr
+ conditionalor
+ conditionaland
+ relation
+ addition
+ multiplication
+ unary
+ member
+ primary
+ ident_arg
+ type
+ exprlist
+ expr
+ conditionalor
+ conditionaland
+ relation
+ addition
+ multiplication
+ unary
+ member
+ primary
+ literal null
+
+
+"""
+import re
+from pathlib import Path
+from typing import Any, List, Optional, cast
+
+import lark.visitors
+from lark import Lark, Token, Tree # noqa: F401
+from lark.exceptions import (LexError, ParseError, UnexpectedCharacters,
+ UnexpectedToken)
+
+
class CELParseError(Exception):
    """Syntax error raised by :class:`CELParser`, with optional source position."""

    def __init__(
        self,
        *args: Any,
        line: Optional[int] = None,
        column: Optional[int] = None,
    ) -> None:
        # Keep the standard Exception args, and record where in the source
        # text the problem was found when the parser supplied that detail.
        super().__init__(*args)
        self.line = line
        self.column = column
+
+
class CELParser:
    """Wrapper for the CEL parser and the syntax error messages.

    The Lark parser is built lazily on first instantiation and cached in the
    class-level ``CEL_PARSER`` attribute, so all instances share one parser.
    """
    # Shared, lazily-built LALR parser for the cel.lark grammar.
    CEL_PARSER: Optional[Lark] = None

    def __init__(self) -> None:
        # Build (and cache) the parser only once, from the adjacent grammar file.
        if CELParser.CEL_PARSER is None:
            CEL_grammar = (Path(__file__).parent / "cel.lark").read_text()
            CELParser.CEL_PARSER = Lark(
                CEL_grammar,
                parser="lalr",
                start="expr",
                debug=True,
                g_regex_flags=re.M,
                lexer_callbacks={'IDENT': self.ambiguous_literals},
                propagate_positions=True,
            )

    @staticmethod
    def ambiguous_literals(t: Token) -> Token:
        """Resolve a grammar ambiguity between identifiers and literals.

        ``true`` and ``false`` lex as IDENT; retype those tokens as BOOL_LIT
        so the grammar treats them as literals.
        """
        if t.value == "true":
            return Token("BOOL_LIT", t.value)
        elif t.value == "false":
            return Token("BOOL_LIT", t.value)
        return t

    def parse(self, text: str) -> Tree:
        """Parse CEL source ``text`` into a Lark parse tree.

        :raises CELParseError: wrapping any Lark lexing/parsing exception,
            with line/column filled in when Lark supplies them.
        """
        if CELParser.CEL_PARSER is None:
            raise TypeError("No grammar loaded")  # pragma: no cover
        # Retained so error_text() can show the offending source line later.
        self.text = text
        try:
            return CELParser.CEL_PARSER.parse(self.text)
        except (UnexpectedToken, UnexpectedCharacters) as ex:
            message = ex.get_context(text)
            raise CELParseError(message, *ex.args, line=ex.line, column=ex.column)
        except (LexError, ParseError) as ex:  # pragma: no cover
            message = ex.args[0].splitlines()[0]
            raise CELParseError(message, *ex.args)

    def error_text(
            self,
            message: str,
            line: Optional[int] = None,
            column: Optional[int] = None) -> str:
        """Format ``message`` with the source line and a caret under ``column``."""
        source = self.text.splitlines()[line - 1] if line else self.text
        message = (
            f"ERROR: <input>:{line or '?'}:{column or '?'} {message}\n"
            f" | {source}\n"
            f" | {(column - 1) * '.' if column else ''}^\n"
        )
        return message
+
+
class DumpAST(lark.visitors.Visitor_Recursive):
    """Dump a CEL AST creating a close approximation to the original source.

    The visitor works bottom-up: children are visited first and push their
    rendered text onto ``self.stack``; each parent rule then pops what it
    needs and pushes its own rendering.  After a full traversal the stack
    holds a single string for the whole tree.
    """

    @classmethod
    def display(cls_, ast: lark.Tree) -> str:
        """Visit ``ast`` and return the reconstructed source text."""
        d = cls_()
        d.visit(ast)
        return d.stack[0]

    def __init__(self) -> None:
        # Bottom-up accumulator of rendered source fragments.
        self.stack: List[str] = []

    def expr(self, tree: lark.Tree) -> None:
        # Conditional ``cond ? left : right``; a single child is pass-through.
        if len(tree.children) == 1:
            return
        else:
            right = self.stack.pop()
            left = self.stack.pop()
            cond = self.stack.pop()
            self.stack.append(
                f"{cond} ? {left} : {right}"
            )

    def conditionalor(self, tree: lark.Tree) -> None:
        if len(tree.children) == 1:
            return
        else:
            right = self.stack.pop()
            left = self.stack.pop()
            self.stack.append(
                f"{left} || {right}"
            )

    def conditionaland(self, tree: lark.Tree) -> None:
        if len(tree.children) == 1:
            return
        else:
            right = self.stack.pop()
            left = self.stack.pop()
            self.stack.append(
                f"{left} && {right}"
            )

    def relation(self, tree: lark.Tree) -> None:
        # ``left`` was pushed by one of the relation_* rules below and
        # already ends with the operator text (e.g. "a < ").
        if len(tree.children) == 1:
            return
        else:
            right = self.stack.pop()
            left = self.stack.pop()
            self.stack.append(
                f"{left} {right}"
            )

    def relation_lt(self, tree: lark.Tree) -> None:
        left = self.stack.pop()
        self.stack.append(f"{left} < ")

    def relation_le(self, tree: lark.Tree) -> None:
        left = self.stack.pop()
        self.stack.append(f"{left} <= ")

    def relation_gt(self, tree: lark.Tree) -> None:
        left = self.stack.pop()
        self.stack.append(f"{left} > ")

    def relation_ge(self, tree: lark.Tree) -> None:
        left = self.stack.pop()
        self.stack.append(f"{left} >= ")

    def relation_eq(self, tree: lark.Tree) -> None:
        left = self.stack.pop()
        self.stack.append(f"{left} == ")

    def relation_ne(self, tree: lark.Tree) -> None:
        left = self.stack.pop()
        self.stack.append(f"{left} != ")

    def relation_in(self, tree: lark.Tree) -> None:
        left = self.stack.pop()
        self.stack.append(f"{left} in ")

    def addition(self, tree: lark.Tree) -> None:
        # As with relation(): ``left`` already carries the "+"/"-" operator.
        if len(tree.children) == 1:
            return
        else:
            right = self.stack.pop()
            left = self.stack.pop()
            self.stack.append(
                f"{left} {right}"
            )

    def addition_add(self, tree: lark.Tree) -> None:
        left = self.stack.pop()
        self.stack.append(f"{left} + ")

    def addition_sub(self, tree: lark.Tree) -> None:
        left = self.stack.pop()
        self.stack.append(f"{left} - ")

    def multiplication(self, tree: lark.Tree) -> None:
        # As with relation(): ``left`` already carries the "*"/"/"/"%" operator.
        if len(tree.children) == 1:
            return
        else:
            right = self.stack.pop()
            left = self.stack.pop()
            self.stack.append(
                f"{left} {right}"
            )

    def multiplication_mul(self, tree: lark.Tree) -> None:
        left = self.stack.pop()
        self.stack.append(f"{left} * ")

    def multiplication_div(self, tree: lark.Tree) -> None:
        left = self.stack.pop()
        self.stack.append(f"{left} / ")

    def multiplication_mod(self, tree: lark.Tree) -> None:
        left = self.stack.pop()
        self.stack.append(f"{left} % ")

    def unary(self, tree: lark.Tree) -> None:
        # ``left`` is the "!" or "-" pushed by unary_not/unary_neg below.
        if len(tree.children) == 1:
            return
        else:
            right = self.stack.pop()
            left = self.stack.pop()
            self.stack.append(
                f"{left} {right}"
            )

    def unary_not(self, tree: lark.Tree) -> None:
        self.stack.append("!")

    def unary_neg(self, tree: lark.Tree) -> None:
        self.stack.append("-")

    def member_dot(self, tree: lark.Tree) -> None:
        # Attribute access: the IDENT comes from the token, not the stack.
        right = cast(lark.Token, tree.children[1]).value
        if self.stack:
            left = self.stack.pop()
            self.stack.append(f"{left}.{right}")

    def member_dot_arg(self, tree: lark.Tree) -> None:
        # Method call; three children means an argument list was present.
        if len(tree.children) == 3:
            exprlist = self.stack.pop()
        else:
            exprlist = ""
        right = cast(lark.Token, tree.children[1]).value
        if self.stack:
            left = self.stack.pop()
            self.stack.append(f"{left}.{right}({exprlist})")
        else:
            self.stack.append(f".{right}({exprlist})")

    def member_index(self, tree: lark.Tree) -> None:
        right = self.stack.pop()
        left = self.stack.pop()
        self.stack.append(f"{left}[{right}]")

    def member_object(self, tree: lark.Tree) -> None:
        # Message construction; two children means field initializers exist.
        if len(tree.children) == 2:
            fieldinits = self.stack.pop()
        else:
            fieldinits = ""
        left = self.stack.pop()
        self.stack.append(f"{left}{{{fieldinits}}}")

    def dot_ident_arg(self, tree: lark.Tree) -> None:
        if len(tree.children) == 2:
            exprlist = self.stack.pop()
        else:
            exprlist = ""
        left = cast(lark.Token, tree.children[0]).value
        self.stack.append(f".{left}({exprlist})")

    def dot_ident(self, tree: lark.Tree) -> None:
        left = cast(lark.Token, tree.children[0]).value
        self.stack.append(f".{left}")

    def ident_arg(self, tree: lark.Tree) -> None:
        if len(tree.children) == 2:
            exprlist = self.stack.pop()
        else:
            exprlist = ""

        left = cast(lark.Token, tree.children[0]).value
        self.stack.append(f"{left}({exprlist})")

    def ident(self, tree: lark.Tree) -> None:
        self.stack.append(cast(lark.Token, tree.children[0]).value)

    def paren_expr(self, tree: lark.Tree) -> None:
        if self.stack:
            left = self.stack.pop()
            self.stack.append(f"({left})")

    def list_lit(self, tree: lark.Tree) -> None:
        if self.stack:
            left = self.stack.pop()
            self.stack.append(f"[{left}]")

    def map_lit(self, tree: lark.Tree) -> None:
        if self.stack:
            left = self.stack.pop()
            self.stack.append(f"{{{left}}}")
        else:
            self.stack.append("{}")

    def exprlist(self, tree: lark.Tree) -> None:
        # Children were pushed left-to-right, so pops come out reversed.
        items = ", ".join(reversed(list(self.stack.pop() for _ in tree.children)))
        self.stack.append(items)

    def fieldinits(self, tree: lark.Tree) -> None:
        # Children alternate IDENT, expr; names come from tokens, values
        # from the stack (popped in reverse order).
        names = cast(List[lark.Token], tree.children[::2])
        values = cast(List[lark.Token], tree.children[1::2])
        assert len(names) == len(values)
        pairs = reversed(list((n.value, self.stack.pop()) for n, v in zip(names, values)))
        items = ", ".join(f"{n}: {v}" for n, v in pairs)
        self.stack.append(items)

    def mapinits(self, tree: lark.Tree) -> None:
        """Note reversed pop order for values and keys."""
        keys = tree.children[::2]
        values = tree.children[1::2]
        assert len(keys) == len(values)
        pairs = reversed(list(
            {'value': self.stack.pop(), 'key': self.stack.pop()}
            for k, v in zip(keys, values)
        ))
        items = ", ".join(f"{k_v['key']}: {k_v['value']}" for k_v in pairs)
        self.stack.append(items)

    def literal(self, tree: lark.Tree) -> None:
        if tree.children:
            self.stack.append(cast(lark.Token, tree.children[0]).value)
+
+
def tree_dump(ast: Tree) -> str:
    """Dumps the AST to approximate the original source"""
    # Delegate to the classmethod that encapsulates visit-and-collect.
    return DumpAST.display(ast)
+
+
+if __name__ == "__main__": # pragma: no cover
+ # A minimal sanity check.
+ # This is a smoke test for the grammar to expose shift/reduce or reduce/reduce conflicts.
+ # It will produce a RuntimeWarning because it's not the proper main program.
+ p = CELParser()
+
+ text = """
+ account.balance >= transaction.withdrawal
+ || (account.overdraftProtection
+ && account.overdraftLimit >= transaction.withdrawal - account.balance)
+ """
+ ast = p.parse(text)
+ print(ast)
+
+ d = DumpAST()
+ d.visit(ast)
+ print(d.stack)
+
+ text2 = """type(null)"""
+ ast2 = p.parse(text2)
+ print(ast2.pretty())
diff --git a/.venv/lib/python3.12/site-packages/celpy/celtypes.py b/.venv/lib/python3.12/site-packages/celpy/celtypes.py
new file mode 100644
index 00000000..3c4513ac
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/celpy/celtypes.py
@@ -0,0 +1,1495 @@
+# SPDX-Copyright: Copyright (c) Capital One Services, LLC
+# SPDX-License-Identifier: Apache-2.0
+# Copyright 2020 Capital One Services, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and limitations under the License.
+
+"""
+CEL Types: wrappers on Python types to provide CEL semantics.
+
+This can be used by a Python module to work with CEL-friendly values and CEL results.
+
+Examples of distinctions between CEL and Python:
+
+- Unlike Python ``bool``, CEL :py:class:`BoolType` won't do some math.
+
+- CEL has ``int64`` and ``uint64`` subclasses of integer. These have specific ranges and
+ raise :exc:`ValueError` errors on overflow.
+
+CEL types will raise :exc:`ValueError` for out-of-range values and :exc:`TypeError`
+for operations they refuse.
+The :py:mod:`evaluation` module can capture these exceptions and turn them into result values.
+This can permit the logic operators to quietly silence them via "short-circuiting".
+
+In the normal course of events, CEL's evaluator may attempt operations between a
+CEL exception result and an instance of one of CEL types.
+We rely on this leading to an ordinary Python :exc:`TypeError` being raised to propagate
+the error. Alternatively, a logic operator may discard the error object.
+
+The :py:mod:`evaluation` module extends these types with its own :exc:`CELEvalError` exception.
+We try to keep that as a separate concern from the core operator implementations here.
+We leverage Python features, which means raising exceptions when there is a problem.
+
+Types
+=============
+
+See https://github.com/google/cel-go/tree/master/common/types
+
+These are the Go type definitions that are used by CEL:
+
+- BoolType
+- BytesType
+- DoubleType
+- DurationType
+- IntType
+- ListType
+- MapType
+- NullType
+- StringType
+- TimestampType
+- TypeType
+- UintType
+
+The above types are handled directly by CEL syntax.
+e.g., ``42`` vs. ``42u`` vs. ``"42"`` vs. ``b"42"`` vs. ``42.``.
+
+We provide matching Python class names for each of these types. The Python type names
+are subclasses of Python native types, allowing a client to transparently work with
+CEL results. A Python host should be able to provide values to CEL that will be tolerated.
+
+A type hint of ``Value`` unifies these into a common hint.
+
+The CEL Go implementation also supports protobuf types:
+
+- dpb.Duration
+- tpb.Timestamp
+- structpb.ListValue
+- structpb.NullValue
+- structpb.Struct
+- structpb.Value
+- wrapperspb.BoolValue
+- wrapperspb.BytesValue
+- wrapperspb.DoubleValue
+- wrapperspb.FloatValue
+- wrapperspb.Int32Value
+- wrapperspb.Int64Value
+- wrapperspb.StringValue
+- wrapperspb.UInt32Value
+- wrapperspb.UInt64Value
+
+These types involve expressions like the following::
+
+ google.protobuf.UInt32Value{value: 123u}
+
+In this case, the well-known protobuf name is directly visible as CEL syntax.
+There's a ``google`` package with the needed definitions.
+
+Type Provider
+==============================
+
+A type provider can be bound to the environment, this will support additional types.
+This appears to be a factory to map names of types to type classes.
+
+Run-time type binding is shown by a CEL expression like the following::
+
+ TestAllTypes{single_uint32_wrapper: 432u}
+
+The ``TestAllTypes`` is a protobuf type added to the CEL run-time. The syntax
+is defined by this syntax rule::
+
+ member_object : member "{" [fieldinits] "}"
+
+The ``member`` is part of a type provider library,
+either a standard protobuf definition or an extension. The field inits build
+values for the protobuf object.
+
+See https://github.com/google/cel-go/blob/master/test/proto3pb/test_all_types.proto
+for the ``TestAllTypes`` protobuf definition that is registered as a type provider.
+
+This expression describes a Protobuf ``uint32`` object.
+
+Type Adapter
+=============
+
+So far, it appears that a type adapter wraps existing Go or C++ types
+with CEL-required methods. This seems like it does not need to be implemented
+in Python.
+
+Numeric Details
+===============
+
+Integer division truncates toward zero.
+
+The Go definition of modulus::
+
+ // Mod returns the floating-point remainder of x/y.
+ // The magnitude of the result is less than y and its
+ // sign agrees with that of x.
+
+https://golang.org/ref/spec#Arithmetic_operators
+
+"Go has the nice property that -a/b == -(a/b)."
+
+::
+
+ x y x / y x % y
+ 5 3 1 2
+ -5 3 -1 -2
+ 5 -3 -1 2
+ -5 -3 1 -2
+
+Python definition::
+
+ The modulo operator always yields a result
+ with the same sign as its second operand (or zero);
+ the absolute value of the result is strictly smaller than
+ the absolute value of the second operand.
+
+Here's the essential rule::
+
+ x//y * y + x%y == x
+
+However. Python ``//`` truncates toward negative infinity. Go ``/`` truncates toward zero.
+
+To get Go-like behavior, we need to use absolute values and restore the signs later.
+
+::
+
+ x_sign = -1 if x < 0 else +1
+ go_mod = x_sign * (abs(x) % abs(y))
+ return go_mod
+
+Timezone Details
+================
+
+An implementation may have additional timezone names that must be injected into
+the dateutil.gettz() processing.
+
+For example, there may be the following sequence:
+
+1. A lowercase match for an alias or an existing dateutil timezone.
+
+2. A titlecase match for an existing dateutil timezone.
+
+3. The fallback, which is a +/-HH:MM string.
+
+.. TODO: Permit an extension into the timezone lookup.
+
+"""
+import datetime
+import logging
+import re
+from functools import reduce, wraps
+from math import fsum
+from typing import (Any, Callable, Dict, Iterable, List, Mapping, NoReturn,
+ Optional, Sequence, Tuple, Type, TypeVar, Union, cast,
+ overload)
+
+import dateutil.parser
+import dateutil.tz
+
+logger = logging.getLogger("celtypes")
+
+
+# Union of all run-time CEL value types. ``None`` stands in for CEL null.
+Value = Union[
+ 'BoolType',
+ 'BytesType',
+ 'DoubleType',
+ 'DurationType',
+ 'IntType',
+ 'ListType',
+ 'MapType',
+ None, # Used instead of NullType
+ 'StringType',
+ 'TimestampType',
+ 'UintType',
+]
+
+# The domain of types used to build Annotations.
+# These are the *classes* (not instances) used when declaring variables.
+CELType = Union[
+ Type['BoolType'],
+ Type['BytesType'],
+ Type['DoubleType'],
+ Type['DurationType'],
+ Type['IntType'],
+ Type['ListType'],
+ Type['MapType'],
+ Callable[..., None], # Used instead of NullType
+ Type['StringType'],
+ Type['TimestampType'],
+ Type['TypeType'], # Used to mark Protobuf Type values
+ Type['UintType'],
+ Type['PackageType'],
+ Type['MessageType'],
+]
+
+
+def type_matched(method: Callable[[Any, Any], Any]) -> Callable[[Any, Any], Any]:
+ """Decorates a method to assure the "other" value has the same type.
+
+ Either operand's type may be a subclass of the other's; anything else
+ raises the CEL-style "no such overload" :exc:`TypeError`.
+ """
+ @wraps(method)
+ def type_matching_method(self: Any, other: Any) -> Any:
+ if not(issubclass(type(other), type(self)) or issubclass(type(self), type(other))):
+ raise TypeError(f"no such overload: {self!r} {type(self)} != {other!r} {type(other)}")
+ return method(self, other)
+ return type_matching_method
+
+
+def logical_condition(e: Value, x: Value, y: Value) -> Value:
+ """
+ CEL e ? x : y operator.
+ Choose one of x or y. Exceptions in the unchosen expression are ignored.
+
+ :param e: the condition; must be a :py:class:`BoolType` or a
+ :exc:`TypeError` is raised.
+ :param x: value when ``e`` is true.
+ :param y: value when ``e`` is false.
+
+ Example::
+
+ 2 / 0 > 4 ? 'baz' : 'quux'
+
+ is a "division by zero" error.
+
+ ::
+
+ >>> logical_condition(
+ ... BoolType(True), StringType("this"), StringType("Not That"))
+ StringType('this')
+ >>> logical_condition(
+ ... BoolType(False), StringType("Not This"), StringType("that"))
+ StringType('that')
+ """
+ if not isinstance(e, BoolType):
+ raise TypeError(f"Unexpected {type(e)} ? {type(x)} : {type(y)}")
+ result = x if e else y
+ logger.debug("logical_condition(%r, %r, %r) = %r", e, x, y, result)
+ return result
+
+
+def logical_and(x: Value, y: Value) -> Value:
+ """
+ Native Python has a left-to-right rule.
+ CEL && is commutative with non-Boolean values, including error objects.
+
+ A non-Boolean operand (typically an error object) is returned unchanged
+ when the other operand is ``true``, and is short-circuited away when the
+ other operand is ``false``. Two non-Booleans raise :exc:`TypeError`.
+ """
+ if not isinstance(x, BoolType) and not isinstance(y, BoolType):
+ raise TypeError(f"{type(x)} {x!r} and {type(y)} {y!r}")
+ elif not isinstance(x, BoolType) and isinstance(y, BoolType):
+ if y:
+ return x # whatever && true == whatever
+ else:
+ return y # whatever && false == false
+ elif isinstance(x, BoolType) and not isinstance(y, BoolType):
+ if x:
+ return y # true && whatever == whatever
+ else:
+ return x # false && whatever == false
+ else:
+ return BoolType(cast(BoolType, x) and cast(BoolType, y))
+
+
+def logical_not(x: Value) -> Value:
+ """
+ Native python `not` isn't fully exposed for CEL types.
+
+ Only :py:class:`BoolType` operands are accepted; anything else raises
+ :exc:`TypeError` (a CEL "no matching overload" situation).
+ """
+ if isinstance(x, BoolType):
+ result = BoolType(not x)
+ else:
+ raise TypeError(f"not {type(x)}")
+ logger.debug("logical_not(%r) = %r", x, result)
+ return result
+
+
+def logical_or(x: Value, y: Value) -> Value:
+ """
+ Native Python has a left-to-right rule: (True or y) is True, (False or y) is y.
+ CEL || is commutative with non-Boolean values, including errors.
+ ``(x || false)`` is ``x``, and ``(false || y)`` is ``y``.
+
+ Example 1::
+
+ false || 1/0 != 0
+
+ is a "no matching overload" error.
+
+ Example 2::
+
+ (2 / 0 > 3 ? false : true) || true
+
+ is a "True"
+
+ If the operand(s) are not BoolType, we'll create an TypeError that will become a CELEvalError.
+ """
+ if not isinstance(x, BoolType) and not isinstance(y, BoolType):
+ raise TypeError(f"{type(x)} {x!r} or {type(y)} {y!r}")
+ elif not isinstance(x, BoolType) and isinstance(y, BoolType):
+ if y:
+ return y # whatever || true == true
+ else:
+ return x # whatever || false == whatever
+ elif isinstance(x, BoolType) and not isinstance(y, BoolType):
+ if x:
+ return x # true || whatever == true
+ else:
+ return y # false || whatever == whatever
+ else:
+ return BoolType(cast(BoolType, x) or cast(BoolType, y))
+
+
+class BoolType(int):
+ """
+ Native Python permits unary operators on Booleans.
+
+ For CEL, we need to prevent -false from working.
+ """
+ def __new__(cls: Type['BoolType'], source: Any) -> 'BoolType':
+ if source is None:
+ # CEL null converts to false.
+ return super().__new__(cls, 0)
+ elif isinstance(source, BoolType):
+ # Already the right type; reuse the immutable instance.
+ return source
+ elif isinstance(source, MessageType):
+ # Unwrap a protobuf wrapper message's "value" field.
+ return super().__new__(
+ cls,
+ cast(int, source.get(StringType("value")))
+ )
+ else:
+ return super().__new__(cls, source)
+
+ def __repr__(self) -> str:
+ return f"{self.__class__.__name__}({bool(self)})"
+
+ def __str__(self) -> str:
+ return str(bool(self))
+
+ def __neg__(self) -> NoReturn:
+ # CEL has no unary minus for bool; int would happily allow it.
+ raise TypeError("no such overload")
+
+ def __hash__(self) -> int:
+ return super().__hash__()
+
+
+class BytesType(bytes):
+ """Python's bytes semantics are close to CEL.
+
+ Accepts bytes, UTF-8 encodable strings, protobuf wrapper messages,
+ and iterables of ints; ``None`` becomes the empty byte string.
+ """
+ def __new__(
+ cls: Type['BytesType'],
+ source: Union[str, bytes, Iterable[int], 'BytesType', 'StringType'],
+ *args: Any,
+ **kwargs: Any
+ ) -> 'BytesType':
+ if source is None:
+ return super().__new__(cls, b'')
+ elif isinstance(source, (bytes, BytesType)):
+ return super().__new__(cls, source)
+ elif isinstance(source, (str, StringType)):
+ # Text is always encoded as UTF-8, per CEL's string/bytes conversion.
+ return super().__new__(cls, source.encode('utf-8'))
+ elif isinstance(source, MessageType):
+ # Unwrap a protobuf wrapper message's "value" field.
+ return super().__new__(
+ cls,
+ cast(bytes, source.get(StringType("value"))) # type: ignore [attr-defined]
+ )
+ elif isinstance(source, Iterable):
+ return super().__new__(
+ cls,
+ cast(Iterable[int], source)
+ )
+ else:
+ raise TypeError(f"Invalid initial value type: {type(source)}")
+
+ def __repr__(self) -> str:
+ return f"{self.__class__.__name__}({super().__repr__()})"
+
+
+class DoubleType(float):
+ """
+ Native Python permits mixed type comparisons, doing conversions as needed.
+
+ For CEL, we need to prevent mixed-type comparisons from working.
+
+ TODO: Conversions from string? IntType? UintType? DoubleType?
+ """
+ def __new__(cls: Type['DoubleType'], source: Any) -> 'DoubleType':
+ if source is None:
+ # CEL null converts to 0.0.
+ return super().__new__(cls, 0)
+ elif isinstance(source, MessageType):
+ # Unwrap a protobuf wrapper message's "value" field.
+ return super().__new__(
+ cls,
+ cast(float, source.get(StringType("value")))
+ )
+ else:
+ return super().__new__(cls, source)
+
+ def __repr__(self) -> str:
+ return f"{self.__class__.__name__}({super().__repr__()})"
+
+ def __str__(self) -> str:
+ text = str(float(self))
+ return text
+
+ def __neg__(self) -> 'DoubleType':
+ return DoubleType(super().__neg__())
+
+ def __mod__(self, other: Any) -> NoReturn:
+ # CEL does not define % for doubles.
+ raise TypeError(
+ f"found no matching overload for '_%_' applied to '(double, {type(other)})'")
+
+ def __truediv__(self, other: Any) -> 'DoubleType':
+ # NOTE(review): any numerator divided by 0.0 yields +inf here,
+ # including negative numerators and 0.0/0.0 — confirm against spec.
+ if cast(float, other) == 0.0:
+ return DoubleType("inf")
+ else:
+ return DoubleType(super().__truediv__(other))
+
+ def __rmod__(self, other: Any) -> NoReturn:
+ # CEL does not define % for doubles (reflected form).
+ raise TypeError(
+ f"found no matching overload for '_%_' applied to '({type(other)}, double)'")
+
+ def __rtruediv__(self, other: Any) -> 'DoubleType':
+ # Reflected division: self is the divisor here.
+ if self == 0.0:
+ return DoubleType("inf")
+ else:
+ return DoubleType(super().__rtruediv__(other))
+
+ @type_matched
+ def __eq__(self, other: Any) -> bool:
+ return super().__eq__(other)
+
+ @type_matched
+ def __ne__(self, other: Any) -> bool:
+ return super().__ne__(other)
+
+ def __hash__(self) -> int:
+ return super().__hash__()
+
+
+# Any callable producing an int; used to type the range-clamping decorators.
+IntOperator = TypeVar('IntOperator', bound=Callable[..., int])
+
+
+def int64(operator: IntOperator) -> IntOperator:
+ """Apply an operation, but assure the value is within the int64 range.
+
+ :raises ValueError: with the message "overflow" when the result is
+ outside [-2**63, 2**63).
+ """
+ @wraps(operator)
+ def clamped_operator(*args: Any, **kwargs: Any) -> int:
+ result: int = operator(*args, **kwargs)
+ if -(2**63) <= result < 2**63:
+ return result
+ raise ValueError("overflow")
+ return cast(IntOperator, clamped_operator)
+
+
+class IntType(int):
+ """
+ A version of int with overflow errors outside int64 range.
+
+ Division and modulus follow Go semantics (truncation toward zero);
+ see the module docstring for the sign-restoration technique.
+
+ features/integer_math.feature:277 "int64_overflow_positive"
+
+ >>> IntType(9223372036854775807) + IntType(1)
+ Traceback (most recent call last):
+ ...
+ ValueError: overflow
+
+ >>> 2**63
+ 9223372036854775808
+
+ features/integer_math.feature:285 "int64_overflow_negative"
+
+ >>> -IntType(9223372036854775808) - IntType(1)
+ Traceback (most recent call last):
+ ...
+ ValueError: overflow
+
+ >>> IntType(DoubleType(1.9))
+ IntType(2)
+ >>> IntType(DoubleType(-123.456))
+ IntType(-123)
+ """
+ def __new__(
+ cls: Type['IntType'],
+ source: Any,
+ *args: Any,
+ **kwargs: Any
+ ) -> 'IntType':
+ convert: Callable[..., int]
+ if source is None:
+ # CEL null converts to 0.
+ return super().__new__(cls, 0)
+ elif isinstance(source, IntType):
+ return source
+ elif isinstance(source, MessageType):
+ # Used by protobuf.
+ return super().__new__(
+ cls,
+ cast(int, source.get(StringType("value")))
+ )
+ elif isinstance(source, (float, DoubleType)):
+ # Doubles are rounded (banker's rounding via round()), then clamped.
+ convert = int64(round)
+ elif isinstance(source, TimestampType):
+ # Timestamps convert to epoch seconds.
+ convert = int64(lambda src: src.timestamp())
+ elif isinstance(source, (str, StringType)) and source[:2] in {"0x", "0X"}:
+ # Hexadecimal literal text.
+ convert = int64(lambda src: int(src[2:], 16))
+ elif isinstance(source, (str, StringType)) and source[:3] in {"-0x", "-0X"}:
+ # Negative hexadecimal literal text.
+ convert = int64(lambda src: -int(src[3:], 16))
+ else:
+ # Must tolerate "-" as part of the literal.
+ # See https://github.com/google/cel-spec/issues/126
+ convert = int64(int)
+ return super().__new__(cls, convert(source))
+
+ def __repr__(self) -> str:
+ return f"{self.__class__.__name__}({super().__repr__()})"
+
+ def __str__(self) -> str:
+ text = str(int(self))
+ return text
+
+ @int64
+ def __neg__(self) -> 'IntType':
+ return IntType(super().__neg__())
+
+ @int64
+ def __add__(self, other: Any) -> 'IntType':
+ return IntType(super().__add__(cast(IntType, other)))
+
+ @int64
+ def __sub__(self, other: Any) -> 'IntType':
+ return IntType(super().__sub__(cast(IntType, other)))
+
+ @int64
+ def __mul__(self, other: Any) -> 'IntType':
+ return IntType(super().__mul__(cast(IntType, other)))
+
+ @int64
+ def __truediv__(self, other: Any) -> 'IntType':
+ # Go-style division: truncate toward zero, not toward -infinity.
+ other = cast(IntType, other)
+ self_sign = -1 if self < IntType(0) else +1
+ other_sign = -1 if other < IntType(0) else +1
+ go_div = self_sign * other_sign * (abs(self) // abs(other))
+ return IntType(go_div)
+
+ __floordiv__ = __truediv__
+
+ @int64
+ def __mod__(self, other: Any) -> 'IntType':
+ # Go-style modulus: result sign follows the dividend (self).
+ self_sign = -1 if self < IntType(0) else +1
+ go_mod = self_sign * (abs(self) % abs(cast(IntType, other)))
+ return IntType(go_mod)
+
+ @int64
+ def __radd__(self, other: Any) -> 'IntType':
+ return IntType(super().__radd__(cast(IntType, other)))
+
+ @int64
+ def __rsub__(self, other: Any) -> 'IntType':
+ return IntType(super().__rsub__(cast(IntType, other)))
+
+ @int64
+ def __rmul__(self, other: Any) -> 'IntType':
+ return IntType(super().__rmul__(cast(IntType, other)))
+
+ @int64
+ def __rtruediv__(self, other: Any) -> 'IntType':
+ # Reflected Go-style division: self is the divisor.
+ other = cast(IntType, other)
+ self_sign = -1 if self < IntType(0) else +1
+ other_sign = -1 if other < IntType(0) else +1
+ go_div = self_sign * other_sign * (abs(other) // abs(self))
+ return IntType(go_div)
+
+ __rfloordiv__ = __rtruediv__
+
+ @int64
+ def __rmod__(self, other: Any) -> 'IntType':
+ # Reflected Go-style modulus: sign follows the left operand (other).
+ left_sign = -1 if other < IntType(0) else +1
+ go_mod = left_sign * (abs(other) % abs(self))
+ return IntType(go_mod)
+
+ @type_matched
+ def __eq__(self, other: Any) -> bool:
+ return super().__eq__(other)
+
+ @type_matched
+ def __ne__(self, other: Any) -> bool:
+ return super().__ne__(other)
+
+ @type_matched
+ def __lt__(self, other: Any) -> bool:
+ return super().__lt__(other)
+
+ @type_matched
+ def __le__(self, other: Any) -> bool:
+ return super().__le__(other)
+
+ @type_matched
+ def __gt__(self, other: Any) -> bool:
+ return super().__gt__(other)
+
+ @type_matched
+ def __ge__(self, other: Any) -> bool:
+ return super().__ge__(other)
+
+ def __hash__(self) -> int:
+ return super().__hash__()
+
+
+def uint64(operator: IntOperator) -> IntOperator:
+ """Apply an operation, but assure the value is within the uint64 range.
+
+ :raises ValueError: with the message "overflow" when the result is
+ outside [0, 2**64).
+ """
+ @wraps(operator)
+ def clamped_operator(*args: Any, **kwargs: Any) -> int:
+ result = operator(*args, **kwargs)
+ if 0 <= result < 2**64:
+ return result
+ raise ValueError("overflow")
+ return cast(IntOperator, clamped_operator)
+
+
+class UintType(int):
+ """
+ A version of int with overflow errors outside uint64 range.
+
+ Alternatives:
+
+ Option 1 - Use https://pypi.org/project/fixedint/
+
+ Option 2 - use array or struct modules to access an unsigned object.
+
+ Test Cases:
+
+ features/integer_math.feature:149 "unary_minus_no_overload"
+
+ >>> -UintType(42)
+ Traceback (most recent call last):
+ ...
+ TypeError: no such overload
+
+ uint64_overflow_positive
+
+ >>> UintType(18446744073709551615) + UintType(1)
+ Traceback (most recent call last):
+ ...
+ ValueError: overflow
+
+ uint64_overflow_negative
+
+ >>> UintType(0) - UintType(1)
+ Traceback (most recent call last):
+ ...
+ ValueError: overflow
+
+ >>> - UintType(5)
+ Traceback (most recent call last):
+ ...
+ TypeError: no such overload
+ """
+ def __new__(
+ cls: Type['UintType'],
+ source: Any,
+ *args: Any,
+ **kwargs: Any
+ ) -> 'UintType':
+ convert: Callable[..., int]
+ if isinstance(source, UintType):
+ return source
+ elif isinstance(source, (float, DoubleType)):
+ # Doubles are rounded, then range-checked.
+ convert = uint64(round)
+ elif isinstance(source, TimestampType):
+ # Timestamps convert to epoch seconds.
+ convert = uint64(lambda src: src.timestamp())
+ elif isinstance(source, (str, StringType)) and source[:2] in {"0x", "0X"}:
+ # Hexadecimal literal text.
+ convert = uint64(lambda src: int(src[2:], 16))
+ elif isinstance(source, MessageType):
+ # Used by protobuf.
+ convert = uint64(lambda src: src['value'] if src['value'] is not None else 0)
+ elif source is None:
+ # CEL null converts to 0.
+ convert = uint64(lambda src: 0)
+ else:
+ convert = uint64(int)
+ return super().__new__(cls, convert(source))
+
+ def __repr__(self) -> str:
+ return f"{self.__class__.__name__}({super().__repr__()})"
+
+ def __str__(self) -> str:
+ text = str(int(self))
+ return text
+
+ def __neg__(self) -> NoReturn:
+ # CEL has no unary minus for uint.
+ raise TypeError("no such overload")
+
+ @uint64
+ def __add__(self, other: Any) -> 'UintType':
+ return UintType(super().__add__(cast(IntType, other)))
+
+ @uint64
+ def __sub__(self, other: Any) -> 'UintType':
+ return UintType(super().__sub__(cast(IntType, other)))
+
+ @uint64
+ def __mul__(self, other: Any) -> 'UintType':
+ return UintType(super().__mul__(cast(IntType, other)))
+
+ @uint64
+ def __truediv__(self, other: Any) -> 'UintType':
+ # Unsigned division: plain floor division suffices (no signs involved).
+ return UintType(super().__floordiv__(cast(IntType, other)))
+
+ __floordiv__ = __truediv__
+
+ @uint64
+ def __mod__(self, other: Any) -> 'UintType':
+ return UintType(super().__mod__(cast(IntType, other)))
+
+ @uint64
+ def __radd__(self, other: Any) -> 'UintType':
+ return UintType(super().__radd__(cast(IntType, other)))
+
+ @uint64
+ def __rsub__(self, other: Any) -> 'UintType':
+ return UintType(super().__rsub__(cast(IntType, other)))
+
+ @uint64
+ def __rmul__(self, other: Any) -> 'UintType':
+ return UintType(super().__rmul__(cast(IntType, other)))
+
+ @uint64
+ def __rtruediv__(self, other: Any) -> 'UintType':
+ return UintType(super().__rfloordiv__(cast(IntType, other)))
+
+ __rfloordiv__ = __rtruediv__
+
+ @uint64
+ def __rmod__(self, other: Any) -> 'UintType':
+ return UintType(super().__rmod__(cast(IntType, other)))
+
+ @type_matched
+ def __eq__(self, other: Any) -> bool:
+ return super().__eq__(other)
+
+ @type_matched
+ def __ne__(self, other: Any) -> bool:
+ return super().__ne__(other)
+
+ def __hash__(self) -> int:
+ return super().__hash__()
+
+
+class ListType(List[Value]):
+ """
+ Native Python implements comparison operations between list objects.
+
+ For CEL, we prevent list comparison operators from working.
+
+ We provide an :py:meth:`__eq__` and :py:meth:`__ne__` that
+ gracefully ignore type mismatch problems, calling them not equal.
+
+ See https://github.com/google/cel-spec/issues/127
+
+ An implied logical And means a singleton behaves in a distinct way from a non-singleton list.
+ """
+ def __repr__(self) -> str:
+ return f"{self.__class__.__name__}({super().__repr__()})"
+
+ def __lt__(self, other: Any) -> NoReturn:
+ raise TypeError("no such overload")
+
+ def __le__(self, other: Any) -> NoReturn:
+ raise TypeError("no such overload")
+
+ def __gt__(self, other: Any) -> NoReturn:
+ raise TypeError("no such overload")
+
+ def __ge__(self, other: Any) -> NoReturn:
+ raise TypeError("no such overload")
+
+ def __eq__(self, other: Any) -> bool:
+ """Element-wise equality, folding per-item TypeErrors through logical_and."""
+ if not isinstance(other, (list, ListType)):
+ raise TypeError(f"no such overload: ListType == {type(other)}")
+
+ # Per-item comparison; a TypeError is smuggled through the fold
+ # as a value so short-circuiting can discard it.
+ def equal(s: Any, o: Any) -> Value:
+ try:
+ return BoolType(s == o)
+ except TypeError as ex:
+ return cast(BoolType, ex) # Instead of Union[BoolType, TypeError]
+
+ result = (
+ len(self) == len(other)
+ and reduce( # noqa: W503
+ logical_and, # type: ignore [arg-type]
+ (equal(item_s, item_o) for item_s, item_o in zip(self, other)),
+ BoolType(True) # type: ignore [arg-type]
+ )
+ )
+ # If no true/false operand short-circuited it away, re-raise the error.
+ if isinstance(result, TypeError):
+ raise result
+ return bool(result)
+
+ def __ne__(self, other: Any) -> bool:
+ """Element-wise inequality, folding per-item TypeErrors through logical_or."""
+ if not isinstance(other, (list, ListType)):
+ raise TypeError(f"no such overload: ListType != {type(other)}")
+
+ # Mirror of equal(): capture TypeError as a value for the fold.
+ def not_equal(s: Any, o: Any) -> Value:
+ try:
+ return BoolType(s != o)
+ except TypeError as ex:
+ return cast(BoolType, ex) # Instead of Union[BoolType, TypeError]
+
+ result = (
+ len(self) != len(other)
+ or reduce( # noqa: W503
+ logical_or, # type: ignore [arg-type]
+ (not_equal(item_s, item_o) for item_s, item_o in zip(self, other)),
+ BoolType(False) # type: ignore [arg-type]
+ )
+ )
+ if isinstance(result, TypeError):
+ raise result
+ return bool(result)
+
+
+# Acceptable initializers for MapType: a mapping, a sequence of pairs, or None.
+BaseMapTypes = Union[
+ Mapping[Any, Any],
+ Sequence[Tuple[Any, Any]],
+ None
+]
+
+
+# The CEL-legal key types (plus native str for member access tokens).
+MapKeyTypes = Union[
+ 'IntType', 'UintType', 'BoolType', 'StringType', str
+]
+
+
+class MapType(Dict[Value, Value]):
+ """
+ Native Python allows mapping updates and any hashable type as a key.
+
+ CEL prevents mapping updates and has a limited domain of key types.
+ int, uint, bool, or string keys
+
+ We provide an :py:meth:`__eq__` and :py:meth:`__ne__` that
+ gracefully ignore type mismatch problems for the values, calling them not equal.
+
+ See https://github.com/google/cel-spec/issues/127
+
+ An implied logical And means a singleton behaves in a distinct way from a non-singleton mapping.
+ """
+ def __init__(
+ self,
+ items: BaseMapTypes = None) -> None:
+ super().__init__()
+ if items is None:
+ pass
+ elif isinstance(items, Sequence):
+ # A sequence of (key, value) pairs.
+ for name, value in items:
+ self[name] = value
+ elif isinstance(items, Mapping):
+ for name, value in items.items():
+ self[name] = value
+ else:
+ raise TypeError(f"Invalid initial value type: {type(items)}")
+
+ def __repr__(self) -> str:
+ return f"{self.__class__.__name__}({super().__repr__()})"
+
+ def __getitem__(self, key: Any) -> Any:
+ # Reject lookups with CEL-illegal key types before consulting the dict.
+ if not MapType.valid_key_type(key):
+ raise TypeError(f"unsupported key type: {type(key)}")
+ return super().__getitem__(key)
+
+ def __eq__(self, other: Any) -> bool:
+ """Key-set then value-wise equality, folding TypeErrors through logical_and."""
+ if not isinstance(other, (Mapping, MapType)):
+ raise TypeError(f"no such overload: MapType == {type(other)}")
+
+ # Per-value comparison; a TypeError is smuggled through the fold as a value.
+ def equal(s: Any, o: Any) -> BoolType:
+ try:
+ return BoolType(s == o)
+ except TypeError as ex:
+ return cast(BoolType, ex) # Instead of Union[BoolType, TypeError]
+
+ keys_s = self.keys()
+ keys_o = other.keys()
+ result = (
+ keys_s == keys_o
+ and reduce( # noqa: W503
+ logical_and, # type: ignore [arg-type]
+ (equal(self[k], other[k]) for k in keys_s),
+ BoolType(True) # type: ignore [arg-type]
+ )
+ )
+ if isinstance(result, TypeError):
+ raise result
+ return bool(result)
+
+ def __ne__(self, other: Any) -> bool:
+ """Key-set then value-wise inequality, folding TypeErrors through logical_or."""
+ if not isinstance(other, (Mapping, MapType)):
+ raise TypeError(f"no such overload: MapType != {type(other)}")
+
+ # Singleton special case, may return no-such overload.
+ if len(self) == 1 and len(other) == 1 and self.keys() == other.keys():
+ k = next(iter(self.keys()))
+ return cast(bool, self[k] != other[k]) # Instead of Union[BoolType, TypeError]
+
+ def not_equal(s: Any, o: Any) -> BoolType:
+ try:
+ return BoolType(s != o)
+ except TypeError as ex:
+ return cast(BoolType, ex) # Instead of Union[BoolType, TypeError]
+
+ keys_s = self.keys()
+ keys_o = other.keys()
+ result = (
+ keys_s != keys_o
+ or reduce( # noqa: W503
+ logical_or, # type: ignore [arg-type]
+ (not_equal(self[k], other[k]) for k in keys_s),
+ BoolType(False) # type: ignore [arg-type]
+ )
+ )
+ if isinstance(result, TypeError):
+ raise result
+ return bool(result)
+
+ @staticmethod
+ def valid_key_type(key: Any) -> bool:
+ """Valid CEL key types. Plus native str for tokens in the source when evaluating ``e.f``"""
+ return isinstance(key, (IntType, UintType, BoolType, StringType, str))
+
+
+class NullType:
+ """TBD. May not be needed. Python's None semantics appear to match CEL perfectly."""
+ pass
+
+
<br>+class StringType(str):
+ """Python's str semantics are very, very close to CEL.
+
+ We rely on the overlap between ``"/u270c"`` and ``"/U0001f431"`` in CEL and Python.
+ """
+ def __new__(
+ cls: Type['StringType'],
+ source: Union[str, bytes, 'BytesType', 'StringType'],
+ *args: Any,
+ **kwargs: Any
+ ) -> 'StringType':
+ if isinstance(source, (bytes, BytesType)):
+ # Bytes are decoded as UTF-8.
+ return super().__new__(cls, source.decode('utf'))
+ elif isinstance(source, (str, StringType)):
+ # TODO: Consider returning the original StringType object.
+ return super().__new__(cls, source)
+ else:
+ return cast(StringType, super().__new__(cls, source))
+
+ def __repr__(self) -> str:
+ return f"{self.__class__.__name__}({super().__repr__()})"
+
+ @type_matched
+ def __eq__(self, other: Any) -> bool:
+ return super().__eq__(other)
+
+ @type_matched
+ def __ne__(self, other: Any) -> bool:
+ return super().__ne__(other)
+
+ def __hash__(self) -> int:
+ return super().__hash__()
+
+
+class TimestampType(datetime.datetime):
+ """
+ Implements google.protobuf.Timestamp
+
+ See https://developers.google.com/protocol-buffers/docs/reference/google.protobuf
+
+ Also see https://www.ietf.org/rfc/rfc3339.txt.
+
+ The protobuf implementation is an ordered pair of int64 seconds and int32 nanos.
+
+ Instead of a Tuple[int, int] we use a wrapper for :py:class:`datetime.datetime`.
+
+ From protobuf documentation for making a Timestamp in Python::
+
+ now = time.time()
+ seconds = int(now)
+ nanos = int((now - seconds) * 10**9)
+ timestamp = Timestamp(seconds=seconds, nanos=nanos)
+
+ Also::
+
+ >>> t = TimestampType("2009-02-13T23:31:30Z")
+ >>> repr(t)
+ "TimestampType('2009-02-13T23:31:30Z')"
+ >>> t.timestamp()
+ 1234567890.0
+ >>> str(t)
+ '2009-02-13T23:31:30Z'
+
+ :strong:`Timezones`
+
+ Timezones are expressed in the following grammar:
+
+ ::
+
+ TimeZone = "UTC" | LongTZ | FixedTZ ;
+ LongTZ = ? list available at
+ http://joda-time.sourceforge.net/timezones.html ? ;
+ FixedTZ = ( "+" | "-" ) Digit Digit ":" Digit Digit ;
+ Digit = "0" | "1" | ... | "9" ;
+
+ Fixed timezones are explicit hour and minute offsets from UTC.
+ Long timezone names are like Europe/Paris, CET, or US/Central.
+
+ The Joda project (https://www.joda.org/joda-time/timezones.html)
+ says "Time zone data is provided by the public IANA time zone database."
+
+ The ``dateutil`` project (https://pypi.org/project/python-dateutil/)
+ is used for TZ handling and timestamp parsing.
+
+ Additionally, there is a ``TZ_ALIASES`` mapping available in this class to permit additional
+ timezone names. By default, the mapping is empty, and the only names
+ available are those recognized by :mod:`dateutil.tz`.
+ """
+
+ TZ_ALIASES: Dict[str, str] = {}
+
+ def __new__(
+ cls: Type['TimestampType'],
+ source: Union[int, str, datetime.datetime],
+ *args: Any,
+ **kwargs: Any) -> 'TimestampType':
+ """Build a TimestampType from a datetime, a year-first integer
+ sequence, or an ISO-8601 string; naive datetimes are assumed UTC.
+ """
+ if isinstance(source, datetime.datetime):
+ # Wrap a datetime.datetime
+ return super().__new__(
+ cls,
+ year=source.year,
+ month=source.month,
+ day=source.day,
+ hour=source.hour,
+ minute=source.minute,
+ second=source.second,
+ microsecond=source.microsecond,
+ tzinfo=source.tzinfo or datetime.timezone.utc
+ )
+
+ elif isinstance(source, int) and len(args) >= 2:
+ # Wrap a sequence of integers that datetime.datetime might accept.
+ ts: TimestampType = super().__new__(
+ cls, source, *args, **kwargs
+ )
+ if not ts.tzinfo:
+ # Naive timestamps default to UTC.
+ ts = ts.replace(tzinfo=datetime.timezone.utc)
+ return ts
+
+ elif isinstance(source, str):
+ # Use dateutil to try a variety of text formats.
+ parsed_datetime = dateutil.parser.isoparse(source)
+ return super().__new__(
+ cls,
+ year=parsed_datetime.year,
+ month=parsed_datetime.month,
+ day=parsed_datetime.day,
+ hour=parsed_datetime.hour,
+ minute=parsed_datetime.minute,
+ second=parsed_datetime.second,
+ microsecond=parsed_datetime.microsecond,
+ tzinfo=parsed_datetime.tzinfo
+ )
+
+ else:
+ raise TypeError(f"Cannot create {cls} from {source!r}")
+
+ def __repr__(self) -> str:
+ return f"{self.__class__.__name__}({str(self)!r})"
+
+ def __str__(self) -> str:
+ # RFC 3339-style text with a literal "Z"; sub-second digits are dropped.
+ return self.strftime("%Y-%m-%dT%H:%M:%SZ")
+
+ def __add__(self, other: Any) -> 'TimestampType':
+ """Timestamp + Duration -> Timestamp"""
+ result = super().__add__(other)
+ # NOTE(review): ``is NotImplemented`` is the conventional test;
+ # ``==`` works here but relies on equality falling back to identity.
+ if result == NotImplemented:
+ return NotImplemented
+ return TimestampType(result)
+
+ def __radd__(self, other: Any) -> 'TimestampType':
+ """Duration + Timestamp -> Timestamp"""
+ result = super().__radd__(other)
+ if result == NotImplemented:
+ return NotImplemented
+ return TimestampType(result)
+
+ # For more information, check the typeshed definition
+ # https://github.com/python/typeshed/blob/master/stdlib/2and3/datetime.pyi
+
+ @overload # type: ignore
+ def __sub__(self, other: 'TimestampType') -> 'DurationType':
+ ... # pragma: no cover
+
+ @overload
+ def __sub__(self, other: 'DurationType') -> 'TimestampType':
+ ... # pragma: no cover
+
+ def __sub__(
+ self,
+ other: Union['TimestampType', 'DurationType']
+ ) -> Union['TimestampType', 'DurationType']:
+ """Timestamp - Timestamp -> Duration; Timestamp - Duration -> Timestamp."""
+ result = super().__sub__(other)
+ if result == NotImplemented:
+ return cast(DurationType, result)
+ # A timedelta result means both operands were timestamps.
+ if isinstance(result, datetime.timedelta):
+ return DurationType(result)
+ return TimestampType(result)
+
+ @classmethod
+ def tz_name_lookup(cls, tz_name: str) -> Optional[datetime.tzinfo]:
+ """
+ The :py:func:`dateutil.tz.gettz` may be extended with additional aliases.
+
+ .. TODO: Permit an extension into the timezone lookup.
+ Tweak ``celpy.celtypes.TimestampType.TZ_ALIASES``.
+ """
+ tz_lookup = str(tz_name)
+ if tz_lookup in cls.TZ_ALIASES:
+ tz = dateutil.tz.gettz(cls.TZ_ALIASES[tz_lookup])
+ else:
+ tz = dateutil.tz.gettz(tz_lookup)
+ return tz
+
+ @classmethod
+ def tz_offset_parse(cls, tz_name: str) -> Optional[datetime.tzinfo]:
+ tz_pat = re.compile(r"^([+-]?)(\d\d?):(\d\d)$")
+ tz_match = tz_pat.match(tz_name)
+ if not tz_match:
+ raise ValueError(f"Unparsable timezone: {tz_name!r}")
+ sign, hh, mm = tz_match.groups()
+ offset_min = (int(hh) * 60 + int(mm)) * (-1 if sign == '-' else +1)
+ offset = datetime.timedelta(seconds=offset_min * 60)
+ tz = datetime.timezone(offset)
+ return tz
+
+ @staticmethod
+ def tz_parse(tz_name: Optional[str]) -> Optional[datetime.tzinfo]:
+ if tz_name:
+ tz = TimestampType.tz_name_lookup(tz_name)
+ if tz is None:
+ tz = TimestampType.tz_offset_parse(tz_name)
+ return tz
+ else:
+ return dateutil.tz.UTC
+
+ def getDate(self, tz_name: Optional[StringType] = None) -> IntType:
+ new_tz = self.tz_parse(tz_name)
+ return IntType(self.astimezone(new_tz).day)
+
+ def getDayOfMonth(self, tz_name: Optional[StringType] = None) -> IntType:
+ new_tz = self.tz_parse(tz_name)
+ return IntType(self.astimezone(new_tz).day - 1)
+
+ def getDayOfWeek(self, tz_name: Optional[StringType] = None) -> IntType:
+ new_tz = self.tz_parse(tz_name)
+ return IntType(self.astimezone(new_tz).isoweekday() % 7)
+
+ def getDayOfYear(self, tz_name: Optional[StringType] = None) -> IntType:
+ new_tz = self.tz_parse(tz_name)
+ working_date = self.astimezone(new_tz)
+ jan1 = datetime.datetime(working_date.year, 1, 1, tzinfo=new_tz)
+ days = working_date.toordinal() - jan1.toordinal()
+ return IntType(days)
+
+ def getMonth(self, tz_name: Optional[StringType] = None) -> IntType:
+ new_tz = self.tz_parse(tz_name)
+ return IntType(self.astimezone(new_tz).month - 1)
+
+ def getFullYear(self, tz_name: Optional[StringType] = None) -> IntType:
+ new_tz = self.tz_parse(tz_name)
+ return IntType(self.astimezone(new_tz).year)
+
+ def getHours(self, tz_name: Optional[StringType] = None) -> IntType:
+ new_tz = self.tz_parse(tz_name)
+ return IntType(self.astimezone(new_tz).hour)
+
+ def getMilliseconds(self, tz_name: Optional[StringType] = None) -> IntType:
+ new_tz = self.tz_parse(tz_name)
+ return IntType(self.astimezone(new_tz).microsecond // 1000)
+
+ def getMinutes(self, tz_name: Optional[StringType] = None) -> IntType:
+ new_tz = self.tz_parse(tz_name)
+ return IntType(self.astimezone(new_tz).minute)
+
+ def getSeconds(self, tz_name: Optional[StringType] = None) -> IntType:
+ new_tz = self.tz_parse(tz_name)
+ return IntType(self.astimezone(new_tz).second)
+
+
class DurationType(datetime.timedelta):
    """
    Implements google.protobuf.Duration

    https://developers.google.com/protocol-buffers/docs/reference/google.protobuf#duration

    The protobuf implementation is an ordered pair of int64 seconds and int32 nanos.
    Instead of a Tuple[int, int] we use a wrapper for :py:class:`datetime.timedelta`.

    The definition once said this::

        "type conversion, duration should be end with "s", which stands for seconds"

    This is obsolete, however, considering the following issue.

    See https://github.com/google/cel-spec/issues/138

    This refers to the following implementation detail
    ::

        // A duration string is a possibly signed sequence of
        // decimal numbers, each with optional fraction and a unit suffix,
        // such as "300ms", "-1.5h" or "2h45m".
        // Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".

    The real regex, then is this::

        [-+]?([0-9]*(\\.[0-9]*)?[a-z]+)+

    """
    # Range limits from the protobuf Duration specification.
    MaxSeconds = 315576000000
    MinSeconds = -315576000000
    NanosecondsPerSecond = 1000000000

    # Multipliers converting each unit suffix into seconds.
    scale: Dict[str, float] = {
        "ns": 1E-9,
        "us": 1E-6,
        "µs": 1E-6,
        "ms": 1E-3,
        "s": 1.,
        "m": 60.,
        "h": 60. * 60.,
        "d": 24. * 60. * 60.,
    }

    def __new__(
            cls: Type['DurationType'],
            seconds: Any,
            nanos: int = 0,
            **kwargs: Any) -> 'DurationType':
        """
        Build a DurationType from a timedelta, an integer second count,
        or a text duration such as ``"2h45m"``.

        :param seconds: a :py:class:`datetime.timedelta`, an int, or a str.
        :param nanos: optional nanosecond count; only used with an int ``seconds``.
        :raises ValueError: for out-of-range or unparsable values.
        :raises TypeError: for unsupported source types.
        """
        if isinstance(seconds, datetime.timedelta):
            if not (cls.MinSeconds <= seconds.total_seconds() <= cls.MaxSeconds):
                # Bug fix: these messages were missing the f-prefix and never interpolated.
                raise ValueError(f"range error: {seconds}")
            return super().__new__(
                cls, days=seconds.days, seconds=seconds.seconds,
                microseconds=seconds.microseconds)
        elif isinstance(seconds, int):
            if not (cls.MinSeconds <= seconds <= cls.MaxSeconds):
                raise ValueError(f"range error: {seconds}")
            return super().__new__(
                cls, seconds=seconds, microseconds=nanos // 1000)
        elif isinstance(seconds, str):
            duration_pat = re.compile(r"^[-+]?([0-9]*(\.[0-9]*)?[a-z]+)+$")

            duration_match = duration_pat.match(seconds)
            if not duration_match:
                raise ValueError(f"Invalid duration {seconds!r}")

            # Consume the sign.
            sign: float
            if seconds.startswith("+"):
                seconds = seconds[1:]
                sign = +1
            elif seconds.startswith("-"):
                seconds = seconds[1:]
                sign = -1
            else:
                sign = +1

            # Sum the remaining time components: number * unit
            try:
                seconds = sign * fsum(
                    map(
                        lambda n_u: float(n_u.group(1)) * cls.scale[n_u.group(3)],
                        re.finditer(r"([0-9]*(\.[0-9]*)?)([a-z]+)", seconds)
                    )
                )
            except KeyError:
                # An unknown unit suffix is not in the scale mapping.
                raise ValueError(f"Invalid duration {seconds!r}")

            if not (cls.MinSeconds <= seconds <= cls.MaxSeconds):
                raise ValueError(f"range error: {seconds}")
            return super().__new__(
                cls, seconds=seconds)
        else:
            raise TypeError(f"Invalid initial value type: {type(seconds)}")

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({str(self)!r})"

    def __str__(self) -> str:
        """Render as whole seconds with an ``s`` suffix, per CEL convention."""
        return "{0}s".format(int(self.total_seconds()))

    def __add__(self, other: Any) -> 'DurationType':
        """
        This doesn't need to handle the rich variety of TimestampType overloads.
        This class only needs to handle results of duration + duration.
        A duration + timestamp is not implemented by the timedelta superclass;
        it is handled by the datetime superclass that implements timestamp + duration.
        """
        result = super().__add__(other)
        # NotImplemented is a singleton sentinel; identity is the correct test.
        if result is NotImplemented:
            return cast(DurationType, result)
        # This is handled by TimestampType; this is here for completeness, but isn't used.
        if isinstance(result, (datetime.datetime, TimestampType)):
            return TimestampType(result)  # pragma: no cover
        return DurationType(result)

    def __radd__(self, other: Any) -> 'DurationType':  # pragma: no cover
        """
        This doesn't need to handle the rich variety of TimestampType overloads.

        Most cases are handled by TimeStamp.
        """
        result = super().__radd__(other)
        if result is NotImplemented:
            return cast(DurationType, result)
        # This is handled by TimestampType; this is here for completeness, but isn't used.
        if isinstance(result, (datetime.datetime, TimestampType)):
            return TimestampType(result)
        return DurationType(result)

    def getHours(self, tz_name: Optional[str] = None) -> 'IntType':
        """Duration as a whole number of hours (truncated toward zero)."""
        assert tz_name is None
        return IntType(int(self.total_seconds() / 60 / 60))

    def getMilliseconds(self, tz_name: Optional[str] = None) -> 'IntType':
        """Duration as a whole number of milliseconds."""
        assert tz_name is None
        return IntType(int(self.total_seconds() * 1000))

    def getMinutes(self, tz_name: Optional[str] = None) -> 'IntType':
        """Duration as a whole number of minutes (truncated toward zero)."""
        assert tz_name is None
        return IntType(int(self.total_seconds() / 60))

    def getSeconds(self, tz_name: Optional[str] = None) -> 'IntType':
        """Duration as a whole number of seconds (truncated toward zero)."""
        assert tz_name is None
        return IntType(int(self.total_seconds()))
+
+
class FunctionType:
    """
    Concrete annotation class describing callables known to celpy.

    Covers plain functions as well as callable objects; the description
    would otherwise tend to shadow ``typing.Callable``.

    An ``__isinstance__()`` method, for example, may be helpful for run-time type-checking.

    Serves as the superclass for CEL extension functions defined at run-time.
    It gives environment construction a formal annotation -- an intended type --
    for a given name, which allows some run-time checking that the actual
    object bound to a name matches its declared type.

    Also used to define protobuf classes provided as an annotation.

    We *could* define this as three overloads to cover unary, binary, and tertiary cases.
    """
    def __call__(self, *args: Value, **kwargs: Value) -> Value:
        raise NotImplementedError
+
+
class PackageType(MapType):
    """
    A bundle of message types, typically protobuf.

    TODO: This may not be needed.
    """
    pass
+
+
class MessageType(MapType):
    """
    An individual protobuf message definition. A mapping from field name to field value.

    See Scenario: "message_literal" in the parse.feature. This is a very deeply-nested
    message (30? levels), but the navigation to "payload" field seems to create a default
    value at the top level.
    """
    def __init__(self, *args: Value, **fields: Value) -> None:
        """
        Build a message either from a single mapping positional argument
        or from keyword field values -- never both.

        :param args: at most one positional mapping of field name to value.
        :param fields: keyword field values; keys become StringType field names.
        :raises TypeError: when more than one positional argument is given.
        """
        if len(args) == 1:
            super().__init__(cast(Mapping[Value, Value], args[0]))
        elif len(args) > 1:
            # Bug fix: this was a raw string (r"..."), so {args!r} was never interpolated.
            raise TypeError(f"Expected dictionary or fields, not {args!r}")
        else:
            super().__init__({StringType(k): v for k, v in fields.items()})

    # def get(self, field: Any, default: Optional[Value] = None) -> Value:
    #     """
    #     Alternative implementation with descent to locate a deeply-buried field.
    #     It seemed like this was the defined behavior. It turns it, it isn't.
    #     The code is here in case we're wrong and it really is the defined behavior.
    #
    #     Note. There is no default provision in CEL.
    #     """
    #     if field in self:
    #         return super().get(field)
    #
    #     def descend(message: MessageType, field: Value) -> MessageType:
    #         if field in message:
    #             return message
    #         for k in message.keys():
    #             found = descend(message[k], field)
    #             if found is not None:
    #                 return found
    #         return None
    #
    #     sub_message = descend(self, field)
    #     if sub_message is None:
    #         return default
    #     return sub_message.get(field)
+
+
class TypeType:
    """
    Annotation used to mark protobuf type objects.
    We map these to CELTypes so that type name testing works.

    NOTE(review): defining ``__eq__`` without ``__hash__`` makes instances
    unhashable (``__hash__`` becomes ``None``) -- confirm that is intended.
    """
    # Known type names (protobuf well-known types, CEL primitive names, and
    # checked-AST spellings) mapped to their celpy implementation classes.
    type_name_mapping = {
        "google.protobuf.Duration": DurationType,
        "google.protobuf.Timestamp": TimestampType,
        "google.protobuf.Int32Value": IntType,
        "google.protobuf.Int64Value": IntType,
        "google.protobuf.UInt32Value": UintType,
        "google.protobuf.UInt64Value": UintType,
        "google.protobuf.FloatValue": DoubleType,
        "google.protobuf.DoubleValue": DoubleType,
        "google.protobuf.Value": MessageType,
        # The misspelled "protubuf" key below is kept deliberately (see comment).
        "google.protubuf.Any": MessageType,  # Weird.
        "google.protobuf.Any": MessageType,
        "list_type": ListType,
        "map_type": MapType,
        "map": MapType,
        "list": ListType,
        "string": StringType,
        "bytes": BytesType,
        "bool": BoolType,
        "int": IntType,
        "uint": UintType,
        "double": DoubleType,
        "null_type": type(None),
        "STRING": StringType,
        "BOOL": BoolType,
        "INT64": IntType,
        "UINT64": UintType,
        "INT32": IntType,
        "UINT32": UintType,
        "BYTES": BytesType,
        "DOUBLE": DoubleType,
    }

    def __init__(
            self,
            value: Any = "") -> None:
        # Resolve the name through the known mapping first; otherwise fall
        # back to evaluating the text as a Python expression; for non-string
        # values, use the value's own class.
        if isinstance(value, str) and value in self.type_name_mapping:
            self.type_reference = self.type_name_mapping[value]
        elif isinstance(value, str):
            try:
                # SECURITY NOTE: eval() on an arbitrary string is dangerous if
                # ``value`` can come from untrusted input -- review carefully.
                self.type_reference = eval(value)
            except (NameError, SyntaxError):
                raise TypeError(f"Unknown type {value!r}")
        else:
            self.type_reference = value.__class__

    def __eq__(self, other: Any) -> bool:
        # True when ``other`` is the referenced type itself or an instance of it.
        return (
            other == self.type_reference
            or isinstance(other, self.type_reference)  # noqa: W503
        )
diff --git a/.venv/lib/python3.12/site-packages/celpy/evaluation.py b/.venv/lib/python3.12/site-packages/celpy/evaluation.py
new file mode 100644
index 00000000..5e97e3b3
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/celpy/evaluation.py
@@ -0,0 +1,2446 @@
+# SPDX-Copyright: Copyright (c) Capital One Services, LLC
+# SPDX-License-Identifier: Apache-2.0
+# Copyright 2020 Capital One Services, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and limitations under the License.
+
+"""
+CEL Interpreter using the AST directly.
+
+The general idea is to map CEL operators to Python operators and push the
+real work off to Python objects defined by the :py:mod:`celpy.celtypes` module.
+
+CEL operator "+" is implemented by "_+_" function. We map this to :py:func:`operator.add`.
+This will then look for `__add__()` methods in the various :py:class:`celpy.celtypes.CELType`
+types.
+
+In order to deal gracefully with missing and incomplete data,
+exceptions are turned into first-class :py:class:`Result` objects.
+They're not raised directly, but instead saved as part of the evaluation so that
+short-circuit operators can ignore the exceptions.
+
+This means that Python exceptions like :exc:`TypeError`, :exc:`IndexError`, and :exc:`KeyError`
+are caught and transformed into :exc:`CELEvalError` objects.
+
+The :py:class:`Resut` type hint is a union of the various values that are encountered
+during evaluation. It's a union of the :py:class:`celpy.celtypes.CELTypes` type and the
+:exc:`CELEvalError` exception.
+"""
+import collections
+import logging
+import operator
+import re
+import sys
+from functools import reduce, wraps
+from typing import (Any, Callable, Dict, Iterable, Iterator, List, Mapping,
+ Match, Optional, Sequence, Sized, Tuple, Type, TypeVar,
+ Union, cast)
+
+import lark
+import lark.visitors
+
+import celpy.celtypes
+from celpy.celparser import tree_dump
+
# A CEL type annotation. Used in an environment to describe objects as well as functions.
# This is a list of types, plus Callable for conversion functions.
Annotation = Union[
    celpy.celtypes.CELType,
    Callable[..., celpy.celtypes.Value],  # Conversion functions and protobuf message type
    Type[celpy.celtypes.FunctionType],  # Concrete class for annotations
]


# NOTE(review): logger is named "evaluation" rather than __name__ -- confirm intended.
logger = logging.getLogger("evaluation")
+
+
class CELSyntaxError(Exception):
    """CEL Syntax error -- the AST did not have the expected structure."""
    def __init__(self, arg: Any, line: Optional[int] = None, column: Optional[int] = None) -> None:
        """Capture the message plus the optional source line and column."""
        super().__init__(arg)
        self.line, self.column = line, column
+
+
class CELUnsupportedError(Exception):
    """Feature unsupported by this implementation of CEL."""
    def __init__(self, arg: Any, line: int, column: int) -> None:
        """Record the message and the mandatory source position."""
        super().__init__(arg)
        self.line, self.column = line, column
+
+
class CELEvalError(Exception):
    """CEL evaluation problem. This can be saved as a temporary value for later use.
    This is politely ignored by logic operators to provide commutative short-circuit.

    We provide operator-like special methods so an instance of an error
    returns itself when operated on.
    """
    def __init__(
            self,
            *args: Any,
            tree: Optional["lark.Tree"] = None,
            token: Optional["lark.Token"] = None) -> None:
        """
        :param args: ordinary exception arguments (message, class, details).
        :param tree: optional parse tree locating the error.
        :param token: optional token locating the error; its position
            takes precedence over the tree's.
        """
        super().__init__(*args)
        self.tree = tree
        self.token = token
        self.line: Optional[int] = None
        self.column: Optional[int] = None
        # The token's position (more precise) overrides the tree's metadata.
        if self.tree:
            self.line = self.tree.meta.line
            self.column = self.tree.meta.column
        if self.token:
            self.line = self.token.line
            self.column = self.token.column

    def __repr__(self) -> str:
        cls = self.__class__.__name__
        if self.tree and self.token:
            # This is rare
            return (
                f"{cls}(*{self.args}, tree={tree_dump(self.tree)!r}, token={self.token!r})"
            )  # pragma: no cover
        elif self.tree:
            return f"{cls}(*{self.args}, tree={tree_dump(self.tree)!r})"  # pragma: no cover
        else:
            # Some unit tests do not provide a mock tree.
            return f"{cls}(*{self.args})"  # pragma: no cover

    def with_traceback(self, tb: Any) -> 'CELEvalError':
        """Narrow the return type; the superclass returns BaseException."""
        return super().with_traceback(tb)

    # Arithmetic: an error value absorbs any operation and propagates itself.

    def __neg__(self) -> 'CELEvalError':
        return self

    def __add__(self, other: Any) -> 'CELEvalError':
        return self

    def __sub__(self, other: Any) -> 'CELEvalError':
        return self

    def __mul__(self, other: Any) -> 'CELEvalError':
        return self

    def __truediv__(self, other: Any) -> 'CELEvalError':
        return self

    def __floordiv__(self, other: Any) -> 'CELEvalError':
        return self

    def __mod__(self, other: Any) -> 'CELEvalError':
        return self

    def __pow__(self, other: Any) -> 'CELEvalError':
        return self

    def __radd__(self, other: Any) -> 'CELEvalError':
        return self

    def __rsub__(self, other: Any) -> 'CELEvalError':
        return self

    def __rmul__(self, other: Any) -> 'CELEvalError':
        return self

    def __rtruediv__(self, other: Any) -> 'CELEvalError':
        return self

    def __rfloordiv__(self, other: Any) -> 'CELEvalError':
        return self

    def __rmod__(self, other: Any) -> 'CELEvalError':
        return self

    def __rpow__(self, other: Any) -> 'CELEvalError':
        return self

    def __eq__(self, other: Any) -> bool:
        """Errors compare equal when their args match."""
        if isinstance(other, CELEvalError):
            return self.args == other.args
        return NotImplemented

    def __hash__(self) -> int:
        # Bug fix: defining __eq__ implicitly set __hash__ to None, making
        # instances unhashable. Restore a hash consistent with __eq__.
        return hash(self.args)

    def __call__(self, *args: Any) -> 'CELEvalError':
        return self
+
+
# The interim results extend celtypes to include intermediate CELEvalError exception objects.
# These can be deferred as part of commutative logical_and and logical_or operations.
# It includes the responses to type() queries, also.
Result = Union[
    celpy.celtypes.Value,
    CELEvalError,
    celpy.celtypes.CELType,
]

# The various functions that apply to CEL data.
# The evaluator's functions expand on the CELTypes to include CELEvalError and the
# celpy.celtypes.CELType union type, also.
CELFunction = Callable[..., Result]

# A combination of a CELType result or a function resulting from identifier evaluation.
Result_Function = Union[
    Result,
    CELFunction,
]

# An exception class, or a sequence of exception classes, accepted by eval_error().
Exception_Filter = Union[Type[BaseException], Sequence[Type[BaseException]]]

# Bound TypeVar so eval_error() preserves the decorated function's exact signature type.
TargetFunc = TypeVar('TargetFunc', bound=CELFunction)
+
+
def eval_error(new_text: str, exc_class: Exception_Filter) -> Callable[[TargetFunc], TargetFunc]:
    """
    Wrap a function to transform native Python exceptions to CEL CELEvalError values.
    Any exception of the given class is replaced with the new CELEvalError object.

    :param new_text: Text of the exception, e.g., "divide by zero", "no such overload")
        this is the return value if the :exc:`CELEvalError` becomes the result.
    :param exc_class: A Python exception class to match, e.g. ZeroDivisionError,
        or a sequence of exception classes (e.g. (ZeroDivisionError, ValueError))
    :return: A decorator that can be applied to a function
        to map Python exceptions to :exc:`CELEvalError` instances.

    This is used in the ``all()`` and ``exists()`` macros to silently ignore TypeError exceptions.
    """
    def concrete_decorator(function: TargetFunc) -> TargetFunc:
        @wraps(function)
        def new_function(*args: celpy.celtypes.Value, **kwargs: celpy.celtypes.Value) -> Result:
            try:
                return function(*args, **kwargs)
            except exc_class as ex:  # type: ignore[misc]
                logger.debug("%s(*%s, **%s) --> %s", function.__name__, args, kwargs, ex)
                # Preserve the original traceback and cause on the error value,
                # so the failure's context survives deferred handling.
                _, _, tb = sys.exc_info()
                value = CELEvalError(new_text, ex.__class__, ex.args).with_traceback(tb)
                value.__cause__ = ex
                return value
            except Exception:
                # Any exception outside the filter is unexpected: log it loudly
                # and let it propagate rather than masking it as a CEL error.
                logger.error("%s(*%s, **%s)", function.__name__, args, kwargs)
                raise
        return cast(TargetFunc, new_function)
    return concrete_decorator
+
+
def boolean(
        function: Callable[..., celpy.celtypes.Value]) -> Callable[..., celpy.celtypes.BoolType]:
    """
    Wraps boolean operators to create CEL BoolType results.

    :param function: One of the operator.lt, operator.gt, etc. comparison functions
    :return: Decorated function with type coercion.
    """
    @wraps(function)
    def bool_function(a: celpy.celtypes.Value, b: celpy.celtypes.Value) -> celpy.celtypes.BoolType:
        result = function(a, b)
        # NotImplemented is a singleton sentinel; identity is the correct test.
        if result is NotImplemented:
            return cast(celpy.celtypes.BoolType, result)
        return celpy.celtypes.BoolType(bool(result))
    return bool_function
+
+
def operator_in(item: Result, container: Result) -> Result:
    """
    CEL membership test; gracefully ignores type errors between mismatched elements.

    During evaluation of ``'elem' in [1, 'elem', 2]``,
    CEL raises internal exceptions for ``'elem' == 1`` and ``'elem' == 2``;
    those :exc:`TypeError` exceptions are silently tolerated.

    During evaluation of ``'elem' in [1u, 'str', 2, b'bytes']``, however,
    every comparison raises, and an exception value becomes the final result
    (not ``False`` from the one non-exceptional comparison).

    There are three possible outcomes, which is why :py:func:`filter` alone
    cannot do this job:

    - True: a matching item was found (exceptions may or may not have occurred).
    - False: no item found AND no exceptions.
    - CELEvalError: no item found AND at least one exception.

    This resembles the ``exists()`` macro: ``container.contains(item)`` acts
    like ``container.exists(r, r == item)``, except that ``exists()`` tends
    to silence exceptions where this can expose them.

    .. todo:: This may be better done as

        ``reduce(logical_or, (item == c for c in container), BoolType(False))``
    """
    outcome: Result = celpy.celtypes.BoolType(False)
    for candidate in cast(Iterable[Result], container):
        try:
            if candidate == item:
                return celpy.celtypes.BoolType(True)
        except TypeError as ex:
            logger.debug("operator_in(%s, %s) --> %s", item, container, ex)
            outcome = CELEvalError("no such overload", ex.__class__, ex.args)
    logger.debug("operator_in(%r, %r) = %r", item, container, outcome)
    return outcome
+
+
def function_size(container: Result) -> Result:
    """
    The size() function applied to a Value. Delegate to Python's :py:func:`len`.

    (string) -> int string length
    (bytes) -> int bytes length
    (list(A)) -> int list size
    (map(A, B)) -> int map size

    A ``None`` container yields zero. For other non-sized types this raises a
    Python :exc:`TypeError`, which is captured and becomes a :exc:`CELEvalError`
    Result.

    .. todo:: check container type for celpy.celtypes.StringType, celpy.celtypes.BytesType,
        celpy.celtypes.ListType and celpy.celtypes.MapType
    """
    if container is None:
        return celpy.celtypes.IntType(0)
    length = len(cast(Sized, container))
    result = celpy.celtypes.IntType(length)
    logger.debug("function_size(%r) = %r", container, result)
    return result
+
+
# User-defined functions can override items in this mapping.
# Maps CEL's mangled operator names (e.g. "_+_") and method/conversion names
# to their Python implementations.
base_functions: Mapping[str, CELFunction] = {
    "!_": celpy.celtypes.logical_not,
    "-_": operator.neg,
    "_+_": operator.add,
    "_-_": operator.sub,
    "_*_": operator.mul,
    "_/_": operator.truediv,
    "_%_": operator.mod,
    "_<_": boolean(operator.lt),
    "_<=_": boolean(operator.le),
    "_>=_": boolean(operator.ge),
    "_>_": boolean(operator.gt),
    "_==_": boolean(operator.eq),
    "_!=_": boolean(operator.ne),
    "_in_": operator_in,
    "_||_": celpy.celtypes.logical_or,
    "_&&_": celpy.celtypes.logical_and,
    "_?_:_": celpy.celtypes.logical_condition,
    "_[_]": operator.getitem,
    "size": function_size,
    # StringType methods
    "endsWith": lambda s, text: celpy.celtypes.BoolType(s.endswith(text)),
    "startsWith": lambda s, text: celpy.celtypes.BoolType(s.startswith(text)),
    "matches": lambda s, pattern: celpy.celtypes.BoolType(re.search(pattern, s) is not None),
    "contains": lambda s, text: celpy.celtypes.BoolType(text in s),
    # TimestampType methods. Type details are redundant, but required because of the lambdas
    "getDate": lambda ts, tz_name=None: celpy.celtypes.IntType(ts.getDate(tz_name)),
    "getDayOfMonth": lambda ts, tz_name=None: celpy.celtypes.IntType(ts.getDayOfMonth(tz_name)),
    "getDayOfWeek": lambda ts, tz_name=None: celpy.celtypes.IntType(ts.getDayOfWeek(tz_name)),
    "getDayOfYear": lambda ts, tz_name=None: celpy.celtypes.IntType(ts.getDayOfYear(tz_name)),
    "getFullYear": lambda ts, tz_name=None: celpy.celtypes.IntType(ts.getFullYear(tz_name)),
    "getMonth": lambda ts, tz_name=None: celpy.celtypes.IntType(ts.getMonth(tz_name)),
    # TimestampType and DurationType methods
    "getHours": lambda ts, tz_name=None: celpy.celtypes.IntType(ts.getHours(tz_name)),
    "getMilliseconds": lambda ts, tz_name=None: celpy.celtypes.IntType(ts.getMilliseconds(tz_name)),
    "getMinutes": lambda ts, tz_name=None: celpy.celtypes.IntType(ts.getMinutes(tz_name)),
    "getSeconds": lambda ts, tz_name=None: celpy.celtypes.IntType(ts.getSeconds(tz_name)),
    # type conversion functions
    "bool": celpy.celtypes.BoolType,
    "bytes": celpy.celtypes.BytesType,
    "double": celpy.celtypes.DoubleType,
    "duration": celpy.celtypes.DurationType,
    "int": celpy.celtypes.IntType,
    "list": celpy.celtypes.ListType,  # https://github.com/google/cel-spec/issues/123
    "map": celpy.celtypes.MapType,
    "null_type": type(None),
    "string": celpy.celtypes.StringType,
    "timestamp": celpy.celtypes.TimestampType,
    "uint": celpy.celtypes.UintType,
    # NOTE(review): "type" maps to Python's builtin type(), not a CEL TypeType wrapper.
    "type": type,
}
+
+
class Referent:
    """
    A Name can refer to any of the following things:

    -   Annotations -- initially most names are these
        or a CELFunction that may implement a type.
        Must be provided as part of the initialization.

    -   NameContainer -- some names are these. This is true
        when the name is *not* provided as part of the initialization because
        we discovered the name during type or environment binding.

    -   celpy.celtypes.Value -- many annotations also have values.
        These are provided **after** Annotations, and require them.

    -   CELEvalError -- This seems unlikely, but we include it because it's possible.

    -   Functions -- All of the type conversion functions are names in a NameContainer.

    A name can be ambiguous and refer to both a nested ``NameContainer`` as well
    as a ``celpy.celtypes.Value`` (usually a MapType instance.)

    Object ``b`` has two possible meanings:

    -   ``b.c`` is a NameContainer for ``c``, a string.

    -   ``b`` is a mapping, and ``b.c`` is syntax sugar for ``b['c']``.

    The "longest name" rule means that the useful value is the "c" object
    in the nested ``NameContainer``.
    The syntax sugar interpretation is done in the rare case we can't find the ``NameContainer``.

    >>> nc = NameContainer("c", celpy.celtypes.StringType)
    >>> b = Referent(celpy.celtypes.MapType)
    >>> b.value = celpy.celtypes.MapType({"c": "oops"})
    >>> b.value == celpy.celtypes.MapType({"c": "oops"})
    True
    >>> b.container = nc
    >>> b.value == nc
    True

    In effect, this class is
    ::

        Referent = Union[
            Annotation,
            celpy.celtypes.Value,
            CELEvalError,
            CELFunction,
        ]
    """
    def __init__(
            self,
            ref_to: Optional[Annotation] = None) -> None:
        """
        :param ref_to: optional Annotation (a type, or a conversion function)
            that this name initially refers to.
        """
        self.annotation: Optional[Annotation] = None
        self.container: Optional['NameContainer'] = None
        self._value: Union[
            None, Annotation, celpy.celtypes.Value, CELEvalError, CELFunction,
            'NameContainer'] = None
        # Distinguishes "value explicitly set to None" from "no value ever set".
        self._value_set = False
        # Bug fix: test identity against None, not truthiness, so a falsy
        # (but valid) annotation object is not silently dropped.
        if ref_to is not None:
            self.annotation = ref_to

    def __repr__(self) -> str:
        return (
            f"{self.__class__.__name__}(annotation={self.annotation!r}, "
            f"container={self.container!r}, "
            f"_value={self._value!r})"
        )

    @property
    def value(self) -> Union[
            Annotation, celpy.celtypes.Value, CELEvalError, CELFunction, 'NameContainer']:
        """
        The longest-path rule means we prefer ``NameContainer`` over any locally defined value.
        Otherwise, we'll provide a value if there is one.
        Finally, we'll provide the annotation if there's no value.
        :return: the best-available meaning for this name.
        """
        if self.container is not None:
            return self.container
        elif self._value_set:
            return self._value
        else:
            # Not part of a namespace path. Nor was a value set.
            return self.annotation

    @value.setter
    def value(
            self,
            ref_to: Union[
                Annotation, celpy.celtypes.Value, CELEvalError, CELFunction, 'NameContainer']
    ) -> None:
        """Record a value for this name; it will shadow the annotation."""
        self._value = ref_to
        self._value_set = True

    def clone(self) -> "Referent":
        """Shallow copy: annotation, container, and value state are shared."""
        new = Referent(self.annotation)
        new.container = self.container
        new._value = self._value
        new._value_set = self._value_set
        return new
+
+
# A name resolution context is a mapping from an identifier to a Value or a ``NameContainer``.
# This reflects some murkiness in the name resolution algorithm that needs to be cleaned up.
Context = Mapping[str, Union[Result, "NameContainer"]]


# Identifier pattern, copied from cel.lark.
IDENT = r"[_a-zA-Z][_a-zA-Z0-9]*"
+
+
+class NameContainer(Dict[str, Referent]):
+ """
+ A namespace that fulfills the CEL name resolution requirement.
+
+ ::
+
+ Scenario: "qualified_identifier_resolution_unchecked"
+ "namespace resolution should try to find the longest prefix for the evaluator."
+
+ NameContainer instances can be chained (via parent) to create a sequence of searchable
+ locations for a name.
+
+ - Local-most is an Activation with local variables within a macro.
+ These are part of a nested chain of Activations for each macro. Each local activation
+ is a child with a reference to the parent Activation.
+
+ - Parent of any local Activation is the overall Activation for this CEL evaluation.
+ The overall Activation contains a number of NameContainers:
+
+ - The global variable bindings.
+
+ - Bindings of function definitions. This is the default set of functions for CEL
+ plus any add-on functions introduced by C7N.
+
+ - The run-time annotations from the environment. There are two kinds:
+
+ - Protobuf message definitions. These are types, really.
+
+ - Annotations for global variables. The annotations tend to be hidden by the values.
+ They're in the lookup chain to simplify access to protobuf messages.
+
+ - The environment also provides the built-in type names and aliases for the
+ :mod:`celtypes` package of built-in types.
+
+ This means name resolution marches from local-most to remote-most, searching for a binding.
+ The global variable bindings have a local-most value and a more remote annotation.
+ The annotations (i.e. protobuf message types) have only a fairly remote annotation without
+ a value.
+
+ Structure.
+
+ A NameContainer is a mapping from names to Referents.
+
+ A Referent can be one of three things.
+
+ - A NameContainer further down the path
+ - An Annotation
+ - An Annotation with a value.
+
+ Loading Names.
+
+ There are several "phases" to building the chain of ``NameContainer`` instances.
+
+ 1. The ``Activation`` creates the initial ``name : annotation`` bindings.
+ Generally, the names are type names, like "int", bound to :py:class:`celtypes.IntType`.
+ In some cases, the name is a future variable name, "resource",
+ bound to :py:class:`celtypes.MapType`.
+
+ 2. The ``Activation`` creates a second ``NameContainer`` that has variable names.
+ This has a reference back to the parent to resolve names that are types.
+
+ This involves decomposing the paths of names to make a tree of nested ``NameContainers``.
+ Upper-level containers don't (necessarily) have types or values -- they're merely
+ ``NameContainer`` along the path to the target names.
+
+ Resolving Names.
+
+ See https://github.com/google/cel-spec/blob/master/doc/langdef.md#name-resolution
+
+ There are three cases required in the :py:class:`Evaluator` engine.
+
+ - Variables and Functions. These are ``Result_Function`` instances: i.e., ordinary values.
+
+ - ``Name.Name`` can be navigation into a protobuf package, when ``Name`` is protobuf package.
+ The idea is to locate the longest possible match.
+
+ If a.b is a name to be resolved in the context of a protobuf declaration with scope A.B,
+ then resolution is attempted, in order, as A.B.a.b, A.a.b, and finally a.b.
+ To override this behavior, one can use .a.b;
+ this name will only be attempted to be resolved in the root scope, i.e. as a.b.
+
+ - ``Name.Name`` can be syntactic sugar for indexing into a mapping when ``Name`` is a value of
+ ``MapType`` or a ``MessageType``. It's evaluated as if it was ``Name["Name"]``.
+ This is a fall-back plan if the previous resolution failed.
+
+ The longest chain of nested packages *should* be resolved first.
+ This will happen when each name is a ``NameContainer`` object containing
+ other ``NameContainer`` objects.
+
+ The chain of evaluations for ``IDENT . IDENT . IDENT`` is (in effect)
+ ::
+
+ member_dot(member_dot(primary(IDENT), IDENT), IDENT)
+
+    This makes the ``member_dot`` processing left-associative.
+
+ The ``primary(IDENT)`` resolves to a CEL object of some kind.
+ Once the ``primary(IDENT)`` has been resolved, it establishes a context
+ for subsequent ``member_dot`` methods.
+
+ - If this is a ``MapType`` or a ``MessageType`` with an object,
+ then ``member_dot`` will pluck out a field value and return this.
+
+ - If this is a ``NameContainer`` or a ``PackageType`` then the ``member_dot``
+ will pluck out a sub-package or ``EnumType`` or ``MessageType``
+ and return the type object instead of a value.
+ At some point a ``member_object`` production will build an object from the type.
+
+ The evaluator's :meth:`ident_value` method resolves the identifier into the ``Referent``.
+
+ Acceptance Test Case
+
+ We have two names
+
+ - `a.b` -> NameContainer in which c = "yeah". (i.e., a.b.c : "yeah")
+ - `a.b` -> Mapping with {"c": "oops"}.
+
+ This means any given name can have as many as three meanings:
+
+ - Primarily as a NameContainer. This resolves name.name.name to find the longest
+ namespace possible.
+
+ - Secondarily as a Mapping. This will be a fallback when name.name.name is really
+ syntactic sugar for name.name['name'].
+
+ - Finally as a type annotation.
+
+ """
+ ident_pat = re.compile(IDENT)
+ extended_name_path = re.compile(f"^\\.?{IDENT}(?:\\.{IDENT})*$")
+ logger = logging.getLogger("NameContainer")
+
+ def __init__(
+ self,
+ name: Optional[str] = None,
+ ref_to: Optional[Referent] = None,
+ parent: Optional['NameContainer'] = None
+ ) -> None:
+ if name and ref_to:
+ super().__init__({name: ref_to})
+ else:
+ super().__init__()
+ self.parent: Optional[NameContainer] = parent
+
+ def load_annotations(
+ self,
+ names: Mapping[str, Annotation],
+ ) -> None:
+ """
+ Used by an ``Activation`` to build a container used to resolve
+ long path names into nested NameContainers.
+ Sets annotations for all supplied identifiers.
+
+ ``{"name1.name2": annotation}`` becomes two things:
+
+ 1. nc2 = NameContainer({"name2" : Referent(annotation)})
+
+ 2. nc1 = NameContainer({"name1" : Referent(nc2)})
+
+ :param names: A dictionary of {"name1.name1....": Referent, ...} items.
+ """
+ for name, refers_to in names.items():
+ self.logger.info("load_annotations %r : %r", name, refers_to)
+ if not self.extended_name_path.match(name):
+ raise ValueError(f"Invalid name {name}")
+
+ context = self
+
+ # Expand "name1.name2....": refers_to into ["name1", "name2", ...]: refers_to
+ *path, final = self.ident_pat.findall(name)
+ for name in path:
+ ref = context.setdefault(name, Referent())
+ if ref.container is None:
+ ref.container = NameContainer(parent=self.parent)
+ context = ref.container
+ context.setdefault(final, Referent(refers_to))
+
+ def load_values(self, values: Context) -> None:
+ """Update annotations with actual values."""
+ for name, refers_to in values.items():
+ self.logger.info("load_values %r : %r", name, refers_to)
+ if not self.extended_name_path.match(name):
+ raise ValueError(f"Invalid name {name}")
+
+ context = self
+
+ # Expand "name1.name2....": refers_to into ["name1", "name2", ...]: refers_to
+ # Update NameContainer("name1", NameContainer("name2", NameContainer(..., refers_to)))
+ *path, final = self.ident_pat.findall(name)
+ for name in path:
+ ref = context.setdefault(name, Referent())
+ if ref.container is None:
+ ref.container = NameContainer(parent=self.parent)
+ context = ref.container
+ context.setdefault(final, Referent()) # No annotation.
+ context[final].value = refers_to
+
+ class NotFound(Exception):
+ """
+ Raised locally when a name is not found in the middle of package search.
+ We can't return ``None`` from find_name because that's a valid value.
+ """
+ pass
+
+ @staticmethod
+ def dict_find_name(some_dict: Dict[str, Referent], path: List[str]) -> Result:
+ """
+ Extension to navgiate into mappings, messages, and packages.
+
+ :param some_dict: An instance of a MapType, MessageType, or PackageType.
+ :param path: names to follow into the structure.
+ :returns: Value found down inside the structure.
+ """
+ if path:
+ head, *tail = path
+ try:
+ return NameContainer.dict_find_name(
+ cast(Dict[str, Referent], some_dict[head]),
+ tail)
+ except KeyError:
+ NameContainer.logger.debug("%r not found in %s", head, some_dict.keys())
+ raise NameContainer.NotFound(path)
+ else:
+ return cast(Result, some_dict)
+
+ def find_name(self, path: List[str]) -> Union["NameContainer", Result]:
+ """
+ Find the name by searching down through nested packages or raise NotFound.
+ This is a kind of in-order tree walk of contained packages.
+ """
+ if path:
+ head, *tail = path
+ try:
+ sub_context = self[head].value
+ except KeyError:
+ self.logger.debug("%r not found in %s", head, self.keys())
+ raise NameContainer.NotFound(path)
+ if isinstance(sub_context, NameContainer):
+ return sub_context.find_name(tail)
+ elif isinstance(
+ sub_context,
+ (celpy.celtypes.MessageType, celpy.celtypes.MapType,
+ celpy.celtypes.PackageType, dict)
+ ):
+ # Out of defined NameContainers, moving into Values: Messages, Mappings or Packages
+ # Make a fake Referent return value.
+ item: Union["NameContainer", Result] = NameContainer.dict_find_name(
+ cast(Dict[str, Referent], sub_context),
+ tail
+ )
+ return item
+ else:
+ # Fully matched. No more Referents with NameContainers or Referents with Mappings.
+ return cast(NameContainer, sub_context)
+ else:
+ # Fully matched. This NameContainer is what we were looking for.
+ return self
+
+ def parent_iter(self) -> Iterator['NameContainer']:
+ """Yield this NameContainer and all of its parents to create a flat list."""
+ yield self
+ if self.parent is not None:
+ yield from self.parent.parent_iter()
+
+ def resolve_name(
+ self,
+ package: Optional[str],
+ name: str
+ ) -> Referent:
+ """
+ Search with less and less package prefix until we find the thing.
+
+ Resolution works as follows.
+ If a.b is a name to be resolved in the context of a protobuf declaration with scope A.B,
+ then resolution is attempted, in order, as
+
+ 1. A.B.a.b. (Search for "a" in paackage "A.B"; the ".b" is handled separately.)
+
+ 2. A.a.b. (Search for "a" in paackage "A"; the ".b" is handled separately.)
+
+ 3. (finally) a.b. (Search for "a" in paackage None; the ".b" is handled separately.)
+
+ To override this behavior, one can use .a.b;
+ this name will only be attempted to be resolved in the root scope, i.e. as a.b.
+
+ We Start with the longest package name, a ``List[str]`` assigned to ``target``.
+
+ Given a target, search through this ``NameContainer`` and all parents in the
+ :meth:`parent_iter` iterable.
+ The first name we find in the parent sequence is the goal.
+ This is because values are first, type annotations are laast.
+
+ If we can't find the identifier with given package target,
+ truncate the package name from the end to create a new target and try again.
+ This is a bottom-up look that favors the longest name.
+
+ :param package: Prefix string "name.name.name"
+ :param name: The variable we're looking for
+ :return: Name resolution as a Rereferent, often a value, but maybe a package or an
+ annotation.
+ """
+ self.logger.info(
+ "resolve_name(%r.%r) in %s, parent=%s", package, name, self.keys, self.parent
+ )
+ # Longest Name
+ if package:
+ target = self.ident_pat.findall(package) + [""]
+ else:
+ target = [""]
+ # Pool of matches
+ matches: List[Tuple[List[str], Union["NameContainer", Result]]] = []
+ # Target has an extra item to make the len non-zero.
+ while not matches and target:
+ target = target[:-1]
+ for p in self.parent_iter():
+ try:
+ package_ident: List[str] = target + [name]
+ match: Union["NameContainer", Result] = p.find_name(package_ident)
+ matches.append((package_ident, match))
+ except NameContainer.NotFound:
+ # No matches; move to the parent and try again.
+ pass
+ self.logger.debug("resolve_name: target=%s+[%r], matches=%s", target, name, matches)
+ if not matches:
+ raise KeyError(name)
+ # This feels hackish -- it should be the first referent value.
+ # Find the longest name match.p
+ path, value = max(matches, key=lambda path_value: len(path_value[0]))
+ return cast(Referent, value)
+
+ def clone(self) -> 'NameContainer':
+ new = NameContainer(parent=self.parent)
+ for k, v in self.items():
+ new[k] = v.clone()
+ return new
+
+ def __repr__(self) -> str:
+ return f"{self.__class__.__name__}({dict(self)}, parent={self.parent})"
+
+
+class Activation:
+ """
+ Namespace with variable bindings and type name ("annotation") bindings.
+
+ .. rubric:: Life and Content
+
+ An Activation is created by an Environment and contains the annotations
+ (and a package name) from that Environment. Variables are loaded into the
+ activation for evaluation.
+
+ A nested Activation is created each time we evaluate a macro.
+
+ An Activation contains a ``NameContainer`` instance to resolve identifers.
+ (This may be a needless distinction and the two classes could, perhaps, be combined.)
+
+ .. todo:: The environment's annotations are type names used for protobuf.
+
+ .. rubric:: Chaining/Nesting
+
+ Activations can form a chain so locals are checked first.
+ Activations can nest via macro evaluation, creating transient local variables.
+
+ ::
+
+ ``"[2, 4, 6].map(n, n / 2)"``
+
+ means nested activations with ``n`` bound to 2, 4, and 6 respectively.
+ The resulting objects then form a resulting list.
+
+ This is used by an :py:class:`Evaluator` as follows::
+
+ sub_activation: Activation = self.activation.nested_activation()
+ sub_eval: Evaluator = self.sub_eval(sub_activation)
+ sub_eval_partial: Callable[[Value], Value] = sub_eval.partial(
+ tree_for_variable, tree_for_expression)
+ push(celtypes.ListType(map(sub_eval_partial, pop()))
+
+ The ``localized_eval()`` creates a new :py:class:`Activation`
+ and an associated :py:class:`Evaluator` for this nested activation context.
+ It uses the :py:class:`Evaluator.visit` method to evaluate the given expression for
+ a new object bound to the given variable.
+
+ .. rubric:: Namespace Creation
+
+ We expand ``{"a.b.c": 42}`` to create nested namespaces: ``{"a": {"b": {"c": 42}}}``.
+
+ This depends on two syntax rules to define the valid names::
+
+ member : primary
+ | member "." IDENT ["(" [exprlist] ")"]
+
+ primary : ["."] IDENT ["(" [exprlist] ")"]
+
+ Ignore the ``["(" [exprlist] ")"]`` options used for member functions.
+ We have members and primaries, both of which depend on the following lexical rule::
+
+ IDENT : /[_a-zA-Z][_a-zA-Z0-9]*/
+
+ Name expansion is handled in order of length. Here's why::
+
+ Scenario: "qualified_identifier_resolution_unchecked"
+ "namespace resolution should try to find the longest prefix for the evaluator."
+
+ Most names start with ``IDENT``, but a primary can start with ``.``.
+ """
+
+ def __init__(
+ self,
+ annotations: Optional[Mapping[str, Annotation]] = None,
+ package: Optional[str] = None,
+ vars: Optional[Context] = None,
+ parent: Optional['Activation'] = None,
+ ) -> None:
+ """
+ Create an Activation.
+
+ The annotations are loaded first. The variables are loaded second, and placed
+ in front of the annotations in the chain of name resolutions. Values come before
+ annotations.
+
+ :param annotations: Variables and type annotations.
+ Annotations are loaded first to serve as defaults to create a parent NameContainer.
+ :param package: The package name to assume as a prefix for name resolution.
+ :param vars: Variables and their values, loaded to update the NameContainer.
+ :param parent: A parent activation in the case of macro evaluations.
+ """
+ logger.info(
+ "Activation(annotations=%r, package=%r, vars=%r, "
+ "parent=%s)", annotations, package, vars, parent
+ )
+ # Seed the annotation identifiers for this activation.
+ self.identifiers: NameContainer = NameContainer(
+ parent=parent.identifiers if parent else None
+ )
+ if annotations is not None:
+ self.identifiers.load_annotations(annotations)
+
+ # The name of the run-time package -- an assumed prefix for name resolution
+ self.package = package
+
+ # Create a child NameContainer with variables (if any.)
+ if vars is None:
+ pass
+ elif isinstance(vars, Activation): # pragma: no cover
+ # Deprecated legacy feature.
+ raise NotImplementedError("Use Activation.clone()")
+
+ else:
+ # Set values from a dictionary of names and values.
+ self.identifiers.load_values(vars)
+
+ def clone(self) -> "Activation":
+ """
+ Create a clone of this activation with a deep copy of the identifiers.
+ """
+ clone = Activation()
+ clone.package = self.package
+ clone.identifiers = self.identifiers.clone()
+ return clone
+
+ def nested_activation(
+ self,
+ annotations: Optional[Mapping[str, Annotation]] = None,
+ vars: Optional[Context] = None
+ ) -> 'Activation':
+ """
+ Create a nested sub-Activation that chains to the current activation.
+ The sub-activations don't have the same implied package context,
+
+ :param annotations: Variable type annotations
+ :param vars: Variables with literals to be converted to the desired types.
+ :return: An ``Activation`` that chains to this Activation.
+ """
+ new = Activation(
+ annotations=annotations,
+ vars=vars,
+ parent=self,
+ package=self.package
+ )
+ return new
+
+ def resolve_variable(self, name: str) -> Union[Result, NameContainer]:
+ """Find the object referred to by the name.
+
+ An Activation usually has a chain of NameContainers to be searched.
+
+ A variable can refer to an annotation and/or a value and/or a nested
+ container. Most of the time, we want the `value` attribute of the Referent.
+ This can be a Result (a Union[Value, CelType])
+ """
+ container_or_value = self.identifiers.resolve_name(self.package, str(name))
+ return cast(Union[Result, NameContainer], container_or_value)
+
+ def __repr__(self) -> str:
+ return (
+ f"{self.__class__.__name__}"
+ f"(annotations={self.identifiers.parent!r}, "
+ f"package={self.package!r}, "
+ f"vars={self.identifiers!r}, "
+ f"parent={self.identifiers.parent})"
+ )
+
+
+class FindIdent(lark.visitors.Visitor_Recursive):
+ """Locate the ident token at the bottom of an AST.
+
+ This is needed to find the bind variable for macros.
+
+ It works by doing a "visit" on the entire tree, but saving
+ the details of the ``ident`` nodes only.
+ """
+ def __init__(self) -> None:
+ self.ident_token: Optional[str] = None
+
+ def ident(self, tree: lark.Tree) -> None:
+ ident_token = cast(lark.Token, tree.children[0])
+ self.ident_token = ident_token.value
+
+ @classmethod
+ def in_tree(cls: Type['FindIdent'], tree: lark.Tree) -> Optional[str]:
+ fi = FindIdent()
+ fi.visit(tree)
+ return fi.ident_token
+
+
+def trace(
+ method: Callable[['Evaluator', lark.Tree], Any]) -> Callable[['Evaluator', lark.Tree], Any]:
+ """
+ Decorator to create consistent evaluation trace logging.
+ This only works for a class with a ``level`` attribute.
+ This is generally applied to the methods matching rule names.
+ """
+ @wraps(method)
+ def concrete_method(self: 'Evaluator', tree: lark.Tree) -> Any:
+ self.logger.info("%s%r", self.level * '| ', tree)
+ result = method(self, tree)
+ self.logger.info("%s%s -> %r", self.level * '| ', tree.data, result)
+ return result
+ return concrete_method
+
+
+class Evaluator(lark.visitors.Interpreter[Result]):
+ """
+ Evaluate an AST in the context of a specific Activation.
+
+ See https://github.com/google/cel-go/blob/master/examples/README.md
+
+ General Evaluation.
+
+ An AST node must call ``self.visit_children(tree)`` explicitly
+ to build the values for all the children of this node.
+
+ Exceptions.
+
+ To handle ``2 / 0 || true``, the ``||``, ``&&``, and ``?:`` operators
+ do not trivially evaluate and raise exceptions. They bottle up the
+ exceptions and treat them as a kind of undecided value.
+
+ Identifiers.
+
+ Identifiers have three meanings:
+
+ - An object. This is either a variable provided in the activation or a function provided
+ when building an execution. Objects also have type annotations.
+
+ - A type annotation without an object, This is used to build protobuf messages.
+
+ - A macro name. The ``member_dot_arg`` construct may have a macro.
+ Plus the ``ident_arg`` construct may also have a ``dyn()`` or ``has()`` macro.
+ See below for more.
+
+ Other than macros, a name maps to an ``Referent`` instance. This will have an
+ annotation and -- perhaps -- an associated object.
+
+ Names have nested paths. ``a.b.c`` is a mapping, ``a``, that contains a mapping, ``b``,
+ that contains ``c``.
+
+ **MACROS ARE SPECIAL**.
+
+ The macros do not **all** simply visit their children to perform evaluation.
+ There are three cases:
+
+ - ``dyn()`` does effectively nothing.
+      It visits its children, but also provides progressive type resolution
+ through annotation of the AST.
+
+ - ``has()`` attempts to visit the child and does a boolean transformation
+ on the result.
+ This is a macro because it doesn't raise an exception for a missing
+ member item reference, but instead maps an exception to False.
+ It doesn't return the value found, for a member item reference; instead, it maps
+ this to True.
+
+ - The various ``member.macro()`` constructs do **NOT** visit children.
+ They create a nested evaluation environment for the child variable name and expression.
+
+ The :py:meth:`member` method implements the macro evaluation behavior.
+ It does not **always** trivially descend into the children.
+ In the case of macros, the member evaluates one child tree in the presence
+ of values from another child tree using specific variable binding in a kind
+ of stack frame.
+
+ """
+ logger = logging.getLogger("Evaluator")
+
+ def __init__(
+ self,
+ ast: lark.Tree,
+ activation: Activation,
+ functions: Union[Sequence[CELFunction], Mapping[str, CELFunction], None] = None
+ ) -> None:
+ """
+ Create an evaluator for an AST with specific variables and functions.
+
+ :param ast: The AST to evaluate.
+ :param activation: The variable bindings to use.
+ :param functions: The functions to use. If nothing is supplied, the default
+ global `base_functions` are used. Otherwise a ChainMap is created so
+ these local functions override the base functions.
+ """
+ self.ast = ast
+ self.base_activation = activation
+ self.activation = self.base_activation
+ self.functions: Mapping[str, CELFunction]
+ if isinstance(functions, Sequence):
+ local_functions = {
+ f.__name__: f for f in functions or []
+ }
+ self.functions = collections.ChainMap(local_functions, base_functions) # type: ignore [arg-type]
+ elif isinstance(functions, Mapping):
+ self.functions = collections.ChainMap(functions, base_functions) # type: ignore [arg-type]
+ else:
+ self.functions = base_functions
+
+ self.level = 0
+ self.logger.info("activation: %r", self.activation)
+ self.logger.info("functions: %r", self.functions)
+
+ def sub_evaluator(self, ast: lark.Tree) -> 'Evaluator':
+ """
+ Build an evaluator for a sub-expression in a macro.
+ :param ast: The AST for the expression in the macro.
+ :return: A new `Evaluator` instance.
+ """
+ return Evaluator(ast, activation=self.activation, functions=self.functions)
+
+ def set_activation(self, values: Context) -> 'Evaluator':
+ """
+ Chain a new activation using the given Context.
+ This is used for two things:
+
+ 1. Bind external variables like command-line arguments or environment variables.
+
+ 2. Build local variable(s) for macro evaluation.
+ """
+ self.activation = self.base_activation.clone()
+ self.activation.identifiers.load_values(values)
+ self.logger.info("Activation: %r", self.activation)
+ return self
+
+ def ident_value(self, name: str, root_scope: bool = False) -> Result_Function:
+ """Resolve names in the current activation.
+ This includes variables, functions, the type registry for conversions,
+ and protobuf packages, as well as protobuf types.
+
+ We may be limited to root scope, which prevents searching through alternative
+ protobuf package definitions.
+ """
+ try:
+ return cast(Result, self.activation.resolve_variable(name))
+ except KeyError:
+ return self.functions[name]
+
+ def evaluate(self) -> celpy.celtypes.Value:
+ """
+ Evaluate this AST and return the value or raise an exception.
+
+ There are two variant use cases.
+
+ - External clients want the value or the exception.
+
+ - Internally, we sometimes want to silence CELEvalError exceptions so that
+ we can apply short-circuit logic and choose a non-exceptional result.
+ """
+ value = self.visit(self.ast)
+ if isinstance(value, CELEvalError):
+ raise value
+ return cast(celpy.celtypes.Value, value)
+
+ def visit_children(self, tree: lark.Tree) -> List[Result]:
+ """Extend the superclass to track nesting and current evaluation context.
+ """
+ self.level += 1
+ result = super().visit_children(tree)
+ self.level -= 1
+ return result
+
+ def function_eval(
+ self,
+ name_token: lark.Token,
+ exprlist: Optional[Iterable[Result]] = None) -> Result:
+ """
+ Function evaluation.
+
+ - Object creation and type conversions.
+ - Other built-in functions like size()
+ - Extension functions
+ """
+ function: CELFunction
+ try:
+ # TODO: Transitive Lookup of function in all parent activation contexts.
+ function = self.functions[name_token.value]
+ except KeyError as ex:
+ err = (
+ f"undeclared reference to '{name_token}' "
+ f"(in activation '{self.activation}')"
+ )
+ value = CELEvalError(err, ex.__class__, ex.args, token=name_token)
+ value.__cause__ = ex
+ return value
+
+ if isinstance(exprlist, CELEvalError):
+ return exprlist
+
+ try:
+ list_exprlist = cast(List[Result], exprlist or [])
+ return function(*list_exprlist)
+ except ValueError as ex:
+ value = CELEvalError(
+ "return error for overflow", ex.__class__, ex.args, token=name_token)
+ value.__cause__ = ex
+ return value
+ except (TypeError, AttributeError) as ex:
+ self.logger.debug("function_eval(%r, %s) --> %s", name_token, exprlist, ex)
+ value = CELEvalError(
+ "no such overload", ex.__class__, ex.args, token=name_token)
+ value.__cause__ = ex
+ return value
+
+ def method_eval(
+ self,
+ object: Result,
+ method_ident: lark.Token,
+ exprlist: Optional[Iterable[Result]] = None) -> Result:
+ """
+ Method evaluation. While are (nominally) attached to an object, the only thing
+ actually special is that the object is the first parameter to a function.
+ """
+ function: CELFunction
+ try:
+ # TODO: Transitive Lookup of function in all parent activation contexts.
+ function = self.functions[method_ident.value]
+ except KeyError as ex:
+ self.logger.debug("method_eval(%r, %r, %s) --> %r", object, method_ident, exprlist, ex)
+ self.logger.debug("functions: %s", self.functions)
+ err = (
+ f"undeclared reference to {method_ident.value!r} "
+ f"(in activation '{self.activation}')"
+ )
+ value = CELEvalError(err, ex.__class__, ex.args, token=method_ident)
+ value.__cause__ = ex
+ return value
+
+ if isinstance(object, CELEvalError):
+ return object
+ elif isinstance(exprlist, CELEvalError):
+ return exprlist
+
+ try:
+ list_exprlist = cast(List[Result], exprlist or [])
+ return function(object, *list_exprlist)
+ except ValueError as ex:
+ value = CELEvalError(
+ "return error for overflow", ex.__class__, ex.args, token=method_ident)
+ value.__cause__ = ex
+ return value
+ except (TypeError, AttributeError) as ex:
+ self.logger.debug("method_eval(%r, %r, %s) --> %r", object, method_ident, exprlist, ex)
+ value = CELEvalError("no such overload", ex.__class__, ex.args, token=method_ident)
+ value.__cause__ = ex
+ return value
+
+ def macro_has_eval(self, exprlist: lark.Tree) -> celpy.celtypes.BoolType:
+ """
+ The has(e.f) macro.
+
+ https://github.com/google/cel-spec/blob/master/doc/langdef.md#field-selection
+
+ 1. If e evaluates to a map, then has(e.f) indicates whether the string f is a
+ key in the map (note that f must syntactically be an identifier).
+
+ 2. If e evaluates to a message and f is not a declared field for the message,
+ has(e.f) raises a no_such_field error.
+
+ 3. If e evaluates to a protocol buffers version 2 message and f is a defined field:
+
+ - If f is a repeated field or map field, has(e.f) indicates whether the field is
+ non-empty.
+
+ - If f is a singular or oneof field, has(e.f) indicates whether the field is set.
+
+ 4. If e evaluates to a protocol buffers version 3 message and f is a defined field:
+
+ - If f is a repeated field or map field, has(e.f) indicates whether the field is
+ non-empty.
+
+ - If f is a oneof or singular message field, has(e.f) indicates whether the field
+ is set.
+
+ - If f is some other singular field, has(e.f) indicates whether the field's value
+ is its default value (zero for numeric fields, false for booleans,
+ empty for strings and bytes).
+
+ 5. In all other cases, has(e.f) evaluates to an error.
+
+ """
+ has_values = self.visit_children(exprlist)
+ return celpy.celtypes.BoolType(not isinstance(has_values[0], CELEvalError))
+
+ @trace
+ def expr(self, tree: lark.Tree) -> Result:
+ """
+ expr : conditionalor ["?" conditionalor ":" expr]
+
+ The default implementation short-circuits
+ and can ignore an CELEvalError in a sub-expression.
+
+ See https://github.com/google/cel-spec/blob/master/doc/langdef.md#logical-operators
+
+ > To get traditional left-to-right short-circuiting evaluation of logical operators,
+ as in C or other languages (also called "McCarthy Evaluation"),
+ the expression e1 && e2 can be rewritten `e1 ? e2 : false`.
+ Similarly, `e1 || e2` can be rewritten `e1 ? true : e2`.
+ """
+ if len(tree.children) == 1:
+ # expr is a single conditionalor.
+ values = self.visit_children(tree)
+ return values[0]
+ elif len(tree.children) == 3:
+ # full conditionalor "?" conditionalor ":" expr.
+ func = self.functions["_?_:_"]
+ cond_value = self.visit(cast(lark.Tree, tree.children[0]))
+ left = right = cast(Result, celpy.celtypes.BoolType(False))
+ try:
+ if cond_value:
+ left = self.visit(cast(lark.Tree, tree.children[1]))
+ else:
+ right = self.visit(cast(lark.Tree, tree.children[2]))
+ return func(cond_value, left, right)
+ except TypeError as ex:
+ self.logger.debug("%s(%s, %s) --> %s", func.__name__, left, right, ex)
+ err = (
+ f"found no matching overload for _?_:_ "
+ f"applied to '({type(cond_value)}, {type(left)}, {type(right)})'"
+ )
+ value = CELEvalError(err, ex.__class__, ex.args, tree=tree)
+ value.__cause__ = ex
+ return value
+ else:
+ raise CELSyntaxError(
+ f"{tree.data} {tree.children}: bad expr node",
+ line=tree.meta.line,
+ column=tree.meta.column,
+ )
+
+ @trace
+ def conditionalor(self, tree: lark.Tree) -> Result:
+ """
+ conditionalor : [conditionalor "||"] conditionaland
+
+ The default implementation short-circuits
+ and can ignore an CELEvalError in a sub-expression.
+ """
+ if len(tree.children) == 1:
+ # conditionaland with no preceding conditionalor.
+ values = self.visit_children(tree)
+ return values[0]
+ elif len(tree.children) == 2:
+ func = self.functions["_||_"]
+ left, right = cast(Tuple[Result, Result], self.visit_children(tree))
+ try:
+ return func(left, right)
+ except TypeError as ex:
+ self.logger.debug("%s(%s, %s) --> %s", func.__name__, left, right, ex)
+ err = (
+ f"found no matching overload for _||_ "
+ f"applied to '({type(left)}, {type(right)})'"
+ )
+ value = CELEvalError(err, ex.__class__, ex.args, tree=tree)
+ value.__cause__ = ex
+ return value
+ else:
+ raise CELSyntaxError(
+ f"{tree.data} {tree.children}: bad conditionalor node",
+ line=tree.meta.line,
+ column=tree.meta.column,
+ )
+
+ @trace
+ def conditionaland(self, tree: lark.Tree) -> Result:
+ """
+ conditionaland : [conditionaland "&&"] relation
+
+ The default implementation short-circuits
+ and can ignore an CELEvalError in a sub-expression.
+ """
+ if len(tree.children) == 1:
+ # relation with no preceding conditionaland.
+ values = self.visit_children(tree)
+ return values[0]
+ elif len(tree.children) == 2:
+ func = self.functions["_&&_"]
+ left, right = cast(Tuple[Result, Result], self.visit_children(tree))
+ try:
+ return func(left, right)
+ except TypeError as ex:
+ self.logger.debug("%s(%s, %s) --> %s", func.__name__, left, right, ex)
+ err = (
+ f"found no matching overload for _&&_ "
+ f"applied to '({type(left)}, {type(right)})'"
+ )
+ value = CELEvalError(err, ex.__class__, ex.args, tree=tree)
+ value.__cause__ = ex
+ return value
+ else:
+ raise CELSyntaxError(
+ f"{tree.data} {tree.children}: bad conditionalor node",
+ line=tree.meta.line,
+ column=tree.meta.column,
+ )
+
+ @trace
+ def relation(self, tree: lark.Tree) -> Result:
+ """
+ relation : [relation_lt | relation_le | relation_ge | relation_gt
+ | relation_eq | relation_ne | relation_in] addition
+
+ relation_lt : relation "<"
+ relation_le : relation "<="
+ relation_gt : relation ">"
+ relation_ge : relation ">="
+ relation_eq : relation "=="
+ relation_ne : relation "!="
+ relation_in : relation "in"
+
+ This could be refactored into separate methods to skip the lookup.
+
+ Ideally::
+
+ values = self.visit_children(tree)
+ func = functions[op_name_map[tree.data]]
+ result = func(*values)
+
+ The AST doesn't provide a flat list of values, however.
+ """
+ if len(tree.children) == 1:
+ # addition with no preceding relation.
+ values = self.visit_children(tree)
+ return values[0]
+
+ elif len(tree.children) == 2:
+ left_op, right_tree = cast(Tuple[lark.Tree, lark.Tree], tree.children)
+ op_name = {
+ "relation_lt": "_<_",
+ "relation_le": "_<=_",
+ "relation_ge": "_>=_",
+ "relation_gt": "_>_",
+ "relation_eq": "_==_",
+ "relation_ne": "_!=_",
+ "relation_in": "_in_",
+ }[left_op.data]
+ func = self.functions[op_name]
+ # NOTE: values have the structure [[left], right]
+ (left, *_), right = cast(Tuple[List[Result], Result], self.visit_children(tree))
+ self.logger.debug("relation %r %s %r", left, op_name, right)
+ try:
+ return func(left, right)
+ except TypeError as ex:
+ self.logger.debug("%s(%s, %s) --> %s", func.__name__, left, right, ex)
+ err = (
+ f"found no matching overload for {left_op.data!r} "
+ f"applied to '({type(left)}, {type(right)})'"
+ )
+ value = CELEvalError(err, ex.__class__, ex.args, tree=tree)
+ value.__cause__ = ex
+ return value
+
+ else:
+ raise CELSyntaxError(
+ f"{tree.data} {tree.children}: bad relation node",
+ line=tree.meta.line,
+ column=tree.meta.column,
+ )
+
+ @trace
+ def addition(self, tree: lark.Tree) -> Result:
+ """
+ addition : [addition_add | addition_sub] multiplication
+
+ addition_add : addition "+"
+ addition_sub : addition "-"
+
+ This could be refactored into separate methods to skip the lookup.
+
+ Ideally::
+
+ values = self.visit_children(tree)
+ func = functions[op_name_map[tree.data]]
+ result = func(*values)
+
+ The AST doesn't provide a flat list of values, however.
+ """
+ if len(tree.children) == 1:
+ # multiplication with no preceding addition.
+ values = self.visit_children(tree)
+ return values[0]
+
+ elif len(tree.children) == 2:
+ left_op, right_tree = cast(Tuple[lark.Tree, lark.Tree], tree.children)
+ op_name = {
+ "addition_add": "_+_",
+ "addition_sub": "_-_",
+ }[left_op.data]
+ func = self.functions[op_name]
+ # NOTE: values have the structure [[left], right]
+ (left, *_), right = cast(Tuple[List[Result], Result], self.visit_children(tree))
+ self.logger.debug("addition %r %s %r", left, op_name, right)
+ try:
+ return func(left, right)
+ except TypeError as ex:
+ self.logger.debug("%s(%s, %s) --> %s", func.__name__, left, right, ex)
+ err = (
+ f"found no matching overload for {left_op.data!r} "
+ f"applied to '({type(left)}, {type(right)})'"
+ )
+ value = CELEvalError(err, ex.__class__, ex.args, tree=tree)
+ value.__cause__ = ex
+ return value
+ except (ValueError, OverflowError) as ex:
+ self.logger.debug("%s(%s, %s) --> %s", func.__name__, left, right, ex)
+ value = CELEvalError("return error for overflow", ex.__class__, ex.args, tree=tree)
+ value.__cause__ = ex
+ return value
+
+ else:
+ raise CELSyntaxError(
+ f"{tree.data} {tree.children}: bad addition node",
+ line=tree.meta.line,
+ column=tree.meta.column,
+ )
+
+ @trace
+ def multiplication(self, tree: lark.Tree) -> Result:
+ """
+ multiplication : [multiplication_mul | multiplication_div | multiplication_mod] unary
+
+ multiplication_mul : multiplication "*"
+ multiplication_div : multiplication "/"
+ multiplication_mod : multiplication "%"
+
+ This could be refactored into separate methods to skip the lookup.
+
+ Ideally::
+
+ values = self.visit_children(tree)
+ func = functions[op_name_map[tree.data]]
+ result = func(*values)
+
+ The AST doesn't provide a flat list of values, however.
+ """
+ if len(tree.children) == 1:
+ # unary with no preceding multiplication.
+ values = self.visit_children(tree)
+ return values[0]
+
+ elif len(tree.children) == 2:
+ left_op, right_tree = cast(Tuple[lark.Tree, lark.Tree], tree.children)
+ op_name = {
+ "multiplication_div": "_/_",
+ "multiplication_mul": "_*_",
+ "multiplication_mod": "_%_",
+ }[left_op.data]
+ func = self.functions[op_name]
+ # NOTE: values have the structure [[left], right]
+ (left, *_), right = cast(Tuple[List[Result], Result], self.visit_children(tree))
+ self.logger.debug("multiplication %r %s %r", left, op_name, right)
+ try:
+ return func(left, right)
+ except TypeError as ex:
+ self.logger.debug("%s(%s, %s) --> %s", func.__name__, left, right, ex)
+ err = (
+ f"found no matching overload for {left_op.data!r} "
+ f"applied to '({type(left)}, {type(right)})'"
+ )
+ value = CELEvalError(err, ex.__class__, ex.args, tree=tree)
+ value.__cause__ = ex
+ return value
+ except ZeroDivisionError as ex:
+ self.logger.debug("%s(%s, %s) --> %s", func.__name__, left, right, ex)
+ value = CELEvalError("modulus or divide by zero", ex.__class__, ex.args, tree=tree)
+ value.__cause__ = ex
+ return value
+ except (ValueError, OverflowError) as ex:
+ self.logger.debug("%s(%s, %s) --> %s", func.__name__, left, right, ex)
+ value = CELEvalError("return error for overflow", ex.__class__, ex.args, tree=tree)
+ value.__cause__ = ex
+ return value
+
+ else:
+ raise CELSyntaxError(
+ f"{tree.data} {tree.children}: bad multiplication node",
+ line=tree.meta.line,
+ column=tree.meta.column,
+
+ )
+
+ @trace
+ def unary(self, tree: lark.Tree) -> Result:
+ """
+ unary : [unary_not | unary_neg] member
+
+ unary_not : "!"
+ unary_neg : "-"
+
+ This should be refactored into separate methods to skip the lookup.
+
+ ideally::
+
+ values = self.visit_children(tree)
+ func = functions[op_name_map[tree.data]]
+ result = func(*values)
+
+ But, values has the structure ``[[], right]``
+ """
+ if len(tree.children) == 1:
+ # member with no preceeding unary_not or unary_neg
+ # TODO: If there are two possible values (namespace v. mapping) chose the namespace.
+ values = self.visit_children(tree)
+ return values[0]
+
+ elif len(tree.children) == 2:
+ op_tree, right_tree = cast(Tuple[lark.Tree, lark.Tree], tree.children)
+ op_name = {
+ "unary_not": "!_",
+ "unary_neg": "-_",
+ }[op_tree.data]
+ func = self.functions[op_name]
+ # NOTE: values has the structure [[], right]
+ left, right = cast(Tuple[List[Result], Result], self.visit_children(tree))
+ self.logger.debug("unary %s %r", op_name, right)
+ try:
+ return func(right)
+ except TypeError as ex:
+ self.logger.debug("%s(%s) --> %s", func.__name__, right, ex)
+ err = (
+ f"found no matching overload for {op_tree.data!r} "
+ f"applied to '({type(right)})'"
+ )
+ value = CELEvalError(err, ex.__class__, ex.args, tree=tree)
+ value.__cause__ = ex
+ return value
+ except ValueError as ex:
+ self.logger.debug("%s(%s) --> %s", func.__name__, right, ex)
+ value = CELEvalError("return error for overflow", ex.__class__, ex.args, tree=tree)
+ value.__cause__ = ex
+ return value
+
+ else:
+ raise CELSyntaxError(
+ f"{tree.data} {tree.children}: bad unary node",
+ line=tree.meta.line,
+ column=tree.meta.column,
+
+ )
+
+ def build_macro_eval(self, child: lark.Tree) -> Callable[[celpy.celtypes.Value], Any]:
+ """
+ Builds macro function.
+
+ For example
+
+ ``[1, 2, 3].map(n, n/2)``
+
+ Builds the function = ``lambda n: n/2``.
+
+ The function will expose exceptions, disabling short-circuit ``||`` and ``&&``.
+
+ The `child` is a `member_dot_arg` construct:
+
+ - [0] is the expression to the left of the '.'
+
+ - [1] is the function, `map`, to the right of the `.`
+
+ - [2] is the arguments in ()'s.
+ Within this, there are two children: a variable and an expression.
+ """
+ args = cast(lark.Tree, child.children[2])
+ var_tree, expr_tree = cast(Tuple[lark.Tree, lark.Tree], args.children)
+ identifier = FindIdent.in_tree(var_tree)
+ if identifier is None: # pragma: no cover
+ # This seems almost impossible.
+ raise CELSyntaxError(
+ f"{child.data} {child.children}: bad macro node",
+ line=child.meta.line,
+ column=child.meta.column,
+ )
+ # nested_eval = Evaluator(ast=expr_tree, activation=self.activation)
+ nested_eval = self.sub_evaluator(ast=expr_tree)
+
+ def sub_expr(v: celpy.celtypes.Value) -> Any:
+ return nested_eval.set_activation({identifier: v}).evaluate()
+
+ return sub_expr
+
+ def build_ss_macro_eval(self, child: lark.Tree) -> Callable[[celpy.celtypes.Value], Any]:
+ """
+ Builds macro function for short-circuit logical evaluation ignoring exception values.
+
+ For example
+
+ ``[1, 2, 'hello'].exists(n, n >= 2)``
+
+ Builds the function = ``lambda n: n >= 2``.
+
+ The function will swallow exceptions, enabling short-circuit ``||`` and ``&&``.
+ """
+ args = cast(lark.Tree, child.children[2])
+ var_tree, expr_tree = cast(Tuple[lark.Tree, lark.Tree], args.children)
+ identifier = FindIdent.in_tree(var_tree)
+ if identifier is None: # pragma: no cover
+ # This seems almost impossible.
+ raise CELSyntaxError(
+ f"{child.data} {child.children}: bad macro node",
+ line=child.meta.line,
+ column=child.meta.column,
+ )
+ # nested_eval = Evaluator(ast=expr_tree, activation=self.activation)
+ nested_eval = self.sub_evaluator(ast=expr_tree)
+
+ def sub_expr(v: celpy.celtypes.Value) -> Any:
+ try:
+ return nested_eval.set_activation({identifier: v}).evaluate()
+ except CELEvalError as ex:
+ return ex
+
+ return sub_expr
+
+ def build_reduce_macro_eval(
+ self, child: lark.Tree
+ ) -> Tuple[Callable[[Result, Result], Result], lark.Tree]:
+ """
+ Builds macro function and intiial expression for reduce().
+
+ For example
+
+ ``[0, 1, 2].reduce(r, i, 0, r + 2*i+1)``
+
+ Builds the function = ``lambda r, i: r + 2*i+1`` and initial value = 0.
+
+ The `child` is a `member_dot_arg` construct:
+
+ - [0] is the expression to the left of the '.'
+
+ - [1] is the function, `reduce`, to the right of the `.`
+
+ - [2] is the arguments in ()'s.
+ Within this, there are four children: two variables and two expressions.
+ """
+ args = cast(lark.Tree, child.children[2])
+ reduce_var_tree, iter_var_tree, init_expr_tree, expr_tree = (
+ cast(Tuple[lark.Tree, lark.Tree, lark.Tree, lark.Tree], args.children)
+ )
+ reduce_ident = FindIdent.in_tree(reduce_var_tree)
+ iter_ident = FindIdent.in_tree(iter_var_tree)
+ if reduce_ident is None or iter_ident is None: # pragma: no cover
+ # This seems almost impossible.
+ raise CELSyntaxError(
+ f"{child.data} {child.children}: bad macro node",
+ line=child.meta.line,
+ column=child.meta.column,
+ )
+ # nested_eval = Evaluator(ast=expr_tree, activation=self.activation)
+ nested_eval = self.sub_evaluator(ast=expr_tree)
+
+ def sub_expr(r: Result, i: Result) -> Result:
+ return nested_eval.set_activation(
+ {reduce_ident: r, iter_ident: i}).evaluate()
+
+ return sub_expr, init_expr_tree
+
+ @trace
+ def member(self, tree: lark.Tree) -> Result:
+ """
+ member : member_dot | member_dot_arg | member_item | member_object | primary
+
+ member_dot : member "." IDENT
+ member_dot_arg : member "." IDENT "(" [exprlist] ")"
+ member_item : member "[" expr "]"
+ member_object : member "{" [fieldinits] "}"
+
+ https://github.com/google/cel-spec/blob/master/doc/langdef.md#field-selection
+ """
+ values = self.visit_children(tree)
+ return values[0]
+
    @trace
    def member_dot(self, tree: lark.Tree) -> Result:
        """
        Evaluate ``member "." IDENT`` -- field selection or namespace navigation.

        member : member_dot | member_dot_arg | member_item | member_object | primary

        member_dot : member "." IDENT
        member_dot_arg : member "." IDENT "(" [exprlist] ")"
        member_item : member "[" expr "]"
        member_object : member "{" [fieldinits] "}"

        https://github.com/google/cel-spec/blob/master/doc/langdef.md#name-resolution

        - ``primary``: Variables and Functions: some simple names refer to variables in the
          execution context, standard functions, or other name bindings provided by the CEL
          application.

        - ``member_dot``: Field selection: appending a period and identifier to an expression
          could indicate that we're accessing a field within a protocol buffer or map.
          See below for **Field Selection**.

        - ``member_dot``: Protocol buffer package names: a simple or qualified name could
          represent an absolute or relative name in the protocol buffer package namespace.
          Package names must be followed by a message type, enum type, or enum constant.

        - ``member_dot``: Protocol buffer message types, enum types, and enum constants:
          following an optional protocol buffer package name, a simple or qualified name
          could refer to a message type, and enum type, or an enum constant in the package's
          namespace.

        Field Selection. There are four cases.

        https://github.com/google/cel-spec/blob/master/doc/langdef.md#field-selection

        - If e evaluates to a message
          and f is not declared in this message, the runtime error no_such_field is raised.

        - If e evaluates to a message
          and f is declared, but the field is not set,
          the default value of the field's type will be produced.

        - If e evaluates to a map, then e.f is equivalent to e['f'].

        - In all other cases, e.f evaluates to an error.

        :param tree: the ``member_dot`` parse tree: a member expression and an IDENT token.
        :return: the selected value, or a :class:`CELEvalError` for a missing
            name or an unsupported member type.

        TODO: implement member "." IDENT for messages.
        """
        member_tree, property_name_token = cast(Tuple[lark.Tree, lark.Token], tree.children)
        member = self.visit(member_tree)
        property_name = property_name_token.value
        result: Result
        if isinstance(member, CELEvalError):
            # Errors propagate unchanged through field selection.
            result = member
        elif isinstance(member, NameContainer):
            # Navigation through names provided as external run-time bindings.
            # The dict is the value of a Referent that was part of a namespace path.
            if property_name in member:
                result = member[property_name].value
            else:
                err = f"No {property_name!r} in bindings {sorted(member.keys())}"
                result = CELEvalError(err, KeyError, None, tree=tree)
        # TODO: Not sure this is needed...
        elif isinstance(member, celpy.celtypes.MessageType):
            # NOTE(review): MessageType.get() is assumed to supply a default for
            # unset fields -- confirm against celtypes.MessageType.
            self.logger.info("member_dot(%r, %r)", member, property_name)
            result = member.get(property_name)
        # TODO: Future Expansion, handle Protobuf message package...
        # elif isinstance(member, celpy.celtypes.PackageType):
        #     if property_name in member:
        #         result = member[property_name]
        #     else:
        #         err = f"no such message {property_name!r} in package {member}"
        #         result = CELEvalError(err, KeyError, None, tree=tree)
        elif isinstance(member, celpy.celtypes.MapType):
            # Syntactic sugar: a.b is a["b"] when a is a mapping.
            try:
                result = member[property_name]
            except KeyError:
                result = CELEvalError(
                    f"no such member in mapping: {property_name!r}", KeyError, None, tree=tree
                )
        else:
            err = f"{member!r} with type: '{type(member)}' does not support field selection"
            result = CELEvalError(err, TypeError, None, tree=tree)
        return result
+
    @trace
    def member_dot_arg(self, tree: lark.Tree) -> Result:
        """
        Evaluate ``member "." IDENT "(" [exprlist] ")"`` -- a macro or a method call.

        member : member_dot | member_dot_arg | member_item | member_object | primary

        member_dot : member "." IDENT
        member_dot_arg : member "." IDENT "(" [exprlist] ")"
        member_item : member "[" expr "]"
        member_object : member "{" [fieldinits] "}"

        https://github.com/google/cel-spec/blob/master/doc/langdef.md#field-selection

        Method or macro? We distinguish between these three similar cases.

        - Macros: https://github.com/google/cel-spec/blob/master/doc/langdef.md#macros

        - member "." IDENT "(" [exprlist] ")" -- used for string operations

        - member "." IDENT "(" ")" -- used for a several timestamp operations.

        Macro names (``map``, ``filter``, ``all``, ``exists``, ``exists_one``,
        ``reduce``, ``min``) are handled inline below and cannot be
        overridden; anything else is dispatched to :meth:`method_eval`.
        """
        sub_expr: CELFunction
        result: Result
        reduction: Result
        # Signature of the fold operators used by all() and exists().
        CELBoolFunction = Callable[[celpy.celtypes.BoolType, Result], celpy.celtypes.BoolType]

        member_tree, method_name_token = cast(Tuple[lark.Tree, lark.Token], tree.children[:2])

        if method_name_token.value == "map":
            # map(v, expr): build a new list of the transformed values.
            member_list = cast(celpy.celtypes.ListType, self.visit(member_tree))
            sub_expr = self.build_macro_eval(tree)
            mapping = cast(Iterable[celpy.celtypes.Value], map(sub_expr, member_list))
            result = celpy.celtypes.ListType(mapping)
            return result

        elif method_name_token.value == "filter":
            # filter(v, expr): keep only the members for which expr is truthy.
            member_list = cast(celpy.celtypes.ListType, self.visit(member_tree))
            sub_expr = self.build_macro_eval(tree)
            result = celpy.celtypes.ListType(filter(sub_expr, member_list))
            return result

        elif method_name_token.value == "all":
            # all(v, expr): fold with logical_and; sub-expression errors are
            # captured (build_ss_macro_eval) so short-circuit rules apply.
            member_list = cast(celpy.celtypes.ListType, self.visit(member_tree))
            and_oper = cast(
                CELBoolFunction,
                eval_error("no such overload", TypeError)(
                    celpy.celtypes.logical_and)
            )
            sub_expr = self.build_ss_macro_eval(tree)
            reduction = reduce(and_oper, map(sub_expr, member_list), celpy.celtypes.BoolType(True))
            return reduction

        elif method_name_token.value == "exists":
            # exists(v, expr): fold with logical_or, starting from False.
            member_list = cast(celpy.celtypes.ListType, self.visit(member_tree))
            or_oper = cast(
                CELBoolFunction,
                eval_error("no such overload", TypeError)(
                    celpy.celtypes.logical_or)
            )
            sub_expr = self.build_ss_macro_eval(tree)
            reduction = reduce(or_oper, map(sub_expr, member_list), celpy.celtypes.BoolType(False))
            return reduction

        elif method_name_token.value == "exists_one":
            # Is there exactly 1?
            member_list = cast(celpy.celtypes.ListType, self.visit(member_tree))
            sub_expr = self.build_macro_eval(tree)
            count = sum(1 for value in member_list if bool(sub_expr(value)))
            return celpy.celtypes.BoolType(count == 1)

        elif method_name_token.value == "reduce":
            # Apply a function to reduce the list to a single value.
            # The `tree` is a `member_dot_arg` construct with (member, method_name, args)
            # The args have two variables and two expressions.
            member_list = cast(celpy.celtypes.ListType, self.visit(member_tree))
            reduce_expr, init_expr_tree = self.build_reduce_macro_eval(tree)
            initial_value = self.visit(init_expr_tree)
            reduction = reduce(reduce_expr, member_list, initial_value)
            return reduction

        elif method_name_token.value == "min":
            # Special case of "reduce()"
            # with <member>.min() -> <member>.reduce(r, i, int_max, r < i ? r : i)
            member_list = cast(celpy.celtypes.ListType, self.visit(member_tree))
            try:
                # Note. The Result type includes None, which will raise an exception.
                reduction = min(member_list)  # type: ignore [type-var]
            except ValueError as ex:
                err = "Attempt to reduce an empty sequence or a sequence with a None value"
                reduction = CELEvalError(err, ex.__class__, ex.args, tree=tree)
            return reduction

        else:
            # Not a macro: a method evaluation.
            # Evaluate member, method IDENT and (if present) exprlist and apply.
            if len(tree.children) == 2:
                # No argument list: member "." IDENT "(" ")"
                member, ident = cast(
                    Tuple[Result, lark.Token],
                    self.visit_children(tree)
                )
                result = self.method_eval(member, ident)
            else:
                # assert len(tree.children) == 3
                member, ident, expr_iter = cast(
                    Tuple[Result, lark.Token, Iterable[Result]],
                    self.visit_children(tree)
                )
                result = self.method_eval(member, ident, expr_iter)
            return result
+
+ @trace
+ def member_index(self, tree: lark.Tree) -> Result:
+ """
+ member : member_dot | member_dot_arg | member_item | member_object | primary
+
+ member_dot : member "." IDENT
+ member_dot_arg : member "." IDENT "(" [exprlist] ")"
+ member_item : member "[" expr "]"
+ member_object : member "{" [fieldinits] "}"
+
+ https://github.com/google/cel-spec/blob/master/doc/langdef.md#field-selection
+
+ Locating an item in a Mapping or List
+ """
+ func = self.functions["_[_]"]
+ values = self.visit_children(tree)
+ member, index = values
+ try:
+ return func(member, index)
+ except TypeError as ex:
+ self.logger.debug("%s(%s, %s) --> %s", func.__name__, member, index, ex)
+ err = (
+ f"found no matching overload for _[_] "
+ f"applied to '({type(member)}, {type(index)})'"
+ )
+ value = CELEvalError(err, ex.__class__, ex.args, tree=tree)
+ value.__cause__ = ex
+ return value
+ except KeyError as ex:
+ self.logger.debug("%s(%s, %s) --> %s", func.__name__, member, index, ex)
+ value = CELEvalError("no such key", ex.__class__, ex.args, tree=tree)
+ value.__cause__ = ex
+ return value
+ except IndexError as ex:
+ self.logger.debug("%s(%s, %s) --> %s", func.__name__, member, index, ex)
+ value = CELEvalError("invalid_argument", ex.__class__, ex.args, tree=tree)
+ value.__cause__ = ex
+ return value
+
    @trace
    def member_object(self, tree: lark.Tree) -> Result:
        """
        Evaluate ``member "{" [fieldinits] "}"`` -- protobuf object construction.

        member : member_dot | member_dot_arg | member_item | member_object | primary

        member_dot : member "." IDENT
        member_dot_arg : member "." IDENT "(" [exprlist] ")"
        member_item : member "[" expr "]"
        member_object : member "{" [fieldinits] "}"

        https://github.com/google/cel-spec/blob/master/doc/langdef.md#field-selection

        An object constructor requires a protobuf type, not an object as the "member".

        :param tree: the ``member_object`` parse tree.
        :return: the constructed object (or the pass-through primary value),
            or a :class:`CELEvalError` if construction fails.
        :raises CELSyntaxError: for a malformed member_object node.
        """
        values = self.visit_children(tree)

        if len(values) == 1:
            # primary | member "{" "}"
            if cast(lark.Tree, tree.children[0]).data == "primary":
                # Plain primary: pass the value through unchanged.
                value = values[0]
            else:
                # Build a default protobuf message.
                protobuf_class = cast(
                    celpy.celtypes.FunctionType,
                    values[0]
                )
                self.logger.debug("Creating %s()", protobuf_class)
                try:
                    # None signals "no field initializers".
                    value = protobuf_class(None)
                except (TypeError, ValueError) as ex:  # pragma: no cover
                    value = CELEvalError(ex.args[0], ex.__class__, ex.args, tree=tree)
                self.logger.debug("Created %s", value)
            return value

        elif len(values) == 2:
            # protobuf feature: member "{" fieldinits "}"
            member, fieldinits = values
            if isinstance(member, CELEvalError):
                # An erroneous type reference propagates unchanged.
                return member
            # Apply fieldinits as the constructor for an instance of the referenced type.
            protobuf_class = cast(
                celpy.celtypes.FunctionType,
                member
            )
            # NOTE: protobuf MessageType conversions are the responsibility of the target type.
            # We can't -- easily -- generalize this.
            self.logger.info("Creating %s(%r)", protobuf_class, fieldinits)
            try:
                value = protobuf_class(cast(celpy.celtypes.Value, fieldinits))
            except (TypeError, ValueError) as ex:  # pragma: no cover
                value = CELEvalError(ex.args[0], ex.__class__, ex.args, tree=tree)
            self.logger.info("Created %r", value)
            return value

        else:
            raise CELSyntaxError(
                f"{tree.data} {tree.children}: bad member_object node",
                line=tree.meta.line,
                column=tree.meta.column,

            )
+
    @trace
    def primary(self, tree: lark.Tree) -> Result:
        """
        Evaluate a ``primary`` node by dispatching on its single child's rule.

        primary : dot_ident_arg | dot_ident | ident_arg | ident
                | paren_expr | list_lit | map_lit | literal

        dot_ident_arg  : "." IDENT "(" [exprlist] ")"
        dot_ident      : "." IDENT
        ident_arg      : IDENT "(" [exprlist] ")"
        ident          : IDENT
        paren_expr     : "(" expr ")"
        list_lit       : "[" [exprlist] "]"
        map_lit        : "{" [mapinits] "}"

        TODO: Refactor into separate methods to skip this complex elif chain.
        top-level :py:meth:`primary` is similar to :py:meth:`method`.
        Each of the individual rules then works with a tree instead of a child of the
        primary tree.

        This includes function-like macros: has() and dyn().
        These are special cases and cannot be overridden.

        :param tree: the ``primary`` parse tree with exactly one child.
        :return: the evaluated value, or a :class:`CELEvalError` for an
            unresolvable name or a bad literal collection.
        :raises CELSyntaxError: for a malformed primary node.
        """
        result: Result
        name_token: lark.Token
        if len(tree.children) != 1:
            raise CELSyntaxError(
                f"{tree.data} {tree.children}: bad primary node",
                line=tree.meta.line,
                column=tree.meta.column,
            )

        child = cast(lark.Tree, tree.children[0])
        if child.data == "literal":
            # A literal value
            values = self.visit_children(tree)
            return values[0]

        elif child.data == "paren_expr":
            # A "(" expr ")"
            values = self.visit_children(child)
            return values[0]

        elif child.data == "list_lit":
            if len(child.children) == 0:
                # Empty list
                # TODO: Refactor into type_eval()
                result = celpy.celtypes.ListType()
            else:
                # exprlist to be packaged as List.
                values = self.visit_children(child)
                result = values[0]
            return result

        elif child.data == "map_lit":
            if len(child.children) == 0:
                # Empty mapping
                # TODO: Refactor into type_eval()
                result = celpy.celtypes.MapType()
            else:
                # mapinits (a sequence of key-value tuples) to be packaged as a dict.
                # OR. An CELEvalError in case of ValueError caused by duplicate keys.
                # OR. An CELEvalError in case of TypeError cause by invalid key types.
                # TODO: Refactor into type_eval()
                try:
                    values = self.visit_children(child)
                    result = values[0]
                except ValueError as ex:
                    # Duplicate keys raised by mapinits().
                    result = CELEvalError(ex.args[0], ex.__class__, ex.args, tree=tree)
                except TypeError as ex:
                    # Unhashable or otherwise invalid key type.
                    result = CELEvalError(ex.args[0], ex.__class__, ex.args, tree=tree)
            return result

        elif child.data in ("dot_ident", "dot_ident_arg"):
            # "." IDENT ["(" [exprlist] ")"]
            # Leading "." means the name is resolved in the root scope **only**.
            # No searching through alternative packages.
            # len(child) == 1 -- "." IDENT
            # len(child) == 2 -- "." IDENT "(" exprlist ")" -- TODO: Implement dot_ident_arg.
            values = self.visit_children(child)
            name_token = cast(lark.Token, values[0])
            # Should not be a Function, should only be a Result
            # TODO: implement dot_ident_arg uses function_eval().
            try:
                result = cast(Result, self.ident_value(name_token.value, root_scope=True))
            except KeyError as ex:
                result = CELEvalError(ex.args[0], ex.__class__, ex.args, tree=tree)
            return result

        elif child.data == "ident_arg":
            # IDENT ["(" [exprlist] ")"]
            # Can be a proper function or one of the function-like macros: "has()", "dyn()".
            exprlist: lark.Tree
            if len(child.children) == 1:
                # No arguments: synthesize an empty exprlist.
                name_token = cast(lark.Token, child.children[0])
                exprlist = lark.Tree(data="exprlist", children=[])
            elif len(child.children) == 2:
                name_token, exprlist = cast(Tuple[lark.Token, lark.Tree], child.children)
            else:
                raise CELSyntaxError(  # pragma: no cover
                    f"{tree.data} {tree.children}: bad primary node",
                    line=tree.meta.line,
                    column=tree.meta.column,

                )

            if name_token.value == "has":
                # has() macro. True if the child expression is a member expression that evaluates.
                # False if the child expression is a member expression that cannot be evaluated.
                return self.macro_has_eval(exprlist)
            elif name_token.value == "dyn":
                # dyn() macro does nothing; it's for run-time type-checking.
                dyn_values = self.visit_children(exprlist)
                return dyn_values[0]
            else:
                # Ordinary function() evaluation.
                values = self.visit_children(exprlist)
                return self.function_eval(name_token, cast(Iterable[celpy.celtypes.Value], values))

        elif child.data == "ident":
            # IDENT -- simple identifier from the current activation.
            name_token = cast(lark.Token, child.children[0])
            try:
                # Should not be a Function.
                # Generally Result object (i.e., a variable)
                # Could be an Annotation object (i.e., a type) for protobuf messages
                result = cast(Result, self.ident_value(name_token.value))
            except KeyError as ex:
                err = (
                    f"undeclared reference to '{name_token}' "
                    f"(in activation '{self.activation}')"
                )
                result = CELEvalError(err, ex.__class__, ex.args, tree=tree)
            return result

        else:
            raise CELSyntaxError(
                f"{tree.data} {tree.children}: bad primary node",
                line=tree.meta.line,
                column=tree.meta.column,
            )
+
    @trace
    def literal(self, tree: lark.Tree) -> Result:
        """
        Create a CEL literal value from the single token of the parse tree.

        Dispatches on the lark token type (FLOAT_LIT, INT_LIT, UINT_LIT,
        STRING_LIT/MLSTRING_LIT, BYTES_LIT, BOOL_LIT, NULL_LIT) and builds
        the corresponding ``celpy.celtypes`` object.  A ``ValueError``
        during conversion becomes a :class:`CELEvalError`.

        :raises CELSyntaxError: for a malformed literal node or a UINT_LIT
            without the required ``u``/``U`` suffix.
        :raises CELUnsupportedError: for a token type with no conversion.

        .. todo:: Use type provider conversions from string to CEL type objects.
        """
        if len(tree.children) != 1:
            raise CELSyntaxError(
                f"{tree.data} {tree.children}: bad literal node",
                line=tree.meta.line,
                column=tree.meta.column,

            )
        value_token = cast(lark.Token, tree.children[0])
        try:
            result: Result
            if value_token.type == "FLOAT_LIT":
                result = celpy.celtypes.DoubleType(value_token.value)
            elif value_token.type == "INT_LIT":
                result = celpy.celtypes.IntType(value_token.value)
            elif value_token.type == "UINT_LIT":
                # Unsigned literals must carry a trailing 'u' or 'U'.
                if not value_token.value[-1].lower() == 'u':
                    raise CELSyntaxError(
                        f"invalid unsigned int literal {value_token!r}",
                        line=tree.meta.line,
                        column=tree.meta.column,
                    )
                # Strip the suffix before conversion.
                result = celpy.celtypes.UintType(value_token.value[:-1])
            elif value_token.type in ("MLSTRING_LIT", "STRING_LIT"):
                # Escape expansion is delegated to celstr().
                result = celstr(value_token)
            elif value_token.type == "BYTES_LIT":
                # Escape expansion is delegated to celbytes().
                result = celbytes(value_token)
            elif value_token.type == "BOOL_LIT":
                result = (
                    celpy.celtypes.BoolType(value_token.value.lower() == "true")
                )
            elif value_token.type == "NULL_LIT":
                # CEL null is represented as Python None.
                result = None
            else:
                raise CELUnsupportedError(
                    f"{tree.data} {tree.children}: type not implemented",
                    line=value_token.line,
                    column=value_token.column,
                )
        except ValueError as ex:
            # A malformed numeric/string literal becomes an eval error.
            result = CELEvalError(ex.args[0], ex.__class__, ex.args, tree=tree)

        return result
+
+ @trace
+ def exprlist(self, tree: lark.Tree) -> Result:
+ """
+ exprlist : expr ("," expr)*
+ """
+ values = self.visit_children(tree)
+ errors = (v for v in values if isinstance(v, CELEvalError))
+ try:
+ return next(errors)
+ except StopIteration:
+ pass
+ # There are no CELEvalError values in the result, so we can narrow the domain.
+ result = celpy.celtypes.ListType(cast(List[celpy.celtypes.Value], values))
+ return result
+
+ @trace
+ def fieldinits(self, tree: lark.Tree) -> Result:
+ """
+ fieldinits : IDENT ":" expr ("," IDENT ":" expr)*
+
+ The even items, children[0::2] are identifiers, nothing to evaluate.
+ The odd items, childnre[1::2] are expressions.
+
+ This creates a mapping, used by the :meth:`member_object` method to create
+ and populate a protobuf object. Duplicate names are an error.
+ """
+ fields: Dict[str, Any] = {}
+ pairs = cast(
+ Iterable[Tuple[lark.Token, lark.Tree]],
+ zip(tree.children[0::2], tree.children[1::2])
+ )
+ for ident_node, expr_node in pairs:
+ ident = ident_node.value
+ expr = cast(celpy.celtypes.Value, self.visit_children(expr_node)[0])
+ if ident in fields:
+ raise ValueError(f"Duplicate field label {ident!r}")
+ fields[ident] = expr
+ return celpy.celtypes.MessageType(**fields)
+
+ @trace
+ def mapinits(self, tree: lark.Tree) -> Result:
+ """
+ mapinits : expr ":" expr ("," expr ":" expr)*
+
+ Extract the key expr's and value expr's to a list of pairs.
+ This raises an exception on a duplicate key.
+
+ TODO: Is ``{'a': 1, 'b': 2/0}['a']`` a meaningful result in CEL?
+ Or is this an error because the entire member is erroneous?
+
+ """
+ result = celpy.celtypes.MapType()
+
+ # Not sure if this cast is sensible. Should a CELEvalError propagate up from the
+ # sub-expressions? See the error check in :py:func:`exprlist`.
+ keys_values = cast(List[celpy.celtypes.Value], self.visit_children(tree))
+ pairs = zip(keys_values[0::2], keys_values[1::2])
+ for key, value in pairs:
+ if key in result:
+ raise ValueError(f"Duplicate key {key!r}")
+ result[key] = value
+
+ return result
+
+
# Matches one unit of a CEL string/bytes literal body: a simple backslash
# escape (\a \b \f \n \r \t \v \" \' \\), a three-digit octal escape,
# a \xHH hex escape, a \uHHHH or \UHHHHHHHH escape, or -- via the final
# "." alternative -- any single ordinary character.
CEL_ESCAPES_PAT = re.compile(
    "\\\\[abfnrtv\"'\\\\]|\\\\\\d{3}|\\\\x[0-9a-fA-F]{2}|\\\\u[0-9a-fA-F]{4}|\\\\U[0-9a-fA-F]{8}|."
)


# Translation table from the simple backslash escapes to their characters.
CEL_ESCAPES = {
    '\\a': '\a', '\\b': '\b', '\\f': '\f', '\\n': '\n',
    '\\r': '\r', '\\t': '\t', '\\v': '\v',
    '\\"': '"', "\\'": "'", '\\\\': '\\'
}
+
+
def celstr(token: lark.Token) -> celpy.celtypes.StringType:
    """
    Evaluate a CEL string literal, expanding escapes to create a Python string.

    Raw literals (``r``/``R`` prefix) are used verbatim after stripping the
    quotes; cooked literals have each escape sequence expanded.

    It may be that built-in ``eval()`` might work for some of this, but
    the octal escapes aren't really viable.

    :param token: CEL token value
    :return: str

    .. todo:: This can be refactored into celpy.celtypes.StringType.
    """
    def decode(matches: Iterable[Match[str]]) -> Iterator[str]:
        for piece in (m.group() for m in matches):
            if len(piece) == 1:
                # An ordinary character, not an escape.
                yield piece
            elif piece[:2] == r'\x' or piece[:2] in {r'\u', r'\U'}:
                # Hex or unicode escape: hex digits after the 2-char prefix.
                yield chr(int(piece[2:], 16))
            elif piece[:1] == '\\' and len(piece) == 4:
                # Three-digit octal escape.
                yield chr(int(piece[1:], 8))
            else:
                # Simple backslash escape, or the piece unchanged.
                yield CEL_ESCAPES.get(piece, piece)

    text = token.value
    if text[:1] in ("R", "r"):
        # Raw: no escape expansion, just strip prefix and quotes.
        body = text[4:-3] if text[1:4] in ('"""', "'''") else text[2:-1]
    else:
        # Cooked: strip quotes, then expand each escape.
        inner = text[3:-3] if text[0:3] in ('"""', "'''") else text[1:-1]
        body = ''.join(decode(CEL_ESCAPES_PAT.finditer(inner)))
    return celpy.celtypes.StringType(body)
+
+
def celbytes(token: lark.Token) -> celpy.celtypes.BytesType:
    """
    Evaluate a CEL bytes literal, expanding escapes to create a Python bytes object.

    :param token: CEL token value
    :return: bytes
    :raises ValueError: when the token lacks a ``b``/``br`` prefix.

    .. note:: (review) only the ``br`` prefix order (any case) selects the raw
       branch; an ``rb``-spelled literal would raise ValueError below --
       confirm against the BYTES_LIT rule in cel.lark.

    .. todo:: This can be refactored into celpy.celtypes.BytesType.
    """
    def expand(match_iter: Iterable[Match[str]]) -> Iterator[int]:
        # Yield one byte value per escape; a plain character is UTF-8
        # encoded and may therefore yield several bytes.
        for match in (m.group() for m in match_iter):
            if len(match) == 1:
                yield from match.encode('utf-8')
            elif match[:2] == r'\x':
                yield int(match[2:], 16)
            elif match[:2] == r'\u':
                # NOTE(review): \UXXXXXXXX (8 hex digits) is matched by
                # CEL_ESCAPES_PAT but not handled here; it falls through to
                # ord() below and raises TypeError -- confirm intended.
                yield int(match[2:], 16)
            elif match[:1] == '\\' and len(match) == 4:
                # Three-digit octal escape.
                yield int(match[1:], 8)
            else:
                yield ord(CEL_ESCAPES.get(match, match))

    text = token.value
    if text[:2].lower() == "br":
        # Raw; ignore ``\`` escapes
        if text[2:5] == '"""' or text[2:5] == "'''":
            # Long
            expanded = celpy.celtypes.BytesType(ord(c) for c in text[5:-3])
        else:
            # Short
            expanded = celpy.celtypes.BytesType(ord(c) for c in text[3:-1])
    elif text[:1].lower() == "b":
        # Cooked; expand ``\`` escapes
        if text[1:4] == '"""' or text[1:4] == "'''":
            # Long
            match_iter = CEL_ESCAPES_PAT.finditer(text[4:-3])
        else:
            # Short
            match_iter = CEL_ESCAPES_PAT.finditer(text[2:-1])
        expanded = celpy.celtypes.BytesType(expand(match_iter))
    else:
        raise ValueError(f"Invalid bytes literal {token.value!r}")
    return expanded
diff --git a/.venv/lib/python3.12/site-packages/celpy/py.typed b/.venv/lib/python3.12/site-packages/celpy/py.typed
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/celpy/py.typed