about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/celpy/celtypes.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/celpy/celtypes.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/celpy/celtypes.py')
-rw-r--r--.venv/lib/python3.12/site-packages/celpy/celtypes.py1495
1 files changed, 1495 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/celpy/celtypes.py b/.venv/lib/python3.12/site-packages/celpy/celtypes.py
new file mode 100644
index 00000000..3c4513ac
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/celpy/celtypes.py
@@ -0,0 +1,1495 @@
+# SPDX-Copyright: Copyright (c) Capital One Services, LLC
+# SPDX-License-Identifier: Apache-2.0
+# Copyright 2020 Capital One Services, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and limitations under the License.
+
+"""
+CEL Types: wrappers on Python types to provide CEL semantics.
+
+This can be used by a Python module to work with CEL-friendly values and CEL results.
+
+Examples of distinctions between CEL and Python:
+
+-   Unlike Python ``bool``, CEL :py:class:`BoolType` won't do some math.
+
+-   CEL has ``int64`` and ``uint64`` subclasses of integer. These have specific ranges and
+    raise :exc:`ValueError` errors on overflow.
+
+CEL types will raise :exc:`ValueError` for out-of-range values and :exc:`TypeError`
+for operations they refuse.
+The :py:mod:`evaluation` module can capture these exceptions and turn them into result values.
+This can permit the logic operators to quietly silence them via "short-circuiting".
+
+In the normal course of events, CEL's evaluator may attempt operations between a
+CEL exception result and an instance of one of CEL types.
+We rely on this leading to an ordinary Python :exc:`TypeError` to be raised to propogate
+the error. Or. A logic operator may discard the error object.
+
+The :py:mod:`evaluation` module extends these types with it's own :exc:`CELEvalError` exception.
+We try to keep that as a separate concern from the core operator implementations here.
+We leverage Python features, which means raising exceptions when there is a problem.
+
+Types
+=============
+
+See https://github.com/google/cel-go/tree/master/common/types
+
+These are the Go type definitions that are used by CEL:
+
+-   BoolType
+-   BytesType
+-   DoubleType
+-   DurationType
+-   IntType
+-   ListType
+-   MapType
+-   NullType
+-   StringType
+-   TimestampType
+-   TypeType
+-   UintType
+
+The above types are handled directly byt CEL syntax.
+e.g., ``42`` vs. ``42u`` vs. ``"42"`` vs. ``b"42"`` vs. ``42.``.
+
+We provide matching Python class names for each of these types. The Python type names
+are subclasses of Python native types, allowing a client to transparently work with
+CEL results. A Python host should be able to provide values to CEL that will be tolerated.
+
+A type hint of ``Value`` unifies these into a common hint.
+
+The CEL Go implementation also supports protobuf types:
+
+-   dpb.Duration
+-   tpb.Timestamp
+-   structpb.ListValue
+-   structpb.NullValue
+-   structpb.Struct
+-   structpb.Value
+-   wrapperspb.BoolValue
+-   wrapperspb.BytesValue
+-   wrapperspb.DoubleValue
+-   wrapperspb.FloatValue
+-   wrapperspb.Int32Value
+-   wrapperspb.Int64Value
+-   wrapperspb.StringValue
+-   wrapperspb.UInt32Value
+-   wrapperspb.UInt64Value
+
+These types involve expressions like the following::
+
+    google.protobuf.UInt32Value{value: 123u}
+
+In this case, the well-known protobuf name is directly visible as CEL syntax.
+There's a ``google`` package with the needed definitions.
+
+Type Provider
+==============================
+
+A type provider can be bound to the environment, this will support additional types.
+This appears to be a factory to map names of types to type classes.
+
+Run-time type binding is shown by a CEL expression like the following::
+
+    TestAllTypes{single_uint32_wrapper: 432u}
+
+The ``TestAllTypes`` is a protobuf type added to the CEL run-time. The syntax
+is defined by this syntax rule::
+
+    member_object  : member "{" [fieldinits] "}"
+
+The ``member`` is part of a type provider library,
+either a standard protobuf definition or an extension. The field inits build
+values for the protobuf object.
+
+See https://github.com/google/cel-go/blob/master/test/proto3pb/test_all_types.proto
+for the ``TestAllTypes`` protobuf definition that is registered as a type provider.
+
+This expression will describes a Protobuf ``uint32`` object.
+
+Type Adapter
+=============
+
+So far, it appears that a type adapter wraps existing Go or C++ types
+with CEL-required methods. This seems like it does not need to be implemented
+in Python.
+
+Numeric Details
+===============
+
+Integer division truncates toward zero.
+
+The Go definition of modulus::
+
+    // Mod returns the floating-point remainder of x/y.
+    // The magnitude of the result is less than y and its
+    // sign agrees with that of x.
+
+https://golang.org/ref/spec#Arithmetic_operators
+
+"Go has the nice property that -a/b == -(a/b)."
+
+::
+
+     x     y     x / y     x % y
+     5     3       1         2
+    -5     3      -1        -2
+     5    -3      -1         2
+    -5    -3       1        -2
+
+Python definition::
+
+    The modulo operator always yields a result
+    with the same sign as its second operand (or zero);
+    the absolute value of the result is strictly smaller than
+    the absolute value of the second operand.
+
+Here's the essential rule::
+
+    x//y * y + x%y == x
+
+However. Python ``//`` truncates toward negative infinity. Go ``/`` truncates toward zero.
+
+To get Go-like behavior, we need to use absolute values and restore the signs later.
+
+::
+
+    x_sign = -1 if x < 0 else +1
+    go_mod = x_sign * (abs(x) % abs(y))
+    return go_mod
+
+Timzone Details
+===============
+
+An implementation may have additional timezone names that must be injected into
+th dateutil.gettz() processing.
+
+For example, there may be the following sequence:
+
+1. A lowercase match for an alias or an existing dateutil timezone.
+
+2. A titlecase match for an existing dateutil timezone.
+
+3. The fallback, which is a +/-HH:MM string.
+
+..  TODO: Permit an extension into the timezone lookup.
+
+"""
+import datetime
+import logging
+import re
+from functools import reduce, wraps
+from math import fsum
+from typing import (Any, Callable, Dict, Iterable, List, Mapping, NoReturn,
+                    Optional, Sequence, Tuple, Type, TypeVar, Union, cast,
+                    overload)
+
+import dateutil.parser
+import dateutil.tz
+
+logger = logging.getLogger("celtypes")
+
+
+Value = Union[
+    'BoolType',
+    'BytesType',
+    'DoubleType',
+    'DurationType',
+    'IntType',
+    'ListType',
+    'MapType',
+    None,   # Used instead of NullType
+    'StringType',
+    'TimestampType',
+    'UintType',
+]
+
+# The domain of types used to build Annotations.
+CELType = Union[
+    Type['BoolType'],
+    Type['BytesType'],
+    Type['DoubleType'],
+    Type['DurationType'],
+    Type['IntType'],
+    Type['ListType'],
+    Type['MapType'],
+    Callable[..., None],  # Used instead of NullType
+    Type['StringType'],
+    Type['TimestampType'],
+    Type['TypeType'],  # Used to mark Protobuf Type values
+    Type['UintType'],
+    Type['PackageType'],
+    Type['MessageType'],
+]
+
+
+def type_matched(method: Callable[[Any, Any], Any]) -> Callable[[Any, Any], Any]:
+    """Decorates a method to assure the "other" value has the same type."""
+    @wraps(method)
+    def type_matching_method(self: Any, other: Any) -> Any:
+        if not(issubclass(type(other), type(self)) or issubclass(type(self), type(other))):
+            raise TypeError(f"no such overload: {self!r} {type(self)} != {other!r} {type(other)}")
+        return method(self, other)
+    return type_matching_method
+
+
+def logical_condition(e: Value, x: Value, y: Value) -> Value:
+    """
+    CEL e ? x : y operator.
+    Choose one of x or y. Exceptions in the unchosen expression are ignored.
+
+    Example::
+
+        2 / 0 > 4 ? 'baz' : 'quux'
+
+    is a "division by zero" error.
+
+    ::
+
+        >>> logical_condition(
+        ... BoolType(True), StringType("this"), StringType("Not That"))
+        StringType('this')
+        >>> logical_condition(
+        ... BoolType(False), StringType("Not This"), StringType("that"))
+        StringType('that')
+    """
+    if not isinstance(e, BoolType):
+        raise TypeError(f"Unexpected {type(e)} ? {type(x)} : {type(y)}")
+    result = x if e else y
+    logger.debug("logical_condition(%r, %r, %r) = %r", e, x, y, result)
+    return result
+
+
+def logical_and(x: Value, y: Value) -> Value:
+    """
+    Native Python has a left-to-right rule.
+    CEL && is commutative with non-Boolean values, including error objects.
+    """
+    if not isinstance(x, BoolType) and not isinstance(y, BoolType):
+        raise TypeError(f"{type(x)} {x!r} and {type(y)} {y!r}")
+    elif not isinstance(x, BoolType) and isinstance(y, BoolType):
+        if y:
+            return x  # whatever && true == whatever
+        else:
+            return y  # whatever && false == false
+    elif isinstance(x, BoolType) and not isinstance(y, BoolType):
+        if x:
+            return y  # true && whatever == whatever
+        else:
+            return x  # false && whatever == false
+    else:
+        return BoolType(cast(BoolType, x) and cast(BoolType, y))
+
+
+def logical_not(x: Value) -> Value:
+    """
+    Native python `not` isn't fully exposed for CEL types.
+    """
+    if isinstance(x, BoolType):
+        result = BoolType(not x)
+    else:
+        raise TypeError(f"not {type(x)}")
+    logger.debug("logical_not(%r) = %r", x, result)
+    return result
+
+
+def logical_or(x: Value, y: Value) -> Value:
+    """
+    Native Python has a left-to-right rule: (True or y) is True, (False or y) is y.
+    CEL || is commutative with non-Boolean values, including errors.
+    ``(x || false)`` is ``x``, and ``(false || y)`` is ``y``.
+
+    Example 1::
+
+        false || 1/0 != 0
+
+    is a "no matching overload" error.
+
+    Example 2::
+
+        (2 / 0 > 3 ? false : true) || true
+
+    is a "True"
+
+    If the operand(s) are not BoolType, we'll create an TypeError that will become a CELEvalError.
+    """
+    if not isinstance(x, BoolType) and not isinstance(y, BoolType):
+        raise TypeError(f"{type(x)} {x!r} or {type(y)} {y!r}")
+    elif not isinstance(x, BoolType) and isinstance(y, BoolType):
+        if y:
+            return y  # whatever || true == true
+        else:
+            return x  # whatever || false == whatever
+    elif isinstance(x, BoolType) and not isinstance(y, BoolType):
+        if x:
+            return x  # true || whatever == true
+        else:
+            return y  # false || whatever == whatever
+    else:
+        return BoolType(cast(BoolType, x) or cast(BoolType, y))
+
+
+class BoolType(int):
+    """
+    Native Python permits unary operators on Booleans.
+
+    For CEL, We need to prevent -false from working.
+    """
+    def __new__(cls: Type['BoolType'], source: Any) -> 'BoolType':
+        if source is None:
+            return super().__new__(cls, 0)
+        elif isinstance(source, BoolType):
+            return source
+        elif isinstance(source, MessageType):
+            return super().__new__(
+                cls,
+                cast(int, source.get(StringType("value")))
+            )
+        else:
+            return super().__new__(cls, source)
+
+    def __repr__(self) -> str:
+        return f"{self.__class__.__name__}({bool(self)})"
+
+    def __str__(self) -> str:
+        return str(bool(self))
+
+    def __neg__(self) -> NoReturn:
+        raise TypeError("no such overload")
+
+    def __hash__(self) -> int:
+        return super().__hash__()
+
+
+class BytesType(bytes):
+    """Python's bytes semantics are close to CEL."""
+    def __new__(
+            cls: Type['BytesType'],
+            source: Union[str, bytes, Iterable[int], 'BytesType', 'StringType'],
+            *args: Any,
+            **kwargs: Any
+    ) -> 'BytesType':
+        if source is None:
+            return super().__new__(cls, b'')
+        elif isinstance(source, (bytes, BytesType)):
+            return super().__new__(cls, source)
+        elif isinstance(source, (str, StringType)):
+            return super().__new__(cls, source.encode('utf-8'))
+        elif isinstance(source, MessageType):
+            return super().__new__(
+                cls,
+                cast(bytes, source.get(StringType("value")))  # type: ignore [attr-defined]
+            )
+        elif isinstance(source, Iterable):
+            return super().__new__(
+                cls,
+                cast(Iterable[int], source)
+            )
+        else:
+            raise TypeError(f"Invalid initial value type: {type(source)}")
+
+    def __repr__(self) -> str:
+        return f"{self.__class__.__name__}({super().__repr__()})"
+
+
+class DoubleType(float):
+    """
+    Native Python permits mixed type comparisons, doing conversions as needed.
+
+    For CEL, we need to prevent mixed-type comparisons from working.
+
+    TODO: Conversions from string? IntType? UintType? DoubleType?
+    """
+    def __new__(cls: Type['DoubleType'], source: Any) -> 'DoubleType':
+        if source is None:
+            return super().__new__(cls, 0)
+        elif isinstance(source, MessageType):
+            return super().__new__(
+                cls,
+                cast(float, source.get(StringType("value")))
+            )
+        else:
+            return super().__new__(cls, source)
+
+    def __repr__(self) -> str:
+        return f"{self.__class__.__name__}({super().__repr__()})"
+
+    def __str__(self) -> str:
+        text = str(float(self))
+        return text
+
+    def __neg__(self) -> 'DoubleType':
+        return DoubleType(super().__neg__())
+
+    def __mod__(self, other: Any) -> NoReturn:
+        raise TypeError(
+            f"found no matching overload for '_%_' applied to '(double, {type(other)})'")
+
+    def __truediv__(self, other: Any) -> 'DoubleType':
+        if cast(float, other) == 0.0:
+            return DoubleType("inf")
+        else:
+            return DoubleType(super().__truediv__(other))
+
+    def __rmod__(self, other: Any) -> NoReturn:
+        raise TypeError(
+            f"found no matching overload for '_%_' applied to '({type(other)}, double)'")
+
+    def __rtruediv__(self, other: Any) -> 'DoubleType':
+        if self == 0.0:
+            return DoubleType("inf")
+        else:
+            return DoubleType(super().__rtruediv__(other))
+
+    @type_matched
+    def __eq__(self, other: Any) -> bool:
+        return super().__eq__(other)
+
+    @type_matched
+    def __ne__(self, other: Any) -> bool:
+        return super().__ne__(other)
+
+    def __hash__(self) -> int:
+        return super().__hash__()
+
+
+IntOperator = TypeVar('IntOperator', bound=Callable[..., int])
+
+
+def int64(operator: IntOperator) -> IntOperator:
+    """Apply an operation, but assure the value is within the int64 range."""
+    @wraps(operator)
+    def clamped_operator(*args: Any, **kwargs: Any) -> int:
+        result: int = operator(*args, **kwargs)
+        if -(2**63) <= result < 2**63:
+            return result
+        raise ValueError("overflow")
+    return cast(IntOperator, clamped_operator)
+
+
+class IntType(int):
+    """
+    A version of int with overflow errors outside int64 range.
+
+    features/integer_math.feature:277  "int64_overflow_positive"
+
+    >>> IntType(9223372036854775807) + IntType(1)
+    Traceback (most recent call last):
+    ...
+    ValueError: overflow
+
+    >>> 2**63
+    9223372036854775808
+
+    features/integer_math.feature:285  "int64_overflow_negative"
+
+    >>> -IntType(9223372036854775808) - IntType(1)
+    Traceback (most recent call last):
+    ...
+    ValueError: overflow
+
+    >>> IntType(DoubleType(1.9))
+    IntType(2)
+    >>> IntType(DoubleType(-123.456))
+    IntType(-123)
+    """
+    def __new__(
+            cls: Type['IntType'],
+            source: Any,
+            *args: Any,
+            **kwargs: Any
+    ) -> 'IntType':
+        convert: Callable[..., int]
+        if source is None:
+            return super().__new__(cls, 0)
+        elif isinstance(source, IntType):
+            return source
+        elif isinstance(source, MessageType):
+            # Used by protobuf.
+            return super().__new__(
+                cls,
+                cast(int, source.get(StringType("value")))
+            )
+        elif isinstance(source, (float, DoubleType)):
+            convert = int64(round)
+        elif isinstance(source, TimestampType):
+            convert = int64(lambda src: src.timestamp())
+        elif isinstance(source, (str, StringType)) and source[:2] in {"0x", "0X"}:
+            convert = int64(lambda src: int(src[2:], 16))
+        elif isinstance(source, (str, StringType)) and source[:3] in {"-0x", "-0X"}:
+            convert = int64(lambda src: -int(src[3:], 16))
+        else:
+            # Must tolerate "-" as part of the literal.
+            # See https://github.com/google/cel-spec/issues/126
+            convert = int64(int)
+        return super().__new__(cls, convert(source))
+
+    def __repr__(self) -> str:
+        return f"{self.__class__.__name__}({super().__repr__()})"
+
+    def __str__(self) -> str:
+        text = str(int(self))
+        return text
+
+    @int64
+    def __neg__(self) -> 'IntType':
+        return IntType(super().__neg__())
+
+    @int64
+    def __add__(self, other: Any) -> 'IntType':
+        return IntType(super().__add__(cast(IntType, other)))
+
+    @int64
+    def __sub__(self, other: Any) -> 'IntType':
+        return IntType(super().__sub__(cast(IntType, other)))
+
+    @int64
+    def __mul__(self, other: Any) -> 'IntType':
+        return IntType(super().__mul__(cast(IntType, other)))
+
+    @int64
+    def __truediv__(self, other: Any) -> 'IntType':
+        other = cast(IntType, other)
+        self_sign = -1 if self < IntType(0) else +1
+        other_sign = -1 if other < IntType(0) else +1
+        go_div = self_sign * other_sign * (abs(self) // abs(other))
+        return IntType(go_div)
+
+    __floordiv__ = __truediv__
+
+    @int64
+    def __mod__(self, other: Any) -> 'IntType':
+        self_sign = -1 if self < IntType(0) else +1
+        go_mod = self_sign * (abs(self) % abs(cast(IntType, other)))
+        return IntType(go_mod)
+
+    @int64
+    def __radd__(self, other: Any) -> 'IntType':
+        return IntType(super().__radd__(cast(IntType, other)))
+
+    @int64
+    def __rsub__(self, other: Any) -> 'IntType':
+        return IntType(super().__rsub__(cast(IntType, other)))
+
+    @int64
+    def __rmul__(self, other: Any) -> 'IntType':
+        return IntType(super().__rmul__(cast(IntType, other)))
+
+    @int64
+    def __rtruediv__(self, other: Any) -> 'IntType':
+        other = cast(IntType, other)
+        self_sign = -1 if self < IntType(0) else +1
+        other_sign = -1 if other < IntType(0) else +1
+        go_div = self_sign * other_sign * (abs(other) // abs(self))
+        return IntType(go_div)
+
+    __rfloordiv__ = __rtruediv__
+
+    @int64
+    def __rmod__(self, other: Any) -> 'IntType':
+        left_sign = -1 if other < IntType(0) else +1
+        go_mod = left_sign * (abs(other) % abs(self))
+        return IntType(go_mod)
+
+    @type_matched
+    def __eq__(self, other: Any) -> bool:
+        return super().__eq__(other)
+
+    @type_matched
+    def __ne__(self, other: Any) -> bool:
+        return super().__ne__(other)
+
+    @type_matched
+    def __lt__(self, other: Any) -> bool:
+        return super().__lt__(other)
+
+    @type_matched
+    def __le__(self, other: Any) -> bool:
+        return super().__le__(other)
+
+    @type_matched
+    def __gt__(self, other: Any) -> bool:
+        return super().__gt__(other)
+
+    @type_matched
+    def __ge__(self, other: Any) -> bool:
+        return super().__ge__(other)
+
+    def __hash__(self) -> int:
+        return super().__hash__()
+
+
+def uint64(operator: IntOperator) -> IntOperator:
+    """Apply an operation, but assure the value is within the uint64 range."""
+    @wraps(operator)
+    def clamped_operator(*args: Any, **kwargs: Any) -> int:
+        result = operator(*args, **kwargs)
+        if 0 <= result < 2**64:
+            return result
+        raise ValueError("overflow")
+    return cast(IntOperator, clamped_operator)
+
+
+class UintType(int):
+    """
+    A version of int with overflow errors outside uint64 range.
+
+    Alternatives:
+
+        Option 1 - Use https://pypi.org/project/fixedint/
+
+        Option 2 - use array or struct modules to access an unsigned object.
+
+    Test Cases:
+
+    features/integer_math.feature:149  "unary_minus_no_overload"
+
+    >>> -UintType(42)
+    Traceback (most recent call last):
+    ...
+    TypeError: no such overload
+
+    uint64_overflow_positive
+
+    >>> UintType(18446744073709551615) + UintType(1)
+    Traceback (most recent call last):
+    ...
+    ValueError: overflow
+
+    uint64_overflow_negative
+
+    >>> UintType(0) - UintType(1)
+    Traceback (most recent call last):
+    ...
+    ValueError: overflow
+
+    >>> - UintType(5)
+    Traceback (most recent call last):
+    ...
+    TypeError: no such overload
+    """
+    def __new__(
+            cls: Type['UintType'],
+            source: Any,
+            *args: Any,
+            **kwargs: Any
+    ) -> 'UintType':
+        convert: Callable[..., int]
+        if isinstance(source, UintType):
+            return source
+        elif isinstance(source, (float, DoubleType)):
+            convert = uint64(round)
+        elif isinstance(source, TimestampType):
+            convert = uint64(lambda src: src.timestamp())
+        elif isinstance(source, (str, StringType)) and source[:2] in {"0x", "0X"}:
+            convert = uint64(lambda src: int(src[2:], 16))
+        elif isinstance(source, MessageType):
+            # Used by protobuf.
+            convert = uint64(lambda src: src['value'] if src['value'] is not None else 0)
+        elif source is None:
+            convert = uint64(lambda src: 0)
+        else:
+            convert = uint64(int)
+        return super().__new__(cls, convert(source))
+
+    def __repr__(self) -> str:
+        return f"{self.__class__.__name__}({super().__repr__()})"
+
+    def __str__(self) -> str:
+        text = str(int(self))
+        return text
+
+    def __neg__(self) -> NoReturn:
+        raise TypeError("no such overload")
+
+    @uint64
+    def __add__(self, other: Any) -> 'UintType':
+        return UintType(super().__add__(cast(IntType, other)))
+
+    @uint64
+    def __sub__(self, other: Any) -> 'UintType':
+        return UintType(super().__sub__(cast(IntType, other)))
+
+    @uint64
+    def __mul__(self, other: Any) -> 'UintType':
+        return UintType(super().__mul__(cast(IntType, other)))
+
+    @uint64
+    def __truediv__(self, other: Any) -> 'UintType':
+        return UintType(super().__floordiv__(cast(IntType, other)))
+
+    __floordiv__ = __truediv__
+
+    @uint64
+    def __mod__(self, other: Any) -> 'UintType':
+        return UintType(super().__mod__(cast(IntType, other)))
+
+    @uint64
+    def __radd__(self, other: Any) -> 'UintType':
+        return UintType(super().__radd__(cast(IntType, other)))
+
+    @uint64
+    def __rsub__(self, other: Any) -> 'UintType':
+        return UintType(super().__rsub__(cast(IntType, other)))
+
+    @uint64
+    def __rmul__(self, other: Any) -> 'UintType':
+        return UintType(super().__rmul__(cast(IntType, other)))
+
+    @uint64
+    def __rtruediv__(self, other: Any) -> 'UintType':
+        return UintType(super().__rfloordiv__(cast(IntType, other)))
+
+    __rfloordiv__ = __rtruediv__
+
+    @uint64
+    def __rmod__(self, other: Any) -> 'UintType':
+        return UintType(super().__rmod__(cast(IntType, other)))
+
+    @type_matched
+    def __eq__(self, other: Any) -> bool:
+        return super().__eq__(other)
+
+    @type_matched
+    def __ne__(self, other: Any) -> bool:
+        return super().__ne__(other)
+
+    def __hash__(self) -> int:
+        return super().__hash__()
+
+
+class ListType(List[Value]):
+    """
+    Native Python implements comparison operations between list objects.
+
+    For CEL, we prevent list comparison operators from working.
+
+    We provide an :py:meth:`__eq__` and :py:meth:`__ne__` that
+    gracefully ignore type mismatch problems, calling them not equal.
+
+    See https://github.com/google/cel-spec/issues/127
+
+    An implied logical And means a singleton behaves in a distinct way from a non-singleton list.
+    """
+    def __repr__(self) -> str:
+        return f"{self.__class__.__name__}({super().__repr__()})"
+
+    def __lt__(self, other: Any) -> NoReturn:
+        raise TypeError("no such overload")
+
+    def __le__(self, other: Any) -> NoReturn:
+        raise TypeError("no such overload")
+
+    def __gt__(self, other: Any) -> NoReturn:
+        raise TypeError("no such overload")
+
+    def __ge__(self, other: Any) -> NoReturn:
+        raise TypeError("no such overload")
+
+    def __eq__(self, other: Any) -> bool:
+        if not isinstance(other, (list, ListType)):
+            raise TypeError(f"no such overload: ListType == {type(other)}")
+
+        def equal(s: Any, o: Any) -> Value:
+            try:
+                return BoolType(s == o)
+            except TypeError as ex:
+                return cast(BoolType, ex)  # Instead of Union[BoolType, TypeError]
+
+        result = (
+            len(self) == len(other)
+            and reduce(  # noqa: W503
+                logical_and,  # type: ignore [arg-type]
+                (equal(item_s, item_o) for item_s, item_o in zip(self, other)),
+                BoolType(True)  # type: ignore [arg-type]
+            )
+        )
+        if isinstance(result, TypeError):
+            raise result
+        return bool(result)
+
+    def __ne__(self, other: Any) -> bool:
+        if not isinstance(other, (list, ListType)):
+            raise TypeError(f"no such overload: ListType != {type(other)}")
+
+        def not_equal(s: Any, o: Any) -> Value:
+            try:
+                return BoolType(s != o)
+            except TypeError as ex:
+                return cast(BoolType, ex)  # Instead of Union[BoolType, TypeError]
+
+        result = (
+            len(self) != len(other)
+            or reduce(  # noqa: W503
+                logical_or,  # type: ignore [arg-type]
+                (not_equal(item_s, item_o) for item_s, item_o in zip(self, other)),
+                BoolType(False)  # type: ignore [arg-type]
+            )
+        )
+        if isinstance(result, TypeError):
+            raise result
+        return bool(result)
+
+
+BaseMapTypes = Union[
+    Mapping[Any, Any],
+    Sequence[Tuple[Any, Any]],
+    None
+]
+
+
+MapKeyTypes = Union[
+    'IntType', 'UintType', 'BoolType', 'StringType', str
+]
+
+
+class MapType(Dict[Value, Value]):
+    """
+    Native Python allows mapping updates and any hashable type as a kay.
+
+    CEL prevents mapping updates and has a limited domain of key types.
+        int, uint, bool, or string keys
+
+    We provide an :py:meth:`__eq__` and :py:meth:`__ne__` that
+    gracefully ignore type mismatch problems for the values, calling them not equal.
+
+    See https://github.com/google/cel-spec/issues/127
+
+    An implied logical And means a singleton behaves in a distinct way from a non-singleton mapping.
+    """
+    def __init__(
+            self,
+            items: BaseMapTypes = None) -> None:
+        super().__init__()
+        if items is None:
+            pass
+        elif isinstance(items, Sequence):
+            for name, value in items:
+                self[name] = value
+        elif isinstance(items, Mapping):
+            for name, value in items.items():
+                self[name] = value
+        else:
+            raise TypeError(f"Invalid initial value type: {type(items)}")
+
+    def __repr__(self) -> str:
+        return f"{self.__class__.__name__}({super().__repr__()})"
+
+    def __getitem__(self, key: Any) -> Any:
+        if not MapType.valid_key_type(key):
+            raise TypeError(f"unsupported key type: {type(key)}")
+        return super().__getitem__(key)
+
+    def __eq__(self, other: Any) -> bool:
+        if not isinstance(other, (Mapping, MapType)):
+            raise TypeError(f"no such overload: MapType == {type(other)}")
+
+        def equal(s: Any, o: Any) -> BoolType:
+            try:
+                return BoolType(s == o)
+            except TypeError as ex:
+                return cast(BoolType, ex)  # Instead of Union[BoolType, TypeError]
+
+        keys_s = self.keys()
+        keys_o = other.keys()
+        result = (
+            keys_s == keys_o
+            and reduce(  # noqa: W503
+                logical_and,  # type: ignore [arg-type]
+                (equal(self[k], other[k]) for k in keys_s),
+                BoolType(True)  # type: ignore [arg-type]
+            )
+        )
+        if isinstance(result, TypeError):
+            raise result
+        return bool(result)
+
+    def __ne__(self, other: Any) -> bool:
+        if not isinstance(other, (Mapping, MapType)):
+            raise TypeError(f"no such overload: MapType != {type(other)}")
+
+        # Singleton special case, may return no-such overload.
+        if len(self) == 1 and len(other) == 1 and self.keys() == other.keys():
+            k = next(iter(self.keys()))
+            return cast(bool, self[k] != other[k])  # Instead of Union[BoolType, TypeError]
+
+        def not_equal(s: Any, o: Any) -> BoolType:
+            try:
+                return BoolType(s != o)
+            except TypeError as ex:
+                return cast(BoolType, ex)  # Instead of Union[BoolType, TypeError]
+
+        keys_s = self.keys()
+        keys_o = other.keys()
+        result = (
+            keys_s != keys_o
+            or reduce(  # noqa: W503
+                logical_or,  # type: ignore [arg-type]
+                (not_equal(self[k], other[k]) for k in keys_s),
+                BoolType(False)  # type: ignore [arg-type]
+            )
+        )
+        if isinstance(result, TypeError):
+            raise result
+        return bool(result)
+
+    @staticmethod
+    def valid_key_type(key: Any) -> bool:
+        """Valid CEL key types. Plus native str for tokens in the source when evaluating ``e.f``"""
+        return isinstance(key, (IntType, UintType, BoolType, StringType, str))
+
+
+class NullType:
+    """TBD. May not be needed. Python's None semantics appear to match CEL perfectly."""
+    pass
+
+
+class StringType(str):
+    """Python's str semantics are very, very close to CEL.
+
+    We rely on the overlap between ``"/u270c"`` and ``"/U0001f431"`` in CEL and Python.
+    """
+    def __new__(
+            cls: Type['StringType'],
+            source: Union[str, bytes, 'BytesType', 'StringType'],
+            *args: Any,
+            **kwargs: Any
+    ) -> 'StringType':
+        if isinstance(source, (bytes, BytesType)):
+            return super().__new__(cls, source.decode('utf'))
+        elif isinstance(source, (str, StringType)):
+            # TODO: Consider returning the original StringType object.
+            return super().__new__(cls, source)
+        else:
+            return cast(StringType, super().__new__(cls, source))
+
+    def __repr__(self) -> str:
+        return f"{self.__class__.__name__}({super().__repr__()})"
+
+    @type_matched
+    def __eq__(self, other: Any) -> bool:
+        return super().__eq__(other)
+
+    @type_matched
+    def __ne__(self, other: Any) -> bool:
+        return super().__ne__(other)
+
+    def __hash__(self) -> int:
+        return super().__hash__()
+
+
+class TimestampType(datetime.datetime):
+    """
+    Implements google.protobuf.Timestamp
+
+    See https://developers.google.com/protocol-buffers/docs/reference/google.protobuf
+
+    Also see https://www.ietf.org/rfc/rfc3339.txt.
+
+    The protobuf implementation is an ordered pair of int64 seconds and int32 nanos.
+
+    Instead of a Tuple[int, int] we use a wrapper for :py:class:`datetime.datetime`.
+
+    From protobuf documentation for making a Timestamp in Python::
+
+        now = time.time()
+        seconds = int(now)
+        nanos = int((now - seconds) * 10**9)
+        timestamp = Timestamp(seconds=seconds, nanos=nanos)
+
+    Also::
+
+        >>> t = TimestampType("2009-02-13T23:31:30Z")
+        >>> repr(t)
+        "TimestampType('2009-02-13T23:31:30Z')"
+        >>> t.timestamp()
+        1234567890.0
+        >>> str(t)
+        '2009-02-13T23:31:30Z'
+
+    :strong:`Timezones`
+
+    Timezones are expressed in the following grammar:
+
+    ::
+
+        TimeZone = "UTC" | LongTZ | FixedTZ ;
+        LongTZ = ? list available at
+                   http://joda-time.sourceforge.net/timezones.html ? ;
+        FixedTZ = ( "+" | "-" ) Digit Digit ":" Digit Digit ;
+        Digit = "0" | "1" | ... | "9" ;
+
+    Fixed timezones are explicit hour and minute offsets from UTC.
+    Long timezone names are like Europe/Paris, CET, or US/Central.
+
+    The Joda project (https://www.joda.org/joda-time/timezones.html)
+    says "Time zone data is provided by the public IANA time zone database."
+
+    The ``dateutil`` project (https://pypi.org/project/python-dateutil/)
+    is used for TZ handling and timestamp parsing.
+
+    Additionally, there is a ``TZ_ALIASES`` mapping available in this class to permit additional
+    timezone names. By default, the mapping is empty, and the only names
+    available are those recognized by :mod:`dateutil.tz`.
+    """
+
+    TZ_ALIASES: Dict[str, str] = {}
+
+    def __new__(
+            cls: Type['TimestampType'],
+            source: Union[int, str, datetime.datetime],
+            *args: Any,
+            **kwargs: Any) -> 'TimestampType':
+
+        if isinstance(source, datetime.datetime):
+            # Wrap a datetime.datetime
+            return super().__new__(
+                cls,
+                year=source.year,
+                month=source.month,
+                day=source.day,
+                hour=source.hour,
+                minute=source.minute,
+                second=source.second,
+                microsecond=source.microsecond,
+                tzinfo=source.tzinfo or datetime.timezone.utc
+            )
+
+        elif isinstance(source, int) and len(args) >= 2:
+            # Wrap a sequence of integers that datetime.datetime might accept.
+            ts: TimestampType = super().__new__(
+                cls, source, *args, **kwargs
+            )
+            if not ts.tzinfo:
+                ts = ts.replace(tzinfo=datetime.timezone.utc)
+            return ts
+
+        elif isinstance(source, str):
+            # Use dateutil to try a variety of text formats.
+            parsed_datetime = dateutil.parser.isoparse(source)
+            return super().__new__(
+                cls,
+                year=parsed_datetime.year,
+                month=parsed_datetime.month,
+                day=parsed_datetime.day,
+                hour=parsed_datetime.hour,
+                minute=parsed_datetime.minute,
+                second=parsed_datetime.second,
+                microsecond=parsed_datetime.microsecond,
+                tzinfo=parsed_datetime.tzinfo
+            )
+
+        else:
+            raise TypeError(f"Cannot create {cls} from {source!r}")
+
+    def __repr__(self) -> str:
+        return f"{self.__class__.__name__}({str(self)!r})"
+
+    def __str__(self) -> str:
+        return self.strftime("%Y-%m-%dT%H:%M:%SZ")
+
+    def __add__(self, other: Any) -> 'TimestampType':
+        """Timestamp + Duration -> Timestamp"""
+        result = super().__add__(other)
+        if result == NotImplemented:
+            return NotImplemented
+        return TimestampType(result)
+
+    def __radd__(self, other: Any) -> 'TimestampType':
+        """Duration + Timestamp -> Timestamp"""
+        result = super().__radd__(other)
+        if result == NotImplemented:
+            return NotImplemented
+        return TimestampType(result)
+
+    # For more information, check the typeshed definition
+    # https://github.com/python/typeshed/blob/master/stdlib/2and3/datetime.pyi
+
+    @overload  # type: ignore
+    def __sub__(self, other: 'TimestampType') -> 'DurationType':
+        ...  # pragma: no cover
+
+    @overload
+    def __sub__(self, other: 'DurationType') -> 'TimestampType':
+        ...  # pragma: no cover
+
+    def __sub__(
+            self,
+            other: Union['TimestampType', 'DurationType']
+    ) -> Union['TimestampType', 'DurationType']:
+        result = super().__sub__(other)
+        if result == NotImplemented:
+            return cast(DurationType, result)
+        if isinstance(result, datetime.timedelta):
+            return DurationType(result)
+        return TimestampType(result)
+
+    @classmethod
+    def tz_name_lookup(cls, tz_name: str) -> Optional[datetime.tzinfo]:
+        """
+        The :py:func:`dateutil.tz.gettz` may be extended with additional aliases.
+
+        ..  TODO: Permit an extension into the timezone lookup.
+            Tweak ``celpy.celtypes.TimestampType.TZ_ALIASES``.
+        """
+        tz_lookup = str(tz_name)
+        if tz_lookup in cls.TZ_ALIASES:
+            tz = dateutil.tz.gettz(cls.TZ_ALIASES[tz_lookup])
+        else:
+            tz = dateutil.tz.gettz(tz_lookup)
+        return tz
+
+    @classmethod
+    def tz_offset_parse(cls, tz_name: str) -> Optional[datetime.tzinfo]:
+        tz_pat = re.compile(r"^([+-]?)(\d\d?):(\d\d)$")
+        tz_match = tz_pat.match(tz_name)
+        if not tz_match:
+            raise ValueError(f"Unparsable timezone: {tz_name!r}")
+        sign, hh, mm = tz_match.groups()
+        offset_min = (int(hh) * 60 + int(mm)) * (-1 if sign == '-' else +1)
+        offset = datetime.timedelta(seconds=offset_min * 60)
+        tz = datetime.timezone(offset)
+        return tz
+
+    @staticmethod
+    def tz_parse(tz_name: Optional[str]) -> Optional[datetime.tzinfo]:
+        if tz_name:
+            tz = TimestampType.tz_name_lookup(tz_name)
+            if tz is None:
+                tz = TimestampType.tz_offset_parse(tz_name)
+            return tz
+        else:
+            return dateutil.tz.UTC
+
+    def getDate(self, tz_name: Optional[StringType] = None) -> IntType:
+        new_tz = self.tz_parse(tz_name)
+        return IntType(self.astimezone(new_tz).day)
+
+    def getDayOfMonth(self, tz_name: Optional[StringType] = None) -> IntType:
+        new_tz = self.tz_parse(tz_name)
+        return IntType(self.astimezone(new_tz).day - 1)
+
+    def getDayOfWeek(self, tz_name: Optional[StringType] = None) -> IntType:
+        new_tz = self.tz_parse(tz_name)
+        return IntType(self.astimezone(new_tz).isoweekday() % 7)
+
+    def getDayOfYear(self, tz_name: Optional[StringType] = None) -> IntType:
+        new_tz = self.tz_parse(tz_name)
+        working_date = self.astimezone(new_tz)
+        jan1 = datetime.datetime(working_date.year, 1, 1, tzinfo=new_tz)
+        days = working_date.toordinal() - jan1.toordinal()
+        return IntType(days)
+
+    def getMonth(self, tz_name: Optional[StringType] = None) -> IntType:
+        new_tz = self.tz_parse(tz_name)
+        return IntType(self.astimezone(new_tz).month - 1)
+
+    def getFullYear(self, tz_name: Optional[StringType] = None) -> IntType:
+        new_tz = self.tz_parse(tz_name)
+        return IntType(self.astimezone(new_tz).year)
+
+    def getHours(self, tz_name: Optional[StringType] = None) -> IntType:
+        new_tz = self.tz_parse(tz_name)
+        return IntType(self.astimezone(new_tz).hour)
+
+    def getMilliseconds(self, tz_name: Optional[StringType] = None) -> IntType:
+        new_tz = self.tz_parse(tz_name)
+        return IntType(self.astimezone(new_tz).microsecond // 1000)
+
+    def getMinutes(self, tz_name: Optional[StringType] = None) -> IntType:
+        new_tz = self.tz_parse(tz_name)
+        return IntType(self.astimezone(new_tz).minute)
+
+    def getSeconds(self, tz_name: Optional[StringType] = None) -> IntType:
+        new_tz = self.tz_parse(tz_name)
+        return IntType(self.astimezone(new_tz).second)
+
+
+class DurationType(datetime.timedelta):
+    """
+    Implements google.protobuf.Duration
+
+    https://developers.google.com/protocol-buffers/docs/reference/google.protobuf#duration
+
+    The protobuf implementation is an ordered pair of int64 seconds and int32 nanos.
+    Instead of a Tuple[int, int] we use a wrapper for :py:class:`datetime.timedelta`.
+
+    The definition once said this::
+
+        "type conversion, duration should be end with "s", which stands for seconds"
+
+    This is obsolete, however, considering the following issue.
+
+    See https://github.com/google/cel-spec/issues/138
+
+    This refers to the following implementation detail
+    ::
+
+        // A duration string is a possibly signed sequence of
+        // decimal numbers, each with optional fraction and a unit suffix,
+        // such as "300ms", "-1.5h" or "2h45m".
+        // Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
+
+    The real regex, then is this::
+
+        [-+]?([0-9]*(\\.[0-9]*)?[a-z]+)+
+
+    """
+    MaxSeconds = 315576000000
+    MinSeconds = -315576000000
+    NanosecondsPerSecond = 1000000000
+
+    scale: Dict[str, float] = {
+        "ns": 1E-9,
+        "us": 1E-6,
+        "µs": 1E-6,
+        "ms": 1E-3,
+        "s": 1.,
+        "m": 60.,
+        "h": 60. * 60.,
+        "d": 24. * 60. * 60.,
+    }
+
+    def __new__(
+            cls: Type['DurationType'],
+            seconds: Any,
+            nanos: int = 0,
+            **kwargs: Any) -> 'DurationType':
+        if isinstance(seconds, datetime.timedelta):
+            if not (cls.MinSeconds <= seconds.total_seconds() <= cls.MaxSeconds):
+                raise ValueError("range error: {seconds}")
+            return super().__new__(
+                cls, days=seconds.days, seconds=seconds.seconds, microseconds=seconds.microseconds)
+        elif isinstance(seconds, int):
+            if not (cls.MinSeconds <= seconds <= cls.MaxSeconds):
+                raise ValueError("range error: {seconds}")
+            return super().__new__(
+                cls, seconds=seconds, microseconds=nanos // 1000)
+        elif isinstance(seconds, str):
+            duration_pat = re.compile(r"^[-+]?([0-9]*(\.[0-9]*)?[a-z]+)+$")
+
+            duration_match = duration_pat.match(seconds)
+            if not duration_match:
+                raise ValueError(f"Invalid duration {seconds!r}")
+
+            # Consume the sign.
+            sign: float
+            if seconds.startswith("+"):
+                seconds = seconds[1:]
+                sign = +1
+            elif seconds.startswith("-"):
+                seconds = seconds[1:]
+                sign = -1
+            else:
+                sign = +1
+
+            # Sum the remaining time components: number * unit
+            try:
+                seconds = sign * fsum(
+                    map(
+                        lambda n_u: float(n_u.group(1)) * cls.scale[n_u.group(3)],
+                        re.finditer(r"([0-9]*(\.[0-9]*)?)([a-z]+)", seconds)
+                    )
+                )
+            except KeyError:
+                raise ValueError(f"Invalid duration {seconds!r}")
+
+            if not (cls.MinSeconds <= seconds <= cls.MaxSeconds):
+                raise ValueError("range error: {seconds}")
+            return super().__new__(
+                cls, seconds=seconds)
+        else:
+            raise TypeError(f"Invalid initial value type: {type(seconds)}")
+
+    def __repr__(self) -> str:
+        return f"{self.__class__.__name__}({str(self)!r})"
+
+    def __str__(self) -> str:
+        return "{0}s".format(int(self.total_seconds()))
+
+    def __add__(self, other: Any) -> 'DurationType':
+        """
+        This doesn't need to handle the rich variety of TimestampType overloadds.
+        This class only needs to handle results of duration + duration.
+        A duration + timestamp is not implemented by the timedelta superclass;
+        it is handled by the datetime superclass that implementes timestamp + duration.
+        """
+        result = super().__add__(other)
+        if result == NotImplemented:
+            return cast(DurationType, result)
+        # This is handled by TimestampType; this is here for completeness, but isn't used.
+        if isinstance(result, (datetime.datetime, TimestampType)):
+            return TimestampType(result)  # pragma: no cover
+        return DurationType(result)
+
+    def __radd__(self, other: Any) -> 'DurationType':  # pragma: no cover
+        """
+        This doesn't need to handle the rich variety of TimestampType overloadds.
+
+        Most cases are handled by TimeStamp.
+        """
+        result = super().__radd__(other)
+        if result == NotImplemented:
+            return cast(DurationType, result)
+        # This is handled by TimestampType; this is here for completeness, but isn't used.
+        if isinstance(result, (datetime.datetime, TimestampType)):
+            return TimestampType(result)
+        return DurationType(result)
+
+    def getHours(self, tz_name: Optional[str] = None) -> IntType:
+        assert tz_name is None
+        return IntType(int(self.total_seconds() / 60 / 60))
+
+    def getMilliseconds(self, tz_name: Optional[str] = None) -> IntType:
+        assert tz_name is None
+        return IntType(int(self.total_seconds() * 1000))
+
+    def getMinutes(self, tz_name: Optional[str] = None) -> IntType:
+        assert tz_name is None
+        return IntType(int(self.total_seconds() / 60))
+
+    def getSeconds(self, tz_name: Optional[str] = None) -> IntType:
+        assert tz_name is None
+        return IntType(int(self.total_seconds()))
+
+
+class FunctionType:
+    """
+    We need a concrete Annotation object to describe callables to celpy.
+    We need to describe functions as well as callable objects.
+    The description would tend to shadow ``typing.Callable``.
+
+    An ``__isinstance__()`` method, for example, may be helpful for run-time type-checking.
+
+    Superclass for CEL extension functions that are defined at run-time.
+    This permits a formal annotation in the environment construction that creates
+    an intended type for a given name.
+
+    This allows for some run-time type checking to see if the actual object binding
+    matches the declared type binding.
+
+    Also used to define protobuf classes provided as an annotation.
+
+    We *could* define this as three overloads to cover unary, binary, and tertiary cases.
+    """
+    def __call__(self, *args: Value, **kwargs: Value) -> Value:
+        raise NotImplementedError
+
+
+class PackageType(MapType):
+    """
+    A package of message types, usually protobuf.
+
+    TODO: This may not be needed.
+    """
+    pass
+
+
+class MessageType(MapType):
+    """
+    An individual protobuf message definition. A mapping from field name to field value.
+
+    See Scenario: "message_literal" in the parse.feature. This is a very deeply-nested
+    message (30? levels), but the navigation to "payload" field seems to create a default
+    value at the top level.
+    """
+    def __init__(self, *args: Value, **fields: Value) -> None:
+        if args and len(args) == 1:
+            super().__init__(cast(Mapping[Value, Value], args[0]))
+        elif args and len(args) > 1:
+            raise TypeError(r"Expected dictionary or fields, not {args!r}")
+        else:
+            super().__init__({StringType(k): v for k, v in fields.items()})
+
+    # def get(self, field: Any, default: Optional[Value] = None) -> Value:
+    #     """
+    #     Alternative implementation with descent to locate a deeply-buried field.
+    #     It seemed like this was the defined behavior. It turns it, it isn't.
+    #     The code is here in case we're wrong and it really is the defined behavior.
+    #
+    #     Note. There is no default provision in CEL.
+    #     """
+    #     if field in self:
+    #         return super().get(field)
+    #
+    #     def descend(message: MessageType, field: Value) -> MessageType:
+    #         if field in message:
+    #             return message
+    #         for k in message.keys():
+    #             found = descend(message[k], field)
+    #             if found is not None:
+    #                 return found
+    #         return None
+    #
+    #     sub_message = descend(self, field)
+    #     if sub_message is None:
+    #         return default
+    #     return sub_message.get(field)
+
+
+class TypeType:
+    """
+    Annotation used to mark protobuf type objects.
+    We map these to CELTypes so that type name testing works.
+    """
+    type_name_mapping = {
+        "google.protobuf.Duration": DurationType,
+        "google.protobuf.Timestamp": TimestampType,
+        "google.protobuf.Int32Value": IntType,
+        "google.protobuf.Int64Value": IntType,
+        "google.protobuf.UInt32Value": UintType,
+        "google.protobuf.UInt64Value": UintType,
+        "google.protobuf.FloatValue": DoubleType,
+        "google.protobuf.DoubleValue": DoubleType,
+        "google.protobuf.Value": MessageType,
+        "google.protubuf.Any": MessageType,  # Weird.
+        "google.protobuf.Any": MessageType,
+        "list_type": ListType,
+        "map_type": MapType,
+        "map": MapType,
+        "list": ListType,
+        "string": StringType,
+        "bytes": BytesType,
+        "bool": BoolType,
+        "int": IntType,
+        "uint": UintType,
+        "double": DoubleType,
+        "null_type": type(None),
+        "STRING": StringType,
+        "BOOL": BoolType,
+        "INT64": IntType,
+        "UINT64": UintType,
+        "INT32": IntType,
+        "UINT32": UintType,
+        "BYTES": BytesType,
+        "DOUBLE": DoubleType,
+    }
+
+    def __init__(
+            self,
+            value: Any = "") -> None:
+        if isinstance(value, str) and value in self.type_name_mapping:
+            self.type_reference = self.type_name_mapping[value]
+        elif isinstance(value, str):
+            try:
+                self.type_reference = eval(value)
+            except (NameError, SyntaxError):
+                raise TypeError(f"Unknown type {value!r}")
+        else:
+            self.type_reference = value.__class__
+
+    def __eq__(self, other: Any) -> bool:
+        return (
+            other == self.type_reference
+            or isinstance(other, self.type_reference)  # noqa: W503
+        )