diff options
| author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
|---|---|---|
| committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
| commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
| tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/celpy/celtypes.py | |
| parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
| download | gn-ai-master.tar.gz | |
Diffstat (limited to '.venv/lib/python3.12/site-packages/celpy/celtypes.py')
| -rw-r--r-- | .venv/lib/python3.12/site-packages/celpy/celtypes.py | 1495 |
1 files changed, 1495 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/celpy/celtypes.py b/.venv/lib/python3.12/site-packages/celpy/celtypes.py new file mode 100644 index 00000000..3c4513ac --- /dev/null +++ b/.venv/lib/python3.12/site-packages/celpy/celtypes.py @@ -0,0 +1,1495 @@ +# SPDX-Copyright: Copyright (c) Capital One Services, LLC +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2020 Capital One Services, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +""" +CEL Types: wrappers on Python types to provide CEL semantics. + +This can be used by a Python module to work with CEL-friendly values and CEL results. + +Examples of distinctions between CEL and Python: + +- Unlike Python ``bool``, CEL :py:class:`BoolType` won't do some math. + +- CEL has ``int64`` and ``uint64`` subclasses of integer. These have specific ranges and + raise :exc:`ValueError` errors on overflow. + +CEL types will raise :exc:`ValueError` for out-of-range values and :exc:`TypeError` +for operations they refuse. +The :py:mod:`evaluation` module can capture these exceptions and turn them into result values. +This can permit the logic operators to quietly silence them via "short-circuiting". + +In the normal course of events, CEL's evaluator may attempt operations between a +CEL exception result and an instance of one of CEL types. +We rely on this leading to an ordinary Python :exc:`TypeError` to be raised to propogate +the error. Or. A logic operator may discard the error object. + +The :py:mod:`evaluation` module extends these types with it's own :exc:`CELEvalError` exception. +We try to keep that as a separate concern from the core operator implementations here. +We leverage Python features, which means raising exceptions when there is a problem. + +Types +============= + +See https://github.com/google/cel-go/tree/master/common/types + +These are the Go type definitions that are used by CEL: + +- BoolType +- BytesType +- DoubleType +- DurationType +- IntType +- ListType +- MapType +- NullType +- StringType +- TimestampType +- TypeType +- UintType + +The above types are handled directly byt CEL syntax. +e.g., ``42`` vs. ``42u`` vs. ``"42"`` vs. ``b"42"`` vs. ``42.``. + +We provide matching Python class names for each of these types. The Python type names +are subclasses of Python native types, allowing a client to transparently work with +CEL results. A Python host should be able to provide values to CEL that will be tolerated. + +A type hint of ``Value`` unifies these into a common hint. + +The CEL Go implementation also supports protobuf types: + +- dpb.Duration +- tpb.Timestamp +- structpb.ListValue +- structpb.NullValue +- structpb.Struct +- structpb.Value +- wrapperspb.BoolValue +- wrapperspb.BytesValue +- wrapperspb.DoubleValue +- wrapperspb.FloatValue +- wrapperspb.Int32Value +- wrapperspb.Int64Value +- wrapperspb.StringValue +- wrapperspb.UInt32Value +- wrapperspb.UInt64Value + +These types involve expressions like the following:: + + google.protobuf.UInt32Value{value: 123u} + +In this case, the well-known protobuf name is directly visible as CEL syntax. +There's a ``google`` package with the needed definitions. + +Type Provider +============================== + +A type provider can be bound to the environment, this will support additional types. +This appears to be a factory to map names of types to type classes. + +Run-time type binding is shown by a CEL expression like the following:: + + TestAllTypes{single_uint32_wrapper: 432u} + +The ``TestAllTypes`` is a protobuf type added to the CEL run-time. The syntax +is defined by this syntax rule:: + + member_object : member "{" [fieldinits] "}" + +The ``member`` is part of a type provider library, +either a standard protobuf definition or an extension. The field inits build +values for the protobuf object. + +See https://github.com/google/cel-go/blob/master/test/proto3pb/test_all_types.proto +for the ``TestAllTypes`` protobuf definition that is registered as a type provider. + +This expression will describes a Protobuf ``uint32`` object. + +Type Adapter +============= + +So far, it appears that a type adapter wraps existing Go or C++ types +with CEL-required methods. This seems like it does not need to be implemented +in Python. + +Numeric Details +=============== + +Integer division truncates toward zero. + +The Go definition of modulus:: + + // Mod returns the floating-point remainder of x/y. + // The magnitude of the result is less than y and its + // sign agrees with that of x. + +https://golang.org/ref/spec#Arithmetic_operators + +"Go has the nice property that -a/b == -(a/b)." + +:: + + x y x / y x % y + 5 3 1 2 + -5 3 -1 -2 + 5 -3 -1 2 + -5 -3 1 -2 + +Python definition:: + + The modulo operator always yields a result + with the same sign as its second operand (or zero); + the absolute value of the result is strictly smaller than + the absolute value of the second operand. + +Here's the essential rule:: + + x//y * y + x%y == x + +However. Python ``//`` truncates toward negative infinity. Go ``/`` truncates toward zero. + +To get Go-like behavior, we need to use absolute values and restore the signs later. + +:: + + x_sign = -1 if x < 0 else +1 + go_mod = x_sign * (abs(x) % abs(y)) + return go_mod + +Timzone Details +=============== + +An implementation may have additional timezone names that must be injected into +th dateutil.gettz() processing. + +For example, there may be the following sequence: + +1. A lowercase match for an alias or an existing dateutil timezone. + +2. A titlecase match for an existing dateutil timezone. + +3. The fallback, which is a +/-HH:MM string. + +.. TODO: Permit an extension into the timezone lookup. + +""" +import datetime +import logging +import re +from functools import reduce, wraps +from math import fsum +from typing import (Any, Callable, Dict, Iterable, List, Mapping, NoReturn, + Optional, Sequence, Tuple, Type, TypeVar, Union, cast, + overload) + +import dateutil.parser +import dateutil.tz + +logger = logging.getLogger("celtypes") + + +Value = Union[ + 'BoolType', + 'BytesType', + 'DoubleType', + 'DurationType', + 'IntType', + 'ListType', + 'MapType', + None, # Used instead of NullType + 'StringType', + 'TimestampType', + 'UintType', +] + +# The domain of types used to build Annotations. +CELType = Union[ + Type['BoolType'], + Type['BytesType'], + Type['DoubleType'], + Type['DurationType'], + Type['IntType'], + Type['ListType'], + Type['MapType'], + Callable[..., None], # Used instead of NullType + Type['StringType'], + Type['TimestampType'], + Type['TypeType'], # Used to mark Protobuf Type values + Type['UintType'], + Type['PackageType'], + Type['MessageType'], +] + + +def type_matched(method: Callable[[Any, Any], Any]) -> Callable[[Any, Any], Any]: + """Decorates a method to assure the "other" value has the same type.""" + @wraps(method) + def type_matching_method(self: Any, other: Any) -> Any: + if not(issubclass(type(other), type(self)) or issubclass(type(self), type(other))): + raise TypeError(f"no such overload: {self!r} {type(self)} != {other!r} {type(other)}") + return method(self, other) + return type_matching_method + + +def logical_condition(e: Value, x: Value, y: Value) -> Value: + """ + CEL e ? x : y operator. + Choose one of x or y. Exceptions in the unchosen expression are ignored. + + Example:: + + 2 / 0 > 4 ? 'baz' : 'quux' + + is a "division by zero" error. + + :: + + >>> logical_condition( + ... BoolType(True), StringType("this"), StringType("Not That")) + StringType('this') + >>> logical_condition( + ... BoolType(False), StringType("Not This"), StringType("that")) + StringType('that') + """ + if not isinstance(e, BoolType): + raise TypeError(f"Unexpected {type(e)} ? {type(x)} : {type(y)}") + result = x if e else y + logger.debug("logical_condition(%r, %r, %r) = %r", e, x, y, result) + return result + + +def logical_and(x: Value, y: Value) -> Value: + """ + Native Python has a left-to-right rule. + CEL && is commutative with non-Boolean values, including error objects. + """ + if not isinstance(x, BoolType) and not isinstance(y, BoolType): + raise TypeError(f"{type(x)} {x!r} and {type(y)} {y!r}") + elif not isinstance(x, BoolType) and isinstance(y, BoolType): + if y: + return x # whatever && true == whatever + else: + return y # whatever && false == false + elif isinstance(x, BoolType) and not isinstance(y, BoolType): + if x: + return y # true && whatever == whatever + else: + return x # false && whatever == false + else: + return BoolType(cast(BoolType, x) and cast(BoolType, y)) + + +def logical_not(x: Value) -> Value: + """ + Native python `not` isn't fully exposed for CEL types. + """ + if isinstance(x, BoolType): + result = BoolType(not x) + else: + raise TypeError(f"not {type(x)}") + logger.debug("logical_not(%r) = %r", x, result) + return result + + +def logical_or(x: Value, y: Value) -> Value: + """ + Native Python has a left-to-right rule: (True or y) is True, (False or y) is y. + CEL || is commutative with non-Boolean values, including errors. + ``(x || false)`` is ``x``, and ``(false || y)`` is ``y``. + + Example 1:: + + false || 1/0 != 0 + + is a "no matching overload" error. + + Example 2:: + + (2 / 0 > 3 ? false : true) || true + + is a "True" + + If the operand(s) are not BoolType, we'll create an TypeError that will become a CELEvalError. + """ + if not isinstance(x, BoolType) and not isinstance(y, BoolType): + raise TypeError(f"{type(x)} {x!r} or {type(y)} {y!r}") + elif not isinstance(x, BoolType) and isinstance(y, BoolType): + if y: + return y # whatever || true == true + else: + return x # whatever || false == whatever + elif isinstance(x, BoolType) and not isinstance(y, BoolType): + if x: + return x # true || whatever == true + else: + return y # false || whatever == whatever + else: + return BoolType(cast(BoolType, x) or cast(BoolType, y)) + + +class BoolType(int): + """ + Native Python permits unary operators on Booleans. + + For CEL, We need to prevent -false from working. + """ + def __new__(cls: Type['BoolType'], source: Any) -> 'BoolType': + if source is None: + return super().__new__(cls, 0) + elif isinstance(source, BoolType): + return source + elif isinstance(source, MessageType): + return super().__new__( + cls, + cast(int, source.get(StringType("value"))) + ) + else: + return super().__new__(cls, source) + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({bool(self)})" + + def __str__(self) -> str: + return str(bool(self)) + + def __neg__(self) -> NoReturn: + raise TypeError("no such overload") + + def __hash__(self) -> int: + return super().__hash__() + + +class BytesType(bytes): + """Python's bytes semantics are close to CEL.""" + def __new__( + cls: Type['BytesType'], + source: Union[str, bytes, Iterable[int], 'BytesType', 'StringType'], + *args: Any, + **kwargs: Any + ) -> 'BytesType': + if source is None: + return super().__new__(cls, b'') + elif isinstance(source, (bytes, BytesType)): + return super().__new__(cls, source) + elif isinstance(source, (str, StringType)): + return super().__new__(cls, source.encode('utf-8')) + elif isinstance(source, MessageType): + return super().__new__( + cls, + cast(bytes, source.get(StringType("value"))) # type: ignore [attr-defined] + ) + elif isinstance(source, Iterable): + return super().__new__( + cls, + cast(Iterable[int], source) + ) + else: + raise TypeError(f"Invalid initial value type: {type(source)}") + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({super().__repr__()})" + + +class DoubleType(float): + """ + Native Python permits mixed type comparisons, doing conversions as needed. + + For CEL, we need to prevent mixed-type comparisons from working. + + TODO: Conversions from string? IntType? UintType? DoubleType? + """ + def __new__(cls: Type['DoubleType'], source: Any) -> 'DoubleType': + if source is None: + return super().__new__(cls, 0) + elif isinstance(source, MessageType): + return super().__new__( + cls, + cast(float, source.get(StringType("value"))) + ) + else: + return super().__new__(cls, source) + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({super().__repr__()})" + + def __str__(self) -> str: + text = str(float(self)) + return text + + def __neg__(self) -> 'DoubleType': + return DoubleType(super().__neg__()) + + def __mod__(self, other: Any) -> NoReturn: + raise TypeError( + f"found no matching overload for '_%_' applied to '(double, {type(other)})'") + + def __truediv__(self, other: Any) -> 'DoubleType': + if cast(float, other) == 0.0: + return DoubleType("inf") + else: + return DoubleType(super().__truediv__(other)) + + def __rmod__(self, other: Any) -> NoReturn: + raise TypeError( + f"found no matching overload for '_%_' applied to '({type(other)}, double)'") + + def __rtruediv__(self, other: Any) -> 'DoubleType': + if self == 0.0: + return DoubleType("inf") + else: + return DoubleType(super().__rtruediv__(other)) + + @type_matched + def __eq__(self, other: Any) -> bool: + return super().__eq__(other) + + @type_matched + def __ne__(self, other: Any) -> bool: + return super().__ne__(other) + + def __hash__(self) -> int: + return super().__hash__() + + +IntOperator = TypeVar('IntOperator', bound=Callable[..., int]) + + +def int64(operator: IntOperator) -> IntOperator: + """Apply an operation, but assure the value is within the int64 range.""" + @wraps(operator) + def clamped_operator(*args: Any, **kwargs: Any) -> int: + result: int = operator(*args, **kwargs) + if -(2**63) <= result < 2**63: + return result + raise ValueError("overflow") + return cast(IntOperator, clamped_operator) + + +class IntType(int): + """ + A version of int with overflow errors outside int64 range. + + features/integer_math.feature:277 "int64_overflow_positive" + + >>> IntType(9223372036854775807) + IntType(1) + Traceback (most recent call last): + ... + ValueError: overflow + + >>> 2**63 + 9223372036854775808 + + features/integer_math.feature:285 "int64_overflow_negative" + + >>> -IntType(9223372036854775808) - IntType(1) + Traceback (most recent call last): + ... + ValueError: overflow + + >>> IntType(DoubleType(1.9)) + IntType(2) + >>> IntType(DoubleType(-123.456)) + IntType(-123) + """ + def __new__( + cls: Type['IntType'], + source: Any, + *args: Any, + **kwargs: Any + ) -> 'IntType': + convert: Callable[..., int] + if source is None: + return super().__new__(cls, 0) + elif isinstance(source, IntType): + return source + elif isinstance(source, MessageType): + # Used by protobuf. + return super().__new__( + cls, + cast(int, source.get(StringType("value"))) + ) + elif isinstance(source, (float, DoubleType)): + convert = int64(round) + elif isinstance(source, TimestampType): + convert = int64(lambda src: src.timestamp()) + elif isinstance(source, (str, StringType)) and source[:2] in {"0x", "0X"}: + convert = int64(lambda src: int(src[2:], 16)) + elif isinstance(source, (str, StringType)) and source[:3] in {"-0x", "-0X"}: + convert = int64(lambda src: -int(src[3:], 16)) + else: + # Must tolerate "-" as part of the literal. + # See https://github.com/google/cel-spec/issues/126 + convert = int64(int) + return super().__new__(cls, convert(source)) + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({super().__repr__()})" + + def __str__(self) -> str: + text = str(int(self)) + return text + + @int64 + def __neg__(self) -> 'IntType': + return IntType(super().__neg__()) + + @int64 + def __add__(self, other: Any) -> 'IntType': + return IntType(super().__add__(cast(IntType, other))) + + @int64 + def __sub__(self, other: Any) -> 'IntType': + return IntType(super().__sub__(cast(IntType, other))) + + @int64 + def __mul__(self, other: Any) -> 'IntType': + return IntType(super().__mul__(cast(IntType, other))) + + @int64 + def __truediv__(self, other: Any) -> 'IntType': + other = cast(IntType, other) + self_sign = -1 if self < IntType(0) else +1 + other_sign = -1 if other < IntType(0) else +1 + go_div = self_sign * other_sign * (abs(self) // abs(other)) + return IntType(go_div) + + __floordiv__ = __truediv__ + + @int64 + def __mod__(self, other: Any) -> 'IntType': + self_sign = -1 if self < IntType(0) else +1 + go_mod = self_sign * (abs(self) % abs(cast(IntType, other))) + return IntType(go_mod) + + @int64 + def __radd__(self, other: Any) -> 'IntType': + return IntType(super().__radd__(cast(IntType, other))) + + @int64 + def __rsub__(self, other: Any) -> 'IntType': + return IntType(super().__rsub__(cast(IntType, other))) + + @int64 + def __rmul__(self, other: Any) -> 'IntType': + return IntType(super().__rmul__(cast(IntType, other))) + + @int64 + def __rtruediv__(self, other: Any) -> 'IntType': + other = cast(IntType, other) + self_sign = -1 if self < IntType(0) else +1 + other_sign = -1 if other < IntType(0) else +1 + go_div = self_sign * other_sign * (abs(other) // abs(self)) + return IntType(go_div) + + __rfloordiv__ = __rtruediv__ + + @int64 + def __rmod__(self, other: Any) -> 'IntType': + left_sign = -1 if other < IntType(0) else +1 + go_mod = left_sign * (abs(other) % abs(self)) + return IntType(go_mod) + + @type_matched + def __eq__(self, other: Any) -> bool: + return super().__eq__(other) + + @type_matched + def __ne__(self, other: Any) -> bool: + return super().__ne__(other) + + @type_matched + def __lt__(self, other: Any) -> bool: + return super().__lt__(other) + + @type_matched + def __le__(self, other: Any) -> bool: + return super().__le__(other) + + @type_matched + def __gt__(self, other: Any) -> bool: + return super().__gt__(other) + + @type_matched + def __ge__(self, other: Any) -> bool: + return super().__ge__(other) + + def __hash__(self) -> int: + return super().__hash__() + + +def uint64(operator: IntOperator) -> IntOperator: + """Apply an operation, but assure the value is within the uint64 range.""" + @wraps(operator) + def clamped_operator(*args: Any, **kwargs: Any) -> int: + result = operator(*args, **kwargs) + if 0 <= result < 2**64: + return result + raise ValueError("overflow") + return cast(IntOperator, clamped_operator) + + +class UintType(int): + """ + A version of int with overflow errors outside uint64 range. + + Alternatives: + + Option 1 - Use https://pypi.org/project/fixedint/ + + Option 2 - use array or struct modules to access an unsigned object. + + Test Cases: + + features/integer_math.feature:149 "unary_minus_no_overload" + + >>> -UintType(42) + Traceback (most recent call last): + ... + TypeError: no such overload + + uint64_overflow_positive + + >>> UintType(18446744073709551615) + UintType(1) + Traceback (most recent call last): + ... + ValueError: overflow + + uint64_overflow_negative + + >>> UintType(0) - UintType(1) + Traceback (most recent call last): + ... + ValueError: overflow + + >>> - UintType(5) + Traceback (most recent call last): + ... + TypeError: no such overload + """ + def __new__( + cls: Type['UintType'], + source: Any, + *args: Any, + **kwargs: Any + ) -> 'UintType': + convert: Callable[..., int] + if isinstance(source, UintType): + return source + elif isinstance(source, (float, DoubleType)): + convert = uint64(round) + elif isinstance(source, TimestampType): + convert = uint64(lambda src: src.timestamp()) + elif isinstance(source, (str, StringType)) and source[:2] in {"0x", "0X"}: + convert = uint64(lambda src: int(src[2:], 16)) + elif isinstance(source, MessageType): + # Used by protobuf. + convert = uint64(lambda src: src['value'] if src['value'] is not None else 0) + elif source is None: + convert = uint64(lambda src: 0) + else: + convert = uint64(int) + return super().__new__(cls, convert(source)) + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({super().__repr__()})" + + def __str__(self) -> str: + text = str(int(self)) + return text + + def __neg__(self) -> NoReturn: + raise TypeError("no such overload") + + @uint64 + def __add__(self, other: Any) -> 'UintType': + return UintType(super().__add__(cast(IntType, other))) + + @uint64 + def __sub__(self, other: Any) -> 'UintType': + return UintType(super().__sub__(cast(IntType, other))) + + @uint64 + def __mul__(self, other: Any) -> 'UintType': + return UintType(super().__mul__(cast(IntType, other))) + + @uint64 + def __truediv__(self, other: Any) -> 'UintType': + return UintType(super().__floordiv__(cast(IntType, other))) + + __floordiv__ = __truediv__ + + @uint64 + def __mod__(self, other: Any) -> 'UintType': + return UintType(super().__mod__(cast(IntType, other))) + + @uint64 + def __radd__(self, other: Any) -> 'UintType': + return UintType(super().__radd__(cast(IntType, other))) + + @uint64 + def __rsub__(self, other: Any) -> 'UintType': + return UintType(super().__rsub__(cast(IntType, other))) + + @uint64 + def __rmul__(self, other: Any) -> 'UintType': + return UintType(super().__rmul__(cast(IntType, other))) + + @uint64 + def __rtruediv__(self, other: Any) -> 'UintType': + return UintType(super().__rfloordiv__(cast(IntType, other))) + + __rfloordiv__ = __rtruediv__ + + @uint64 + def __rmod__(self, other: Any) -> 'UintType': + return UintType(super().__rmod__(cast(IntType, other))) + + @type_matched + def __eq__(self, other: Any) -> bool: + return super().__eq__(other) + + @type_matched + def __ne__(self, other: Any) -> bool: + return super().__ne__(other) + + def __hash__(self) -> int: + return super().__hash__() + + +class ListType(List[Value]): + """ + Native Python implements comparison operations between list objects. + + For CEL, we prevent list comparison operators from working. + + We provide an :py:meth:`__eq__` and :py:meth:`__ne__` that + gracefully ignore type mismatch problems, calling them not equal. + + See https://github.com/google/cel-spec/issues/127 + + An implied logical And means a singleton behaves in a distinct way from a non-singleton list. + """ + def __repr__(self) -> str: + return f"{self.__class__.__name__}({super().__repr__()})" + + def __lt__(self, other: Any) -> NoReturn: + raise TypeError("no such overload") + + def __le__(self, other: Any) -> NoReturn: + raise TypeError("no such overload") + + def __gt__(self, other: Any) -> NoReturn: + raise TypeError("no such overload") + + def __ge__(self, other: Any) -> NoReturn: + raise TypeError("no such overload") + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, (list, ListType)): + raise TypeError(f"no such overload: ListType == {type(other)}") + + def equal(s: Any, o: Any) -> Value: + try: + return BoolType(s == o) + except TypeError as ex: + return cast(BoolType, ex) # Instead of Union[BoolType, TypeError] + + result = ( + len(self) == len(other) + and reduce( # noqa: W503 + logical_and, # type: ignore [arg-type] + (equal(item_s, item_o) for item_s, item_o in zip(self, other)), + BoolType(True) # type: ignore [arg-type] + ) + ) + if isinstance(result, TypeError): + raise result + return bool(result) + + def __ne__(self, other: Any) -> bool: + if not isinstance(other, (list, ListType)): + raise TypeError(f"no such overload: ListType != {type(other)}") + + def not_equal(s: Any, o: Any) -> Value: + try: + return BoolType(s != o) + except TypeError as ex: + return cast(BoolType, ex) # Instead of Union[BoolType, TypeError] + + result = ( + len(self) != len(other) + or reduce( # noqa: W503 + logical_or, # type: ignore [arg-type] + (not_equal(item_s, item_o) for item_s, item_o in zip(self, other)), + BoolType(False) # type: ignore [arg-type] + ) + ) + if isinstance(result, TypeError): + raise result + return bool(result) + + +BaseMapTypes = Union[ + Mapping[Any, Any], + Sequence[Tuple[Any, Any]], + None +] + + +MapKeyTypes = Union[ + 'IntType', 'UintType', 'BoolType', 'StringType', str +] + + +class MapType(Dict[Value, Value]): + """ + Native Python allows mapping updates and any hashable type as a kay. + + CEL prevents mapping updates and has a limited domain of key types. + int, uint, bool, or string keys + + We provide an :py:meth:`__eq__` and :py:meth:`__ne__` that + gracefully ignore type mismatch problems for the values, calling them not equal. + + See https://github.com/google/cel-spec/issues/127 + + An implied logical And means a singleton behaves in a distinct way from a non-singleton mapping. + """ + def __init__( + self, + items: BaseMapTypes = None) -> None: + super().__init__() + if items is None: + pass + elif isinstance(items, Sequence): + for name, value in items: + self[name] = value + elif isinstance(items, Mapping): + for name, value in items.items(): + self[name] = value + else: + raise TypeError(f"Invalid initial value type: {type(items)}") + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({super().__repr__()})" + + def __getitem__(self, key: Any) -> Any: + if not MapType.valid_key_type(key): + raise TypeError(f"unsupported key type: {type(key)}") + return super().__getitem__(key) + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, (Mapping, MapType)): + raise TypeError(f"no such overload: MapType == {type(other)}") + + def equal(s: Any, o: Any) -> BoolType: + try: + return BoolType(s == o) + except TypeError as ex: + return cast(BoolType, ex) # Instead of Union[BoolType, TypeError] + + keys_s = self.keys() + keys_o = other.keys() + result = ( + keys_s == keys_o + and reduce( # noqa: W503 + logical_and, # type: ignore [arg-type] + (equal(self[k], other[k]) for k in keys_s), + BoolType(True) # type: ignore [arg-type] + ) + ) + if isinstance(result, TypeError): + raise result + return bool(result) + + def __ne__(self, other: Any) -> bool: + if not isinstance(other, (Mapping, MapType)): + raise TypeError(f"no such overload: MapType != {type(other)}") + + # Singleton special case, may return no-such overload. + if len(self) == 1 and len(other) == 1 and self.keys() == other.keys(): + k = next(iter(self.keys())) + return cast(bool, self[k] != other[k]) # Instead of Union[BoolType, TypeError] + + def not_equal(s: Any, o: Any) -> BoolType: + try: + return BoolType(s != o) + except TypeError as ex: + return cast(BoolType, ex) # Instead of Union[BoolType, TypeError] + + keys_s = self.keys() + keys_o = other.keys() + result = ( + keys_s != keys_o + or reduce( # noqa: W503 + logical_or, # type: ignore [arg-type] + (not_equal(self[k], other[k]) for k in keys_s), + BoolType(False) # type: ignore [arg-type] + ) + ) + if isinstance(result, TypeError): + raise result + return bool(result) + + @staticmethod + def valid_key_type(key: Any) -> bool: + """Valid CEL key types. Plus native str for tokens in the source when evaluating ``e.f``""" + return isinstance(key, (IntType, UintType, BoolType, StringType, str)) + + +class NullType: + """TBD. May not be needed. Python's None semantics appear to match CEL perfectly.""" + pass + + +class StringType(str): + """Python's str semantics are very, very close to CEL. + + We rely on the overlap between ``"/u270c"`` and ``"/U0001f431"`` in CEL and Python. + """ + def __new__( + cls: Type['StringType'], + source: Union[str, bytes, 'BytesType', 'StringType'], + *args: Any, + **kwargs: Any + ) -> 'StringType': + if isinstance(source, (bytes, BytesType)): + return super().__new__(cls, source.decode('utf')) + elif isinstance(source, (str, StringType)): + # TODO: Consider returning the original StringType object. + return super().__new__(cls, source) + else: + return cast(StringType, super().__new__(cls, source)) + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({super().__repr__()})" + + @type_matched + def __eq__(self, other: Any) -> bool: + return super().__eq__(other) + + @type_matched + def __ne__(self, other: Any) -> bool: + return super().__ne__(other) + + def __hash__(self) -> int: + return super().__hash__() + + +class TimestampType(datetime.datetime): + """ + Implements google.protobuf.Timestamp + + See https://developers.google.com/protocol-buffers/docs/reference/google.protobuf + + Also see https://www.ietf.org/rfc/rfc3339.txt. + + The protobuf implementation is an ordered pair of int64 seconds and int32 nanos. + + Instead of a Tuple[int, int] we use a wrapper for :py:class:`datetime.datetime`. + + From protobuf documentation for making a Timestamp in Python:: + + now = time.time() + seconds = int(now) + nanos = int((now - seconds) * 10**9) + timestamp = Timestamp(seconds=seconds, nanos=nanos) + + Also:: + + >>> t = TimestampType("2009-02-13T23:31:30Z") + >>> repr(t) + "TimestampType('2009-02-13T23:31:30Z')" + >>> t.timestamp() + 1234567890.0 + >>> str(t) + '2009-02-13T23:31:30Z' + + :strong:`Timezones` + + Timezones are expressed in the following grammar: + + :: + + TimeZone = "UTC" | LongTZ | FixedTZ ; + LongTZ = ? list available at + http://joda-time.sourceforge.net/timezones.html ? ; + FixedTZ = ( "+" | "-" ) Digit Digit ":" Digit Digit ; + Digit = "0" | "1" | ... | "9" ; + + Fixed timezones are explicit hour and minute offsets from UTC. + Long timezone names are like Europe/Paris, CET, or US/Central. + + The Joda project (https://www.joda.org/joda-time/timezones.html) + says "Time zone data is provided by the public IANA time zone database." + + The ``dateutil`` project (https://pypi.org/project/python-dateutil/) + is used for TZ handling and timestamp parsing. + + Additionally, there is a ``TZ_ALIASES`` mapping available in this class to permit additional + timezone names. By default, the mapping is empty, and the only names + available are those recognized by :mod:`dateutil.tz`. + """ + + TZ_ALIASES: Dict[str, str] = {} + + def __new__( + cls: Type['TimestampType'], + source: Union[int, str, datetime.datetime], + *args: Any, + **kwargs: Any) -> 'TimestampType': + + if isinstance(source, datetime.datetime): + # Wrap a datetime.datetime + return super().__new__( + cls, + year=source.year, + month=source.month, + day=source.day, + hour=source.hour, + minute=source.minute, + second=source.second, + microsecond=source.microsecond, + tzinfo=source.tzinfo or datetime.timezone.utc + ) + + elif isinstance(source, int) and len(args) >= 2: + # Wrap a sequence of integers that datetime.datetime might accept. + ts: TimestampType = super().__new__( + cls, source, *args, **kwargs + ) + if not ts.tzinfo: + ts = ts.replace(tzinfo=datetime.timezone.utc) + return ts + + elif isinstance(source, str): + # Use dateutil to try a variety of text formats. + parsed_datetime = dateutil.parser.isoparse(source) + return super().__new__( + cls, + year=parsed_datetime.year, + month=parsed_datetime.month, + day=parsed_datetime.day, + hour=parsed_datetime.hour, + minute=parsed_datetime.minute, + second=parsed_datetime.second, + microsecond=parsed_datetime.microsecond, + tzinfo=parsed_datetime.tzinfo + ) + + else: + raise TypeError(f"Cannot create {cls} from {source!r}") + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({str(self)!r})" + + def __str__(self) -> str: + return self.strftime("%Y-%m-%dT%H:%M:%SZ") + + def __add__(self, other: Any) -> 'TimestampType': + """Timestamp + Duration -> Timestamp""" + result = super().__add__(other) + if result == NotImplemented: + return NotImplemented + return TimestampType(result) + + def __radd__(self, other: Any) -> 'TimestampType': + """Duration + Timestamp -> Timestamp""" + result = super().__radd__(other) + if result == NotImplemented: + return NotImplemented + return TimestampType(result) + + # For more information, check the typeshed definition + # https://github.com/python/typeshed/blob/master/stdlib/2and3/datetime.pyi + + @overload # type: ignore + def __sub__(self, other: 'TimestampType') -> 'DurationType': + ... # pragma: no cover + + @overload + def __sub__(self, other: 'DurationType') -> 'TimestampType': + ... # pragma: no cover + + def __sub__( + self, + other: Union['TimestampType', 'DurationType'] + ) -> Union['TimestampType', 'DurationType']: + result = super().__sub__(other) + if result == NotImplemented: + return cast(DurationType, result) + if isinstance(result, datetime.timedelta): + return DurationType(result) + return TimestampType(result) + + @classmethod + def tz_name_lookup(cls, tz_name: str) -> Optional[datetime.tzinfo]: + """ + The :py:func:`dateutil.tz.gettz` may be extended with additional aliases. + + .. TODO: Permit an extension into the timezone lookup. + Tweak ``celpy.celtypes.TimestampType.TZ_ALIASES``. + """ + tz_lookup = str(tz_name) + if tz_lookup in cls.TZ_ALIASES: + tz = dateutil.tz.gettz(cls.TZ_ALIASES[tz_lookup]) + else: + tz = dateutil.tz.gettz(tz_lookup) + return tz + + @classmethod + def tz_offset_parse(cls, tz_name: str) -> Optional[datetime.tzinfo]: + tz_pat = re.compile(r"^([+-]?)(\d\d?):(\d\d)$") + tz_match = tz_pat.match(tz_name) + if not tz_match: + raise ValueError(f"Unparsable timezone: {tz_name!r}") + sign, hh, mm = tz_match.groups() + offset_min = (int(hh) * 60 + int(mm)) * (-1 if sign == '-' else +1) + offset = datetime.timedelta(seconds=offset_min * 60) + tz = datetime.timezone(offset) + return tz + + @staticmethod + def tz_parse(tz_name: Optional[str]) -> Optional[datetime.tzinfo]: + if tz_name: + tz = TimestampType.tz_name_lookup(tz_name) + if tz is None: + tz = TimestampType.tz_offset_parse(tz_name) + return tz + else: + return dateutil.tz.UTC + + def getDate(self, tz_name: Optional[StringType] = None) -> IntType: + new_tz = self.tz_parse(tz_name) + return IntType(self.astimezone(new_tz).day) + + def getDayOfMonth(self, tz_name: Optional[StringType] = None) -> IntType: + new_tz = self.tz_parse(tz_name) + return IntType(self.astimezone(new_tz).day - 1) + + def getDayOfWeek(self, tz_name: Optional[StringType] = None) -> IntType: + new_tz = self.tz_parse(tz_name) + return IntType(self.astimezone(new_tz).isoweekday() % 7) + + def getDayOfYear(self, tz_name: Optional[StringType] = None) -> IntType: + new_tz = self.tz_parse(tz_name) + working_date = self.astimezone(new_tz) + jan1 = datetime.datetime(working_date.year, 1, 1, tzinfo=new_tz) + days = working_date.toordinal() - jan1.toordinal() + return IntType(days) + + def getMonth(self, tz_name: Optional[StringType] = None) -> IntType: + new_tz = self.tz_parse(tz_name) + return IntType(self.astimezone(new_tz).month - 1) + + def getFullYear(self, tz_name: Optional[StringType] = None) -> IntType: + new_tz = self.tz_parse(tz_name) + return IntType(self.astimezone(new_tz).year) + + def getHours(self, tz_name: Optional[StringType] = None) -> IntType: + new_tz = self.tz_parse(tz_name) + return IntType(self.astimezone(new_tz).hour) + + def getMilliseconds(self, tz_name: Optional[StringType] = None) -> IntType: + new_tz = self.tz_parse(tz_name) + return IntType(self.astimezone(new_tz).microsecond // 1000) + + def getMinutes(self, tz_name: Optional[StringType] = None) -> IntType: + new_tz = self.tz_parse(tz_name) + return IntType(self.astimezone(new_tz).minute) + + def getSeconds(self, tz_name: Optional[StringType] = None) -> IntType: + new_tz = self.tz_parse(tz_name) + return IntType(self.astimezone(new_tz).second) + + +class DurationType(datetime.timedelta): + """ + Implements google.protobuf.Duration + + https://developers.google.com/protocol-buffers/docs/reference/google.protobuf#duration + + The protobuf implementation is an ordered pair of int64 seconds and int32 nanos. + Instead of a Tuple[int, int] we use a wrapper for :py:class:`datetime.timedelta`. + + The definition once said this:: + + "type conversion, duration should be end with "s", which stands for seconds" + + This is obsolete, however, considering the following issue. + + See https://github.com/google/cel-spec/issues/138 + + This refers to the following implementation detail + :: + + // A duration string is a possibly signed sequence of + // decimal numbers, each with optional fraction and a unit suffix, + // such as "300ms", "-1.5h" or "2h45m". + // Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". + + The real regex, then is this:: + + [-+]?([0-9]*(\\.[0-9]*)?[a-z]+)+ + + """ + MaxSeconds = 315576000000 + MinSeconds = -315576000000 + NanosecondsPerSecond = 1000000000 + + scale: Dict[str, float] = { + "ns": 1E-9, + "us": 1E-6, + "µs": 1E-6, + "ms": 1E-3, + "s": 1., + "m": 60., + "h": 60. * 60., + "d": 24. * 60. * 60., + } + + def __new__( + cls: Type['DurationType'], + seconds: Any, + nanos: int = 0, + **kwargs: Any) -> 'DurationType': + if isinstance(seconds, datetime.timedelta): + if not (cls.MinSeconds <= seconds.total_seconds() <= cls.MaxSeconds): + raise ValueError("range error: {seconds}") + return super().__new__( + cls, days=seconds.days, seconds=seconds.seconds, microseconds=seconds.microseconds) + elif isinstance(seconds, int): + if not (cls.MinSeconds <= seconds <= cls.MaxSeconds): + raise ValueError("range error: {seconds}") + return super().__new__( + cls, seconds=seconds, microseconds=nanos // 1000) + elif isinstance(seconds, str): + duration_pat = re.compile(r"^[-+]?([0-9]*(\.[0-9]*)?[a-z]+)+$") + + duration_match = duration_pat.match(seconds) + if not duration_match: + raise ValueError(f"Invalid duration {seconds!r}") + + # Consume the sign. + sign: float + if seconds.startswith("+"): + seconds = seconds[1:] + sign = +1 + elif seconds.startswith("-"): + seconds = seconds[1:] + sign = -1 + else: + sign = +1 + + # Sum the remaining time components: number * unit + try: + seconds = sign * fsum( + map( + lambda n_u: float(n_u.group(1)) * cls.scale[n_u.group(3)], + re.finditer(r"([0-9]*(\.[0-9]*)?)([a-z]+)", seconds) + ) + ) + except KeyError: + raise ValueError(f"Invalid duration {seconds!r}") + + if not (cls.MinSeconds <= seconds <= cls.MaxSeconds): + raise ValueError("range error: {seconds}") + return super().__new__( + cls, seconds=seconds) + else: + raise TypeError(f"Invalid initial value type: {type(seconds)}") + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({str(self)!r})" + + def __str__(self) -> str: + return "{0}s".format(int(self.total_seconds())) + + def __add__(self, other: Any) -> 'DurationType': + """ + This doesn't need to handle the rich variety of TimestampType overloadds. + This class only needs to handle results of duration + duration. + A duration + timestamp is not implemented by the timedelta superclass; + it is handled by the datetime superclass that implementes timestamp + duration. + """ + result = super().__add__(other) + if result == NotImplemented: + return cast(DurationType, result) + # This is handled by TimestampType; this is here for completeness, but isn't used. + if isinstance(result, (datetime.datetime, TimestampType)): + return TimestampType(result) # pragma: no cover + return DurationType(result) + + def __radd__(self, other: Any) -> 'DurationType': # pragma: no cover + """ + This doesn't need to handle the rich variety of TimestampType overloadds. + + Most cases are handled by TimeStamp. + """ + result = super().__radd__(other) + if result == NotImplemented: + return cast(DurationType, result) + # This is handled by TimestampType; this is here for completeness, but isn't used. + if isinstance(result, (datetime.datetime, TimestampType)): + return TimestampType(result) + return DurationType(result) + + def getHours(self, tz_name: Optional[str] = None) -> IntType: + assert tz_name is None + return IntType(int(self.total_seconds() / 60 / 60)) + + def getMilliseconds(self, tz_name: Optional[str] = None) -> IntType: + assert tz_name is None + return IntType(int(self.total_seconds() * 1000)) + + def getMinutes(self, tz_name: Optional[str] = None) -> IntType: + assert tz_name is None + return IntType(int(self.total_seconds() / 60)) + + def getSeconds(self, tz_name: Optional[str] = None) -> IntType: + assert tz_name is None + return IntType(int(self.total_seconds())) + + +class FunctionType: + """ + We need a concrete Annotation object to describe callables to celpy. + We need to describe functions as well as callable objects. + The description would tend to shadow ``typing.Callable``. + + An ``__isinstance__()`` method, for example, may be helpful for run-time type-checking. + + Superclass for CEL extension functions that are defined at run-time. + This permits a formal annotation in the environment construction that creates + an intended type for a given name. + + This allows for some run-time type checking to see if the actual object binding + matches the declared type binding. + + Also used to define protobuf classes provided as an annotation. + + We *could* define this as three overloads to cover unary, binary, and tertiary cases. + """ + def __call__(self, *args: Value, **kwargs: Value) -> Value: + raise NotImplementedError + + +class PackageType(MapType): + """ + A package of message types, usually protobuf. + + TODO: This may not be needed. + """ + pass + + +class MessageType(MapType): + """ + An individual protobuf message definition. A mapping from field name to field value. + + See Scenario: "message_literal" in the parse.feature. This is a very deeply-nested + message (30? levels), but the navigation to "payload" field seems to create a default + value at the top level. + """ + def __init__(self, *args: Value, **fields: Value) -> None: + if args and len(args) == 1: + super().__init__(cast(Mapping[Value, Value], args[0])) + elif args and len(args) > 1: + raise TypeError(r"Expected dictionary or fields, not {args!r}") + else: + super().__init__({StringType(k): v for k, v in fields.items()}) + + # def get(self, field: Any, default: Optional[Value] = None) -> Value: + # """ + # Alternative implementation with descent to locate a deeply-buried field. + # It seemed like this was the defined behavior. It turns it, it isn't. + # The code is here in case we're wrong and it really is the defined behavior. + # + # Note. There is no default provision in CEL. + # """ + # if field in self: + # return super().get(field) + # + # def descend(message: MessageType, field: Value) -> MessageType: + # if field in message: + # return message + # for k in message.keys(): + # found = descend(message[k], field) + # if found is not None: + # return found + # return None + # + # sub_message = descend(self, field) + # if sub_message is None: + # return default + # return sub_message.get(field) + + +class TypeType: + """ + Annotation used to mark protobuf type objects. + We map these to CELTypes so that type name testing works. + """ + type_name_mapping = { + "google.protobuf.Duration": DurationType, + "google.protobuf.Timestamp": TimestampType, + "google.protobuf.Int32Value": IntType, + "google.protobuf.Int64Value": IntType, + "google.protobuf.UInt32Value": UintType, + "google.protobuf.UInt64Value": UintType, + "google.protobuf.FloatValue": DoubleType, + "google.protobuf.DoubleValue": DoubleType, + "google.protobuf.Value": MessageType, + "google.protubuf.Any": MessageType, # Weird. + "google.protobuf.Any": MessageType, + "list_type": ListType, + "map_type": MapType, + "map": MapType, + "list": ListType, + "string": StringType, + "bytes": BytesType, + "bool": BoolType, + "int": IntType, + "uint": UintType, + "double": DoubleType, + "null_type": type(None), + "STRING": StringType, + "BOOL": BoolType, + "INT64": IntType, + "UINT64": UintType, + "INT32": IntType, + "UINT32": UintType, + "BYTES": BytesType, + "DOUBLE": DoubleType, + } + + def __init__( + self, + value: Any = "") -> None: + if isinstance(value, str) and value in self.type_name_mapping: + self.type_reference = self.type_name_mapping[value] + elif isinstance(value, str): + try: + self.type_reference = eval(value) + except (NameError, SyntaxError): + raise TypeError(f"Unknown type {value!r}") + else: + self.type_reference = value.__class__ + + def __eq__(self, other: Any) -> bool: + return ( + other == self.type_reference + or isinstance(other, self.type_reference) # noqa: W503 + ) |
