diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/jmespath/functions.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/jmespath/functions.py | 362 |
1 files changed, 362 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/jmespath/functions.py b/.venv/lib/python3.12/site-packages/jmespath/functions.py new file mode 100644 index 00000000..11ab56ac --- /dev/null +++ b/.venv/lib/python3.12/site-packages/jmespath/functions.py @@ -0,0 +1,362 @@ +import math +import json + +from jmespath import exceptions +from jmespath.compat import string_type as STRING_TYPE +from jmespath.compat import get_methods + + +# python types -> jmespath types +TYPES_MAP = { + 'bool': 'boolean', + 'list': 'array', + 'dict': 'object', + 'NoneType': 'null', + 'unicode': 'string', + 'str': 'string', + 'float': 'number', + 'int': 'number', + 'long': 'number', + 'OrderedDict': 'object', + '_Projection': 'array', + '_Expression': 'expref', +} + + +# jmespath types -> python types +REVERSE_TYPES_MAP = { + 'boolean': ('bool',), + 'array': ('list', '_Projection'), + 'object': ('dict', 'OrderedDict',), + 'null': ('NoneType',), + 'string': ('unicode', 'str'), + 'number': ('float', 'int', 'long'), + 'expref': ('_Expression',), +} + + +def signature(*arguments): + def _record_signature(func): + func.signature = arguments + return func + return _record_signature + + +class FunctionRegistry(type): + def __init__(cls, name, bases, attrs): + cls._populate_function_table() + super(FunctionRegistry, cls).__init__(name, bases, attrs) + + def _populate_function_table(cls): + function_table = {} + # Any method with a @signature decorator that also + # starts with "_func_" is registered as a function. + # _func_max_by -> max_by function. + for name, method in get_methods(cls): + if not name.startswith('_func_'): + continue + signature = getattr(method, 'signature', None) + if signature is not None: + function_table[name[6:]] = { + 'function': method, + 'signature': signature, + } + cls.FUNCTION_TABLE = function_table + + +class Functions(metaclass=FunctionRegistry): + + FUNCTION_TABLE = { + } + + def call_function(self, function_name, resolved_args): + try: + spec = self.FUNCTION_TABLE[function_name] + except KeyError: + raise exceptions.UnknownFunctionError( + "Unknown function: %s()" % function_name) + function = spec['function'] + signature = spec['signature'] + self._validate_arguments(resolved_args, signature, function_name) + return function(self, *resolved_args) + + def _validate_arguments(self, args, signature, function_name): + if signature and signature[-1].get('variadic'): + if len(args) < len(signature): + raise exceptions.VariadictArityError( + len(signature), len(args), function_name) + elif len(args) != len(signature): + raise exceptions.ArityError( + len(signature), len(args), function_name) + return self._type_check(args, signature, function_name) + + def _type_check(self, actual, signature, function_name): + for i in range(len(signature)): + allowed_types = signature[i]['types'] + if allowed_types: + self._type_check_single(actual[i], allowed_types, + function_name) + + def _type_check_single(self, current, types, function_name): + # Type checking involves checking the top level type, + # and in the case of arrays, potentially checking the types + # of each element. + allowed_types, allowed_subtypes = self._get_allowed_pytypes(types) + # We're not using isinstance() on purpose. + # The type model for jmespath does not map + # 1-1 with python types (booleans are considered + # integers in python for example). + actual_typename = type(current).__name__ + if actual_typename not in allowed_types: + raise exceptions.JMESPathTypeError( + function_name, current, + self._convert_to_jmespath_type(actual_typename), types) + # If we're dealing with a list type, we can have + # additional restrictions on the type of the list + # elements (for example a function can require a + # list of numbers or a list of strings). + # Arrays are the only types that can have subtypes. + if allowed_subtypes: + self._subtype_check(current, allowed_subtypes, + types, function_name) + + def _get_allowed_pytypes(self, types): + allowed_types = [] + allowed_subtypes = [] + for t in types: + type_ = t.split('-', 1) + if len(type_) == 2: + type_, subtype = type_ + allowed_subtypes.append(REVERSE_TYPES_MAP[subtype]) + else: + type_ = type_[0] + allowed_types.extend(REVERSE_TYPES_MAP[type_]) + return allowed_types, allowed_subtypes + + def _subtype_check(self, current, allowed_subtypes, types, function_name): + if len(allowed_subtypes) == 1: + # The easy case, we know up front what type + # we need to validate. + allowed_subtypes = allowed_subtypes[0] + for element in current: + actual_typename = type(element).__name__ + if actual_typename not in allowed_subtypes: + raise exceptions.JMESPathTypeError( + function_name, element, actual_typename, types) + elif len(allowed_subtypes) > 1 and current: + # Dynamic type validation. Based on the first + # type we see, we validate that the remaining types + # match. + first = type(current[0]).__name__ + for subtypes in allowed_subtypes: + if first in subtypes: + allowed = subtypes + break + else: + raise exceptions.JMESPathTypeError( + function_name, current[0], first, types) + for element in current: + actual_typename = type(element).__name__ + if actual_typename not in allowed: + raise exceptions.JMESPathTypeError( + function_name, element, actual_typename, types) + + @signature({'types': ['number']}) + def _func_abs(self, arg): + return abs(arg) + + @signature({'types': ['array-number']}) + def _func_avg(self, arg): + if arg: + return sum(arg) / float(len(arg)) + else: + return None + + @signature({'types': [], 'variadic': True}) + def _func_not_null(self, *arguments): + for argument in arguments: + if argument is not None: + return argument + + @signature({'types': []}) + def _func_to_array(self, arg): + if isinstance(arg, list): + return arg + else: + return [arg] + + @signature({'types': []}) + def _func_to_string(self, arg): + if isinstance(arg, STRING_TYPE): + return arg + else: + return json.dumps(arg, separators=(',', ':'), + default=str) + + @signature({'types': []}) + def _func_to_number(self, arg): + if isinstance(arg, (list, dict, bool)): + return None + elif arg is None: + return None + elif isinstance(arg, (int, float)): + return arg + else: + try: + return int(arg) + except ValueError: + try: + return float(arg) + except ValueError: + return None + + @signature({'types': ['array', 'string']}, {'types': []}) + def _func_contains(self, subject, search): + return search in subject + + @signature({'types': ['string', 'array', 'object']}) + def _func_length(self, arg): + return len(arg) + + @signature({'types': ['string']}, {'types': ['string']}) + def _func_ends_with(self, search, suffix): + return search.endswith(suffix) + + @signature({'types': ['string']}, {'types': ['string']}) + def _func_starts_with(self, search, suffix): + return search.startswith(suffix) + + @signature({'types': ['array', 'string']}) + def _func_reverse(self, arg): + if isinstance(arg, STRING_TYPE): + return arg[::-1] + else: + return list(reversed(arg)) + + @signature({"types": ['number']}) + def _func_ceil(self, arg): + return math.ceil(arg) + + @signature({"types": ['number']}) + def _func_floor(self, arg): + return math.floor(arg) + + @signature({"types": ['string']}, {"types": ['array-string']}) + def _func_join(self, separator, array): + return separator.join(array) + + @signature({'types': ['expref']}, {'types': ['array']}) + def _func_map(self, expref, arg): + result = [] + for element in arg: + result.append(expref.visit(expref.expression, element)) + return result + + @signature({"types": ['array-number', 'array-string']}) + def _func_max(self, arg): + if arg: + return max(arg) + else: + return None + + @signature({"types": ["object"], "variadic": True}) + def _func_merge(self, *arguments): + merged = {} + for arg in arguments: + merged.update(arg) + return merged + + @signature({"types": ['array-number', 'array-string']}) + def _func_min(self, arg): + if arg: + return min(arg) + else: + return None + + @signature({"types": ['array-string', 'array-number']}) + def _func_sort(self, arg): + return list(sorted(arg)) + + @signature({"types": ['array-number']}) + def _func_sum(self, arg): + return sum(arg) + + @signature({"types": ['object']}) + def _func_keys(self, arg): + # To be consistent with .values() + # should we also return the indices of a list? + return list(arg.keys()) + + @signature({"types": ['object']}) + def _func_values(self, arg): + return list(arg.values()) + + @signature({'types': []}) + def _func_type(self, arg): + if isinstance(arg, STRING_TYPE): + return "string" + elif isinstance(arg, bool): + return "boolean" + elif isinstance(arg, list): + return "array" + elif isinstance(arg, dict): + return "object" + elif isinstance(arg, (float, int)): + return "number" + elif arg is None: + return "null" + + @signature({'types': ['array']}, {'types': ['expref']}) + def _func_sort_by(self, array, expref): + if not array: + return array + # sort_by allows for the expref to be either a number of + # a string, so we have some special logic to handle this. + # We evaluate the first array element and verify that it's + # either a string of a number. We then create a key function + # that validates that type, which requires that remaining array + # elements resolve to the same type as the first element. + required_type = self._convert_to_jmespath_type( + type(expref.visit(expref.expression, array[0])).__name__) + if required_type not in ['number', 'string']: + raise exceptions.JMESPathTypeError( + 'sort_by', array[0], required_type, ['string', 'number']) + keyfunc = self._create_key_func(expref, + [required_type], + 'sort_by') + return list(sorted(array, key=keyfunc)) + + @signature({'types': ['array']}, {'types': ['expref']}) + def _func_min_by(self, array, expref): + keyfunc = self._create_key_func(expref, + ['number', 'string'], + 'min_by') + if array: + return min(array, key=keyfunc) + else: + return None + + @signature({'types': ['array']}, {'types': ['expref']}) + def _func_max_by(self, array, expref): + keyfunc = self._create_key_func(expref, + ['number', 'string'], + 'max_by') + if array: + return max(array, key=keyfunc) + else: + return None + + def _create_key_func(self, expref, allowed_types, function_name): + def keyfunc(x): + result = expref.visit(expref.expression, x) + actual_typename = type(result).__name__ + jmespath_type = self._convert_to_jmespath_type(actual_typename) + # allowed_types is in term of jmespath types, not python types. + if jmespath_type not in allowed_types: + raise exceptions.JMESPathTypeError( + function_name, result, jmespath_type, allowed_types) + return result + return keyfunc + + def _convert_to_jmespath_type(self, pyobject): + return TYPES_MAP.get(pyobject, 'unknown') |