aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/celpy/__main__.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/celpy/__main__.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/celpy/__main__.py')
-rw-r--r--.venv/lib/python3.12/site-packages/celpy/__main__.py465
1 files changed, 465 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/celpy/__main__.py b/.venv/lib/python3.12/site-packages/celpy/__main__.py
new file mode 100644
index 00000000..c9483173
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/celpy/__main__.py
@@ -0,0 +1,465 @@
+# SPDX-Copyright: Copyright (c) Capital One Services, LLC
+# SPDX-License-Identifier: Apache-2.0
+# Copyright 2020 Capital One Services, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and limitations under the License.
+
+"""
+Pure Python implementation of CEL.
+
+This provides a few jq-like, bc-like, and shell expr-like features.
+
+- ``jq`` uses ``.`` to refer the current document. By setting a package
+ name of ``"jq"`` and placing the JSON object in the package, we achieve
+ similar syntax.
+
+- ``bc`` offers complex function definitions and other programming support.
+ CEL can only evaluate a few bc-like expressions.
+
+- This does everything ``expr`` does, but the syntax is slightly different.
+ The output of comparisons -- by default -- is boolean, where ``expr`` is an integer 1 or 0.
+ Use ``-f 'd'`` to see decimal output instead of Boolean text values.
+
+- This does some of what ``test`` does, without a lot of the sophisticated
+ file system data gathering.
+ Use ``-b`` to set the exit status code from a Boolean result.
+
+TODO: This can also have a REPL, as well as process CSV files.
+
+SYNOPSIS
+========
+
+::
+
+ python -m celpy [--arg name:type=value ...] [--null-input] expr
+
+Options:
+
+:--arg:
+ Provides argument names, types and optional values.
+ If the value is not provided, the name is expected to be an environment
+ variable, and the value of the environment variable is converted and used.
+
+:--null-input:
+ Normally, JSON documents are read from stdin in ndjson format. If no JSON documents are
+ provided, the ``--null-input`` option skips trying to read from stdin.
+
+:expr:
+ A CEL expression to evaluate.
+
+JSON documents are read from stdin in NDJSON format (http://jsonlines.org/, http://ndjson.org/).
+For each JSON document, the expression is evaluated with the document in a default
+package. This allows `.name` to pick items from the document.
+
+By default, the output is JSON serialized. This means strings will be JSON-ified and have quotes.
+
+If a ``--format`` option is provided, this is applied to the resulting object; this can be
+used to strip quotes, or limit precision on double objects, or convert numbers to hexadecimal.
+
+Arguments, Types, and Namespaces
+================================
+
+CEL objects rely on the celtypes definitions.
+
+Because of the close association between CEL and protobuf, some well-known protobuf types
+are also supported.
+
+.. todo:: CLI type environment
+
+ Permit name.name:type=value to create namespace bindings.
+
+Further, type providers can be bound to CEL. This means an extended CEL
+may have additional types beyond those defined by the :py:class:`Activation` class.
+
+"""
+
+import argparse
+import ast
+import cmd
+import json
+import logging
+import os
+import re
+import sys
+from typing import Any, Callable, Dict, List, Optional, Tuple, cast
+
+from celpy import Environment, Runner, celtypes
+from celpy.adapter import CELJSONDecoder, CELJSONEncoder
+from celpy.celparser import CELParseError
+from celpy.evaluation import Annotation, CELEvalError, Result
+
+logger = logging.getLogger("celpy")
+
+
+# For argument parsing purposes.
+# Note the reliance on `ast.literal_eval` for ListType and MapType conversions.
+# Other types convert strings directly. These types need some help.
+CLI_ARG_TYPES: Dict[str, Annotation] = {
+ "int":
+ celtypes.IntType,
+ "uint":
+ celtypes.UintType,
+ "double":
+ celtypes.DoubleType,
+ "bool":
+ celtypes.BoolType,
+ "string":
+ celtypes.StringType,
+ "bytes":
+ celtypes.BytesType,
+ "list":
+ cast(Callable[..., celtypes.Value], lambda arg: celtypes.ListType(ast.literal_eval(arg))),
+ "map":
+ cast(Callable[..., celtypes.Value], lambda arg: celtypes.MapType(ast.literal_eval(arg))),
+ "null_type":
+ cast(Callable[..., celtypes.Value], lambda arg: None),
+ "single_duration":
+ celtypes.DurationType,
+ "single_timestamp":
+ celtypes.TimestampType,
+
+ "int64_value": celtypes.IntType,
+ "uint64_value": celtypes.UintType,
+ "double_value": celtypes.DoubleType,
+ "bool_value": celtypes.BoolType,
+ "string_value": celtypes.StringType,
+ "bytes_value": celtypes.BytesType,
+ "number_value": celtypes.DoubleType, # Ambiguous; can somtimes be integer.
+ "null_value": cast(Callable[..., celtypes.Value], lambda arg: None),
+}
+
+
+def arg_type_value(text: str) -> Tuple[str, Annotation, celtypes.Value]:
+ """
+ Decompose ``-a name:type=value`` argument into a useful triple.
+
+ Also accept ``-a name:type``. This will find ``name`` in the environment and convert to the
+ requested type.
+
+ Also accepts ``-a name``. This will find ``name`` in the environment and treat it as a string.
+
+ Currently, names do not reflect package naming. An environment can be a package,
+ and the activation can include variables that are also part of the package.
+ This is not supported via the CLI.
+
+ Types can be celtypes class names or TYPE_NAME or PROTOBUF_TYPE
+
+ ::
+
+ TYPE_NAME : "int64_value" | "null_value" | "uint64_value" | "double_value"
+ | "bool_value" | "string_value" | "bytes_value" | "number_value"
+
+ PROTOBUF_TYPE : "single_int64" | "single_int32" | "single_uint64" | "single_uint32"
+ | "single_sint64" | "single_sint32" | "single_fixed64" | "single_fixed32"
+ | "single_sfixed32" | "single_sfixed64" | "single_float" | "single_double"
+ | "single_bool" | "single_string" | "single_bytes"
+ | "single_duration" | "single_timestamp"
+
+ .. todo:: type names can include `.` to support namespacing for protobuf support.
+
+ :param text: Argument value
+ :return: Tuple with name, annotation, and resulting object.
+ """
+ arg_pattern = re.compile(
+ r"^([_a-zA-Z][_a-zA-Z0-9]*)(?::([_a-zA-Z][_a-zA-Z0-9]*))?(?:=(.*))?$"
+ )
+ match = arg_pattern.match(text)
+ if match is None:
+ raise argparse.ArgumentTypeError(
+ f"arg {text} not 'var=string', 'var:type=value', or `var:type")
+ name, type_name, value_text = match.groups()
+ if value_text is None:
+ value_text = os.environ.get(name)
+ type_definition: Annotation # CELType or a conversion function
+ value: celtypes.Value # Specific value.
+ if type_name:
+ try:
+ type_definition = CLI_ARG_TYPES[type_name]
+ value = cast(
+ celtypes.Value,
+ type_definition(value_text) # type: ignore[arg-type, call-arg]
+ )
+ except KeyError:
+ raise argparse.ArgumentTypeError(f"arg {text} type name not in {list(CLI_ARG_TYPES)}")
+ except ValueError:
+ raise argparse.ArgumentTypeError(f"arg {text} value invalid for the supplied type")
+ else:
+ value = celtypes.StringType(value_text or "")
+ type_definition = celtypes.StringType
+ return name, type_definition, value
+
+
+def get_options(argv: Optional[List[str]] = None) -> argparse.Namespace:
+ """Parses command-line arguments.
+ """
+ parser = argparse.ArgumentParser(prog="celpy", description="Pure Python CEL")
+ parser.add_argument(
+ "-v", "--verbose", default=0, action='count')
+
+ # Inputs
+ parser.add_argument(
+ "-a", "--arg", action='append', type=arg_type_value,
+ help="Variables to set; -a name:type=value, or -a name=value for strings, "
+ "or -a name to read an environment variable"
+ )
+ parser.add_argument(
+ "-n", "--null-input", dest='null_input', default=False, action='store_true',
+ help="Avoid reading Newline-Delimited JSON documents from stdin"
+ )
+ parser.add_argument(
+ "-s", "--slurp", default=False, action="store_true",
+ help="Slurp a single, multiple JSON document from stdin"
+ )
+ parser.add_argument(
+ "-i", "--interactive", default=False, action="store_true",
+ help="Interactive REPL"
+ )
+
+ # JSON handling
+ parser.add_argument(
+ "--json-package", "-p", metavar="NAME", dest="package", default=None, action="store",
+ help="Each JSON input is a CEL package, allowing .name to work"
+ )
+ parser.add_argument(
+ "--json-document", "-d", metavar="NAME", dest="document", default=None, action="store",
+ help="Each JSON input is a variable, allowing name.map(x, x*2) to work"
+ )
+
+ # Outputs and Status
+ parser.add_argument(
+ "-b", "--boolean", default=False, action='store_true',
+ help="If the result is True, the exit status is 0, for False, it's 1, otherwise 2"
+ )
+ parser.add_argument(
+ "-f", "--format", default=None, action='store',
+ help=(
+ "Use Python formating instead of JSON conversion of results; "
+ "Example '.6f' to format a DoubleType result"
+ )
+ )
+
+ # The expression
+ parser.add_argument("expr", nargs='?')
+
+ options = parser.parse_args(argv)
+ if options.package and options.document:
+ parser.error("Either use --json-package or --json-document, not both")
+ if not options.package and not options.document:
+ options.package = "jq"
+ if options.interactive and options.expr:
+ parser.error("Interactive mode and an expression provided")
+ if not options.interactive and not options.expr:
+ parser.error("No expression provided")
+ return options
+
+
+class CEL_REPL(cmd.Cmd):
+ prompt = "CEL> "
+ intro = "Enter an expression to have it evaluated."
+ logger = logging.getLogger("REPL")
+
+ def cel_eval(self, text: str) -> celtypes.Value:
+ try:
+ expr = self.env.compile(text)
+ prgm = self.env.program(expr)
+ return prgm.evaluate(self.state)
+ except CELParseError as ex:
+ print(self.env.cel_parser.error_text(ex.args[0], ex.line, ex.column), file=sys.stderr)
+ raise
+
+ def preloop(self) -> None:
+ self.env = Environment()
+ self.state: Dict[str, celtypes.Value] = {}
+
+ def do_set(self, args: str) -> bool:
+ """Set variable expression
+
+ Evaluates the expression, saves the result as the given variable in the current activation.
+ """
+ name, space, args = args.partition(' ')
+ try:
+ value: celtypes.Value = self.cel_eval(args)
+ print(value)
+ self.state[name] = value
+ except Exception as ex:
+ self.logger.error(ex)
+ return False
+
+ def do_show(self, args: str) -> bool:
+ """Shows all variables in the current activation."""
+ print(self.state)
+ return False
+
+ def do_quit(self, args: str) -> bool:
+ """Quits from the REPL."""
+ return True
+
+ do_exit = do_quit
+ do_bye = do_quit
+
+ def default(self, args: str) -> None:
+ """Evaluate an expression."""
+ try:
+ value = self.cel_eval(args)
+ print(value)
+ except Exception as ex:
+ self.logger.error(ex)
+
+
+def process_json_doc(
+ display: Callable[[Result], None],
+ prgm: Runner,
+ activation: Dict[str, Any],
+ variable: str,
+ document: str,
+ boolean_to_status: bool = False) -> int:
+ """
+ Process a single JSON document. Either one line of an NDJSON stream
+ or the only document in slurp mode. We assign it to the variable "jq".
+ This variable can be the package name, allowing ``.name``) to work.
+ Or. It can be left as a variable, allowing ``jq`` and ``jq.map(x, x*2)`` to work.
+
+ Returns status code 0 for success, 3 for failure.
+ """
+ try:
+ activation[variable] = json.loads(document, cls=CELJSONDecoder)
+ result = prgm.evaluate(activation)
+ display(result)
+ if (boolean_to_status and isinstance(result, (celtypes.BoolType, bool))):
+ return 0 if result else 1
+ return 0
+ except CELEvalError as ex:
+ # ``jq`` KeyError problems result in ``None``.
+ # Other errors should, perhaps, be more visible.
+ logger.debug("Encountered %s on document %r", ex, document)
+ display(None)
+ return 0
+ except json.decoder.JSONDecodeError as ex:
+ logger.error("%s on document %r", ex.args[0], document)
+ # print(f"{ex.args[0]} in {document!r}", file=sys.stderr)
+ return 3
+
+
+def main(argv: Optional[List[str]] = None) -> int:
+ """
+ Given options from the command-line, execute the CEL expression.
+
+ With --null-input option, only --arg and expr matter.
+
+ Without --null-input, JSON documents are read from STDIN, following ndjson format.
+
+ With the --slurp option, it reads one JSON from stdin, spread over multiple lines.
+
+ If "--json-package" is used, each JSON document becomes a package, and
+ top-level dictionary keys become valid ``.name`` expressions.
+ Otherwise, "--json-object" is the default, and each JSON document
+ is assigned to a variable. The default name is "jq" to allow expressions
+ that are similar to ``jq`` but with a "jq" prefix.
+ """
+ options = get_options(argv)
+ if options.verbose == 1:
+ logging.getLogger().setLevel(logging.INFO)
+ elif options.verbose > 1:
+ logging.getLogger().setLevel(logging.DEBUG)
+ logger.debug(options)
+
+ if options.interactive:
+ repl = CEL_REPL()
+ repl.cmdloop()
+ return 0
+
+ if options.format:
+ def output_display(result: Result) -> None:
+ print('{0:{format}}'.format(result, format=options.format))
+ else:
+ def output_display(result: Result) -> None:
+ print(json.dumps(result, cls=CELJSONEncoder))
+
+ logger.info("Expr: %r", options.expr)
+
+ if options.arg:
+ logger.info("Args: %r", options.arg)
+
+ annotations: Optional[Dict[str, Annotation]]
+ if options.arg:
+ annotations = {
+ name: type for name, type, value in options.arg
+ }
+ else:
+ annotations = None
+
+ # If we're creating a named JSON document, we don't provide a default package.
+ # If we're usinga JSON document to populate a package, we provide the given name.
+ env = Environment(
+ package=None if options.null_input else options.package,
+ annotations=annotations,
+ )
+ try:
+ expr = env.compile(options.expr)
+ prgm = env.program(expr)
+ except CELParseError as ex:
+ print(env.cel_parser.error_text(ex.args[0], ex.line, ex.column), file=sys.stderr)
+ return 1
+
+ if options.arg:
+ activation = {
+ name: value for name, type, value in options.arg
+ }
+ else:
+ activation = {}
+
+ if options.null_input:
+ # Don't read stdin, evaluate with only the activation context.
+ try:
+ result = prgm.evaluate(activation)
+ if options.boolean:
+ if isinstance(result, (celtypes.BoolType, bool)):
+ summary = 0 if result else 1
+ else:
+ logger.warning("Expected celtypes.BoolType, got %s = %r", type(result), result)
+ summary = 2
+ else:
+ output_display(result)
+ summary = 0
+ except CELEvalError as ex:
+ print(env.cel_parser.error_text(ex.args[0], ex.line, ex.column), file=sys.stderr)
+ summary = 2
+
+ elif options.slurp:
+ # If slurp, one big document, part of the "jq" package in the activation context.
+ document = sys.stdin.read()
+ summary = process_json_doc(
+ output_display, prgm, activation, options.document or options.package, document,
+ options.boolean
+ )
+
+ else:
+ # NDJSON format: each line is a JSON doc. We repackage the doc into celtypes objects.
+ # Each document is in the "jq" package in the activation context.
+ summary = 0
+ for document in sys.stdin:
+ summary = max(
+ summary,
+ process_json_doc(
+ output_display, prgm, activation, options.document or options.package, document,
+ options.boolean
+ )
+ )
+
+ return summary
+
+
+if __name__ == "__main__": # pragma: no cover
+ logging.basicConfig(level=logging.WARNING)
+ exit = main(sys.argv[1:])
+ logging.shutdown()
+ sys.exit(exit)