about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/celpy/__main__.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/celpy/__main__.py')
-rw-r--r--.venv/lib/python3.12/site-packages/celpy/__main__.py465
1 files changed, 465 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/celpy/__main__.py b/.venv/lib/python3.12/site-packages/celpy/__main__.py
new file mode 100644
index 00000000..c9483173
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/celpy/__main__.py
@@ -0,0 +1,465 @@
+# SPDX-Copyright: Copyright (c) Capital One Services, LLC
+# SPDX-License-Identifier: Apache-2.0
+# Copyright 2020 Capital One Services, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and limitations under the License.
+
+"""
+Pure Python implementation of CEL.
+
+This provides a few jq-like, bc-like, and shell expr-like features.
+
+-   ``jq`` uses ``.`` to refer the current document. By setting a package
+    name of ``"jq"`` and placing the JSON object in the package, we achieve
+    similar syntax.
+
+-   ``bc`` offers complex function definitions and other programming support.
+    CEL can only evaluate a few bc-like expressions.
+
+-   This does everything ``expr`` does, but the syntax is slightly different.
+    The output of comparisons -- by default -- is boolean, where ``expr`` is an integer 1 or 0.
+    Use ``-f 'd'`` to see decimal output instead of Boolean text values.
+
+-   This does some of what ``test`` does, without a lot of the sophisticated
+    file system data gathering.
+    Use ``-b`` to set the exit status code from a Boolean result.
+
+TODO: This can also have a REPL, as well as process CSV files.
+
+SYNOPSIS
+========
+
+::
+
+    python -m celpy [--arg name:type=value ...] [--null-input] expr
+
+Options:
+
+:--arg:
+    Provides argument names, types and optional values.
+    If the value is not provided, the name is expected to be an environment
+    variable, and the value of the environment variable is converted and used.
+
+:--null-input:
+    Normally, JSON documents are read from stdin in ndjson format. If no JSON documents are
+    provided, the ``--null-input`` option skips trying to read from stdin.
+
+:expr:
+    A CEL expression to evaluate.
+
+JSON documents are read from stdin in NDJSON format (http://jsonlines.org/, http://ndjson.org/).
+For each JSON document, the expression is evaluated with the document in a default
+package. This allows `.name` to pick items from the document.
+
+By default, the output is JSON serialized. This means strings will be JSON-ified and have quotes.
+
+If a ``--format`` option is provided, this is applied to the resulting object; this can be
+used to strip quotes, or limit precision on double objects, or convert numbers to hexadecimal.
+
+Arguments, Types, and Namespaces
+================================
+
+CEL objects rely on the celtypes definitions.
+
+Because of the close association between CEL and protobuf, some well-known protobuf types
+are also supported.
+
+..  todo:: CLI type environment
+
+    Permit name.name:type=value to create namespace bindings.
+
+Further, type providers can be bound to CEL. This means an extended CEL
+may have additional types beyond those defined by the :py:class:`Activation` class.
+
+"""
+
+import argparse
+import ast
+import cmd
+import json
+import logging
+import os
+import re
+import sys
+from typing import Any, Callable, Dict, List, Optional, Tuple, cast
+
+from celpy import Environment, Runner, celtypes
+from celpy.adapter import CELJSONDecoder, CELJSONEncoder
+from celpy.celparser import CELParseError
+from celpy.evaluation import Annotation, CELEvalError, Result
+
+logger = logging.getLogger("celpy")
+
+
+# For argument parsing purposes.
+# Note the reliance on `ast.literal_eval` for ListType and MapType conversions.
+# Other types convert strings directly. These types need some help.
+CLI_ARG_TYPES: Dict[str, Annotation] = {
+    "int":
+        celtypes.IntType,
+    "uint":
+        celtypes.UintType,
+    "double":
+        celtypes.DoubleType,
+    "bool":
+        celtypes.BoolType,
+    "string":
+        celtypes.StringType,
+    "bytes":
+        celtypes.BytesType,
+    "list":
+        cast(Callable[..., celtypes.Value], lambda arg: celtypes.ListType(ast.literal_eval(arg))),
+    "map":
+        cast(Callable[..., celtypes.Value], lambda arg: celtypes.MapType(ast.literal_eval(arg))),
+    "null_type":
+        cast(Callable[..., celtypes.Value], lambda arg: None),
+    "single_duration":
+        celtypes.DurationType,
+    "single_timestamp":
+        celtypes.TimestampType,
+
+    "int64_value": celtypes.IntType,
+    "uint64_value": celtypes.UintType,
+    "double_value": celtypes.DoubleType,
+    "bool_value": celtypes.BoolType,
+    "string_value": celtypes.StringType,
+    "bytes_value": celtypes.BytesType,
+    "number_value": celtypes.DoubleType,  # Ambiguous; can somtimes be integer.
+    "null_value": cast(Callable[..., celtypes.Value], lambda arg: None),
+}
+
+
+def arg_type_value(text: str) -> Tuple[str, Annotation, celtypes.Value]:
+    """
+    Decompose ``-a name:type=value`` argument into a useful triple.
+
+    Also accept ``-a name:type``. This will find ``name`` in the environment and convert to the
+    requested type.
+
+    Also accepts ``-a name``. This will find ``name`` in the environment and treat it as a string.
+
+    Currently, names do not reflect package naming. An environment can be a package,
+    and the activation can include variables that are also part of the package.
+    This is not supported via the CLI.
+
+    Types can be celtypes class names or TYPE_NAME or PROTOBUF_TYPE
+
+    ::
+
+        TYPE_NAME : "int64_value" | "null_value" | "uint64_value" | "double_value"
+        | "bool_value" | "string_value" | "bytes_value" | "number_value"
+
+        PROTOBUF_TYPE : "single_int64" | "single_int32" | "single_uint64" | "single_uint32"
+        | "single_sint64" | "single_sint32" | "single_fixed64" | "single_fixed32"
+        | "single_sfixed32" | "single_sfixed64" | "single_float" | "single_double"
+        | "single_bool" | "single_string" | "single_bytes"
+        | "single_duration" | "single_timestamp"
+
+    ..  todo:: type names can include `.` to support namespacing for protobuf support.
+
+    :param text: Argument value
+    :return: Tuple with name, annotation, and resulting object.
+    """
+    arg_pattern = re.compile(
+        r"^([_a-zA-Z][_a-zA-Z0-9]*)(?::([_a-zA-Z][_a-zA-Z0-9]*))?(?:=(.*))?$"
+    )
+    match = arg_pattern.match(text)
+    if match is None:
+        raise argparse.ArgumentTypeError(
+            f"arg {text} not 'var=string', 'var:type=value', or `var:type")
+    name, type_name, value_text = match.groups()
+    if value_text is None:
+        value_text = os.environ.get(name)
+    type_definition: Annotation  # CELType or a conversion function
+    value: celtypes.Value  # Specific value.
+    if type_name:
+        try:
+            type_definition = CLI_ARG_TYPES[type_name]
+            value = cast(
+                celtypes.Value,
+                type_definition(value_text)  # type: ignore[arg-type, call-arg]
+            )
+        except KeyError:
+            raise argparse.ArgumentTypeError(f"arg {text} type name not in {list(CLI_ARG_TYPES)}")
+        except ValueError:
+            raise argparse.ArgumentTypeError(f"arg {text} value invalid for the supplied type")
+    else:
+        value = celtypes.StringType(value_text or "")
+        type_definition = celtypes.StringType
+    return name, type_definition, value
+
+
+def get_options(argv: Optional[List[str]] = None) -> argparse.Namespace:
+    """Parses command-line arguments.
+    """
+    parser = argparse.ArgumentParser(prog="celpy", description="Pure Python CEL")
+    parser.add_argument(
+        "-v", "--verbose", default=0, action='count')
+
+    # Inputs
+    parser.add_argument(
+        "-a", "--arg", action='append', type=arg_type_value,
+        help="Variables to set; -a name:type=value, or -a name=value for strings, "
+             "or -a name to read an environment variable"
+    )
+    parser.add_argument(
+        "-n", "--null-input", dest='null_input', default=False, action='store_true',
+        help="Avoid reading Newline-Delimited JSON documents from stdin"
+    )
+    parser.add_argument(
+        "-s", "--slurp", default=False, action="store_true",
+        help="Slurp a single, multiple JSON document from stdin"
+    )
+    parser.add_argument(
+        "-i", "--interactive", default=False, action="store_true",
+        help="Interactive REPL"
+    )
+
+    # JSON handling
+    parser.add_argument(
+        "--json-package", "-p", metavar="NAME", dest="package", default=None, action="store",
+        help="Each JSON input is a CEL package, allowing .name to work"
+    )
+    parser.add_argument(
+        "--json-document", "-d", metavar="NAME", dest="document", default=None, action="store",
+        help="Each JSON input is a variable, allowing name.map(x, x*2) to work"
+    )
+
+    # Outputs and Status
+    parser.add_argument(
+        "-b", "--boolean", default=False, action='store_true',
+        help="If the result is True, the exit status is 0, for False, it's 1, otherwise 2"
+    )
+    parser.add_argument(
+        "-f", "--format", default=None, action='store',
+        help=(
+            "Use Python formating instead of JSON conversion of results; "
+            "Example '.6f' to format a DoubleType result"
+        )
+    )
+
+    # The expression
+    parser.add_argument("expr", nargs='?')
+
+    options = parser.parse_args(argv)
+    if options.package and options.document:
+        parser.error("Either use --json-package or --json-document, not both")
+    if not options.package and not options.document:
+        options.package = "jq"
+    if options.interactive and options.expr:
+        parser.error("Interactive mode and an expression provided")
+    if not options.interactive and not options.expr:
+        parser.error("No expression provided")
+    return options
+
+
+class CEL_REPL(cmd.Cmd):
+    prompt = "CEL> "
+    intro = "Enter an expression to have it evaluated."
+    logger = logging.getLogger("REPL")
+
+    def cel_eval(self, text: str) -> celtypes.Value:
+        try:
+            expr = self.env.compile(text)
+            prgm = self.env.program(expr)
+            return prgm.evaluate(self.state)
+        except CELParseError as ex:
+            print(self.env.cel_parser.error_text(ex.args[0], ex.line, ex.column), file=sys.stderr)
+            raise
+
+    def preloop(self) -> None:
+        self.env = Environment()
+        self.state: Dict[str, celtypes.Value] = {}
+
+    def do_set(self, args: str) -> bool:
+        """Set variable expression
+
+        Evaluates the expression, saves the result as the given variable in the current activation.
+        """
+        name, space, args = args.partition(' ')
+        try:
+            value: celtypes.Value = self.cel_eval(args)
+            print(value)
+            self.state[name] = value
+        except Exception as ex:
+            self.logger.error(ex)
+        return False
+
+    def do_show(self, args: str) -> bool:
+        """Shows all variables in the current activation."""
+        print(self.state)
+        return False
+
+    def do_quit(self, args: str) -> bool:
+        """Quits from the REPL."""
+        return True
+
+    do_exit = do_quit
+    do_bye = do_quit
+
+    def default(self, args: str) -> None:
+        """Evaluate an expression."""
+        try:
+            value = self.cel_eval(args)
+            print(value)
+        except Exception as ex:
+            self.logger.error(ex)
+
+
+def process_json_doc(
+        display: Callable[[Result], None],
+        prgm: Runner,
+        activation: Dict[str, Any],
+        variable: str,
+        document: str,
+        boolean_to_status: bool = False) -> int:
+    """
+    Process a single JSON document. Either one line of an NDJSON stream
+    or the only document in slurp mode. We assign it to the variable "jq".
+    This variable can be the package name, allowing ``.name``) to work.
+    Or. It can be left as a variable, allowing ``jq`` and ``jq.map(x, x*2)`` to work.
+
+    Returns status code 0 for success, 3 for failure.
+    """
+    try:
+        activation[variable] = json.loads(document, cls=CELJSONDecoder)
+        result = prgm.evaluate(activation)
+        display(result)
+        if (boolean_to_status and isinstance(result, (celtypes.BoolType, bool))):
+            return 0 if result else 1
+        return 0
+    except CELEvalError as ex:
+        # ``jq`` KeyError problems result in ``None``.
+        # Other errors should, perhaps, be more visible.
+        logger.debug("Encountered %s on document %r", ex, document)
+        display(None)
+        return 0
+    except json.decoder.JSONDecodeError as ex:
+        logger.error("%s on document %r", ex.args[0], document)
+        # print(f"{ex.args[0]} in {document!r}", file=sys.stderr)
+        return 3
+
+
+def main(argv: Optional[List[str]] = None) -> int:
+    """
+    Given options from the command-line, execute the CEL expression.
+
+    With --null-input option, only --arg and expr matter.
+
+    Without --null-input, JSON documents are read from STDIN, following ndjson format.
+
+    With the --slurp option, it reads one JSON from stdin, spread over multiple lines.
+
+    If "--json-package" is used, each JSON document becomes a package, and
+        top-level dictionary keys become valid ``.name`` expressions.
+        Otherwise, "--json-object" is the default, and each JSON document
+        is assigned to a variable. The default name is "jq" to allow expressions
+        that are similar to ``jq`` but with a "jq" prefix.
+    """
+    options = get_options(argv)
+    if options.verbose == 1:
+        logging.getLogger().setLevel(logging.INFO)
+    elif options.verbose > 1:
+        logging.getLogger().setLevel(logging.DEBUG)
+    logger.debug(options)
+
+    if options.interactive:
+        repl = CEL_REPL()
+        repl.cmdloop()
+        return 0
+
+    if options.format:
+        def output_display(result: Result) -> None:
+            print('{0:{format}}'.format(result, format=options.format))
+    else:
+        def output_display(result: Result) -> None:
+            print(json.dumps(result, cls=CELJSONEncoder))
+
+    logger.info("Expr: %r", options.expr)
+
+    if options.arg:
+        logger.info("Args: %r", options.arg)
+
+    annotations: Optional[Dict[str, Annotation]]
+    if options.arg:
+        annotations = {
+            name: type for name, type, value in options.arg
+        }
+    else:
+        annotations = None
+
+    # If we're creating a named JSON document, we don't provide a default package.
+    # If we're usinga  JSON document to populate a package, we provide the given name.
+    env = Environment(
+        package=None if options.null_input else options.package,
+        annotations=annotations,
+    )
+    try:
+        expr = env.compile(options.expr)
+        prgm = env.program(expr)
+    except CELParseError as ex:
+        print(env.cel_parser.error_text(ex.args[0], ex.line, ex.column), file=sys.stderr)
+        return 1
+
+    if options.arg:
+        activation = {
+            name: value for name, type, value in options.arg
+        }
+    else:
+        activation = {}
+
+    if options.null_input:
+        # Don't read stdin, evaluate with only the activation context.
+        try:
+            result = prgm.evaluate(activation)
+            if options.boolean:
+                if isinstance(result, (celtypes.BoolType, bool)):
+                    summary = 0 if result else 1
+                else:
+                    logger.warning("Expected celtypes.BoolType, got %s = %r", type(result), result)
+                    summary = 2
+            else:
+                output_display(result)
+                summary = 0
+        except CELEvalError as ex:
+            print(env.cel_parser.error_text(ex.args[0], ex.line, ex.column), file=sys.stderr)
+            summary = 2
+
+    elif options.slurp:
+        # If slurp, one big document, part of the "jq" package in the activation context.
+        document = sys.stdin.read()
+        summary = process_json_doc(
+            output_display, prgm, activation, options.document or options.package, document,
+            options.boolean
+        )
+
+    else:
+        # NDJSON format: each line is a JSON doc. We repackage the doc into celtypes objects.
+        # Each document is in the "jq" package in the activation context.
+        summary = 0
+        for document in sys.stdin:
+            summary = max(
+                summary,
+                process_json_doc(
+                    output_display, prgm, activation, options.document or options.package, document,
+                    options.boolean
+                )
+            )
+
+    return summary
+
+
+if __name__ == "__main__":  # pragma: no cover
+    logging.basicConfig(level=logging.WARNING)
+    exit = main(sys.argv[1:])
+    logging.shutdown()
+    sys.exit(exit)