diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/deepdiff/helper.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/deepdiff/helper.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/deepdiff/helper.py | 837 |
1 files changed, 837 insertions, 0 deletions
# DeepDiff helper module: shared constants, numpy/pydantic fallbacks, type
# groups, and small utilities used across the DeepDiff package.
import sys
import re
import os
import datetime
import uuid
import logging
import warnings
import string
import time
import enum
import ipaddress
from typing import NamedTuple, Any, List, Optional, Dict, Union, TYPE_CHECKING, Tuple
from ast import literal_eval
from decimal import Decimal, localcontext, InvalidOperation as InvalidDecimalOperation
from itertools import repeat
from orderly_set import StableSetEq as SetOrderedBase  # median: 1.0867 s for cache test, 5.63s for all tests
from threading import Timer

if TYPE_CHECKING:
    from pytz.tzinfo import BaseTzInfo


class np_type:
    # Placeholder standing in for every numpy dtype alias when numpy is absent.
    pass


class pydantic_base_model_type:
    # Placeholder standing in for pydantic's BaseModel when pydantic is absent.
    pass


class SetOrdered(SetOrderedBase):
    # Ordered set with a list-like repr for readable output.
    def __repr__(self):
        return str(list(self))


try:
    import numpy as np
except ImportError:  # pragma: no cover. The case without Numpy is tested locally only.
    # Without numpy, every dtype alias degrades to the np_type placeholder so
    # isinstance checks against these names stay valid (and always False).
    np = None  # pragma: no cover.
    np_array_factory = 'numpy not available'  # pragma: no cover.
    np_ndarray = np_type  # pragma: no cover.
    np_bool_ = np_type  # pragma: no cover.
    np_int8 = np_type  # pragma: no cover.
    np_int16 = np_type  # pragma: no cover.
    np_int32 = np_type  # pragma: no cover.
    np_int64 = np_type  # pragma: no cover.
    np_uint8 = np_type  # pragma: no cover.
    np_uint16 = np_type  # pragma: no cover.
    np_uint32 = np_type  # pragma: no cover.
    np_uint64 = np_type  # pragma: no cover.
    np_intp = np_type  # pragma: no cover.
    np_uintp = np_type  # pragma: no cover.
    np_float32 = np_type  # pragma: no cover.
    np_float64 = np_type  # pragma: no cover.
    np_double = np_type  # pragma: no cover.
    np_floating = np_type  # pragma: no cover.
    np_complex64 = np_type  # pragma: no cover.
    np_complex128 = np_type  # pragma: no cover.
    np_cdouble = np_type  # pragma: no cover.
    np_complexfloating = np_type  # pragma: no cover.
else:
    np_array_factory = np.array
    np_ndarray = np.ndarray
    np_bool_ = np.bool_
    np_int8 = np.int8
    np_int16 = np.int16
    np_int32 = np.int32
    np_int64 = np.int64
    np_uint8 = np.uint8
    np_uint16 = np.uint16
    np_uint32 = np.uint32
    np_uint64 = np.uint64
    np_intp = np.intp
    np_uintp = np.uintp
    np_float32 = np.float32
    np_float64 = np.float64
    np_double = np.double  # np.float_ is an alias for np.double and is being removed by NumPy 2.0
    np_floating = np.floating
    np_complex64 = np.complex64
    np_complex128 = np.complex128
    np_cdouble = np.cdouble  # np.complex_ is an alias for np.cdouble and is being removed by NumPy 2.0
    np_complexfloating = np.complexfloating

# Groupings of numpy scalar types used for membership checks elsewhere.
numpy_numbers = (
    np_int8, np_int16, np_int32, np_int64, np_uint8,
    np_uint16, np_uint32, np_uint64, np_intp, np_uintp,
    np_float32, np_float64, np_double, np_floating, np_complex64,
    np_complex128, np_cdouble,)

numpy_complex_numbers = (
    np_complexfloating, np_complex64, np_complex128, np_cdouble,
)

numpy_dtypes = set(numpy_numbers)
numpy_dtypes.add(np_bool_)  # type: ignore

# Maps a dtype's __name__ back to the dtype itself (e.g. 'int64' -> np.int64).
numpy_dtype_str_to_type = {
    item.__name__: item for item in numpy_dtypes
}

try:
    from pydantic.main import BaseModel as PydanticBaseModel  # type: ignore
except ImportError:
    PydanticBaseModel = pydantic_base_model_type


logger = logging.getLogger(__name__)

py_major_version = sys.version_info.major
py_minor_version = sys.version_info.minor

py_current_version = Decimal("{}.{}".format(py_major_version, py_minor_version))

py2 = py_major_version == 2
py3 = py_major_version == 3
py4 = py_major_version == 4


# Digit characters, used by _int_or_zero to salvage digits from mixed strings.
NUMERICS = frozenset(string.digits)


class EnumBase(str, enum.Enum):
    def __repr__(self):
        """
        We need to add a single quotes so we can easily copy the value when we do ipdb.
        """
        return f"'{self.name}'"

    def __str__(self):
        return self.name


def _int_or_zero(value):
    """
    Tries to extract some number from a string.

    12c becomes 12
    """
    try:
        return int(value)
    except Exception:
        # Fall back to concatenating just the digit characters; 0 if none.
        result = []
        for char in value:
            if char in NUMERICS:
                result.append(char)
        if result:
            return int(''.join(result))
        return 0


def get_semvar_as_integer(version):
    """
    Converts:

    '1.23.5' to 1023005
    """
    # Each component gets 3 decimal digits of weight (major*10^6 + minor*10^3 + patch).
    version = version.split('.')
    if len(version) > 3:
        version = version[:3]
    elif len(version) < 3:
        version.extend(['0'] * (3 - len(version)))

    return sum([10**(i * 3) * _int_or_zero(v) for i, v in enumerate(reversed(version))])


# we used to use OrderedDictPlus when dictionaries in Python were not ordered.
dict_ = dict

if py4:
    logger.warning('Python 4 is not supported yet. Switching logic to Python 3.')  # pragma: no cover
    py3 = True  # pragma: no cover

if py2:  # pragma: no cover
    sys.exit('Python 2 is not supported anymore. The last version of DeepDiff that supported Py2 was 3.3.0')

pypy3 = py3 and hasattr(sys, "pypy_translation_info")


if np and get_semvar_as_integer(np.__version__) < 1019000:
    sys.exit('The minimum required Numpy version is 1.19.0. Please upgrade your Numpy package.')
# Type groups used throughout DeepDiff for fast membership/isinstance checks.
strings = (str, bytes)  # which are both basestring
unicode_type = str
bytes_type = bytes
only_complex_number = (complex,) + numpy_complex_numbers
only_numbers = (int, float, complex, Decimal) + numpy_numbers
datetimes = (datetime.datetime, datetime.date, datetime.timedelta, datetime.time)
ipranges = (ipaddress.IPv4Interface, ipaddress.IPv6Interface, ipaddress.IPv4Network, ipaddress.IPv6Network)
uuids = (uuid.UUID, )
times = (datetime.datetime, datetime.time)
numbers: Tuple = only_numbers + datetimes
booleans = (bool, np_bool_)

basic_types = strings + numbers + uuids + booleans + (type(None), )


class IndexedHash(NamedTuple):
    """Pairs the indexes where an item was found with the item itself."""
    indexes: List
    item: Any


current_dir = os.path.dirname(os.path.abspath(__file__))

# Prefix used by get_id so object ids are not bare integers.
ID_PREFIX = '!>*id'

KEY_TO_VAL_STR = "{}:{}"

TREE_VIEW = 'tree'
TEXT_VIEW = 'text'
DELTA_VIEW = '_delta'

ENUM_INCLUDE_KEYS = ['__objclass__', 'name', 'value']


def short_repr(item, max_length=15):
    """Short representation of item if it is too long"""
    item = repr(item)
    if len(item) > max_length:
        item = '{}...{}'.format(item[:max_length - 3], item[-1])
    return item


class ListItemRemovedOrAdded:  # pragma: no cover
    """Class of conditions to be checked"""
    pass


class OtherTypes:
    # Base for sentinel result types (Skipped/Unprocessed/NotHashed).
    def __repr__(self):
        return "Error: {}".format(self.__class__.__name__)  # pragma: no cover

    __str__ = __repr__


class Skipped(OtherTypes):
    pass


class Unprocessed(OtherTypes):
    pass


class NotHashed(OtherTypes):
    pass


class NotPresent:  # pragma: no cover
    """
    In a change tree, this indicated that a previously existing object has been removed -- or will only be added
    in the future.
    We previously used None for this but this caused problem when users actually added and removed None. Srsly guys? :D
    """

    def __repr__(self):
        return 'not present'  # pragma: no cover

    __str__ = __repr__


class CannotCompare(Exception):
    """
    Exception when two items cannot be compared in the compare function.
    """
    pass


# Module-level singleton instances of the sentinel types above.
unprocessed = Unprocessed()
skipped = Skipped()
not_hashed = NotHashed()
notpresent = NotPresent()

# Disabling remapping from old to new keys since the mapping is deprecated.
RemapDict = dict_


# class RemapDict(dict_):
#     """
#     DISABLED
#     Remap Dictionary.

#     For keys that have a new, longer name, remap the old key to the new key.
#     Other keys that don't have a new name are handled as before.
#     """

#     def __getitem__(self, old_key):
#         new_key = EXPANDED_KEY_MAP.get(old_key, old_key)
#         if new_key != old_key:
#             logger.warning(
#                 "DeepDiff Deprecation: %s is renamed to %s. Please start using "
#                 "the new unified naming convention.", old_key, new_key)
#         if new_key in self:
#             return self.get(new_key)
#         else:  # pragma: no cover
#             raise KeyError(new_key)


class indexed_set(set):
    """
    A set class that lets you get an item by index

    >>> a = indexed_set()
    >>> a.add(10)
    >>> a.add(20)
    >>> a[0]
    10
    """


def add_to_frozen_set(parents_ids, item_id):
    # Returns a new frozenset with item_id added (frozensets are immutable).
    return parents_ids | {item_id}


def convert_item_or_items_into_set_else_none(items):
    # Normalizes a single string or an iterable into a set; falsy input -> None.
    if items:
        if isinstance(items, strings):
            items = {items}
        else:
            items = set(items)
    else:
        items = None
    return items


def add_root_to_paths(paths):
    """
    Sometimes the users want to just pass
    [key] instead of root[key] for example.
    Here we automatically add all sorts of variations that might match
    the path they were supposed to pass.
    """
    if paths is None:
        return
    result = SetOrdered()
    for path in paths:
        if path.startswith('root'):
            result.add(path)
        else:
            # Pure digits could be an int index or a string key; add both forms.
            if path.isdigit():
                result.add(f"root['{path}']")
                result.add(f"root[{path}]")
            elif path[0].isdigit():
                result.add(f"root['{path}']")
            else:
                # Could be an attribute or a string dict key; add both forms.
                result.add(f"root.{path}")
                result.add(f"root['{path}']")
    return result


# Runtime type of compiled regex objects (re.Pattern on modern Pythons).
RE_COMPILED_TYPE = type(re.compile(''))


def convert_item_or_items_into_compiled_regexes_else_none(items):
    # Compiles a pattern or list of patterns; already-compiled regexes pass through.
    if items:
        if isinstance(items, (strings, RE_COMPILED_TYPE)):
            items = [items]
        items = [i if isinstance(i, RE_COMPILED_TYPE) else re.compile(i) for i in items]
    else:
        items = None
    return items


def get_id(obj):
    """
    Adding some characters to id so they are not just integers to reduce the risk of collision.
    """
    return "{}{}".format(ID_PREFIX, id(obj))


def get_type(obj):
    """
    Get the type of object or if it is a class, return the class itself.
    """
    if isinstance(obj, np_ndarray):
        return obj.dtype.type  # type: ignore
    return obj if type(obj) is type else type(obj)


def numpy_dtype_string_to_type(dtype_str):
    # Looks up a numpy dtype from its string name (e.g. 'int64').
    return numpy_dtype_str_to_type[dtype_str]


def type_in_type_group(item, type_group):
    # Exact-type membership (no subclass matching).
    return get_type(item) in type_group


def type_is_subclass_of_type_group(item, type_group):
    # True if item is an instance, a subclass, or exactly a member of type_group.
    return isinstance(item, type_group) \
        or (isinstance(item, type) and issubclass(item, type_group)) \
        or type_in_type_group(item, type_group)


def get_doc(doc_filename):
    # Loads a docstring file from the sibling docs folder; falls back to a URL hint.
    try:
        with open(os.path.join(current_dir, '../docs/', doc_filename), 'r') as doc_file:
            doc = doc_file.read()
    except Exception:  # pragma: no cover
        doc = 'Failed to load the docstrings. Please visit: https://zepworks.com/deepdiff/current/'  # pragma: no cover
    return doc


# %-templates keyed by notation: 'f' fixed-point, 'e' scientific.
number_formatting = {
    "f": r'{:.%sf}',
    "e": r'{:.%se}',
}
def number_to_string(number, significant_digits, number_format_notation="f"):
    """
    Convert numbers to string considering significant digits.
    """
    try:
        using = number_formatting[number_format_notation]
    except KeyError:
        raise ValueError("number_format_notation got invalid value of {}. The valid values are 'f' and 'e'".format(number_format_notation)) from None

    if not isinstance(number, numbers):  # type: ignore
        return number
    elif isinstance(number, Decimal):
        with localcontext() as ctx:
            # Precision = number of integer digits + significant_digits
            # Using number//1 to get the integer part of the number
            ctx.prec = len(str(abs(number // 1))) + significant_digits
            try:
                number = number.quantize(Decimal('0.' + '0' * significant_digits))
            except InvalidDecimalOperation:
                # Sometimes rounding up causes a higher precision to be needed for the quantize operation
                # For example '999.99999999' will become '1000.000000' after quantize
                ctx.prec += 1
                number = number.quantize(Decimal('0.' + '0' * significant_digits))
    elif isinstance(number, only_complex_number):  # type: ignore
        # Case for complex numbers: format real and imaginary parts recursively.
        number = number.__class__(
            "{real}+{imag}j".format(  # type: ignore
                real=number_to_string(
                    number=number.real,  # type: ignore
                    significant_digits=significant_digits,
                    number_format_notation=number_format_notation
                ),
                imag=number_to_string(
                    number=number.imag,  # type: ignore
                    significant_digits=significant_digits,
                    number_format_notation=number_format_notation
                )
            )  # type: ignore
        )
    else:
        number = round(number=number, ndigits=significant_digits)  # type: ignore

    if significant_digits == 0:
        number = int(number)

    if number == 0.0:
        # Special case for 0: "-0.xx" should compare equal to "0.xx"
        number = abs(number)  # type: ignore

    # Cast number to string
    result = (using % significant_digits).format(number)
    # https://bugs.python.org/issue36622
    if number_format_notation == 'e':
        # Removing leading 0 for exponential part.
        result = re.sub(
            pattern=r'(?<=e(\+|\-))0(?=\d)+',
            repl=r'',
            string=result
        )
    return result


class DeepDiffDeprecationWarning(DeprecationWarning):
    """
    Use this warning instead of DeprecationWarning
    """
    pass


def cartesian_product(a, b):
    """
    Get the Cartesian product of two iterables

    **parameters**

    a: list of lists
    b: iterable to do the Cartesian product
    """

    for i in a:
        for j in b:
            yield i + (j,)


def cartesian_product_of_shape(dimentions, result=None):
    """
    Cartesian product of a dimentions iterable.
    This is mainly used to traverse Numpy ndarrays.

    Each array has dimentions that are defines in ndarray.shape
    """
    if result is None:
        result = ((),)  # a tuple with an empty tuple
    for dimension in dimentions:
        result = cartesian_product(result, range(dimension))
    return result


def get_numpy_ndarray_rows(obj, shape=None):
    """
    Convert a multi dimensional numpy array to list of rows
    """
    if shape is None:
        shape = obj.shape

    # Iterate every index tuple over all axes except the last; each yields a row.
    dimentions = shape[:-1]
    for path_tuple in cartesian_product_of_shape(dimentions):
        result = obj
        for index in path_tuple:
            result = result[index]
        yield path_tuple, result


class _NotFound:
    # Sentinel that compares unequal to everything, including itself.

    def __eq__(self, other):
        return False

    __req__ = __eq__

    def __repr__(self):
        return 'not found'

    __str__ = __repr__


not_found = _NotFound()

# Show each distinct deprecation warning only once per process.
warnings.simplefilter('once', DeepDiffDeprecationWarning)
class RepeatedTimer:
    """
    Threaded Repeated Timer by MestreLion
    https://stackoverflow.com/a/38317060/1497443
    """

    def __init__(self, interval, function, *args, **kwargs):
        self._timer = None
        self.interval = interval
        self.function = function
        self.args = args
        self.start_time = time.time()
        self.kwargs = kwargs
        self.is_running = False
        self.start()

    def _get_duration_sec(self):
        # Whole seconds elapsed since the timer was created.
        return int(time.time() - self.start_time)

    def _run(self):
        # Timer callback: re-arm first so the schedule stays periodic, then fire.
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        # Refresh the duration kwarg passed to the callback, then (re)arm.
        self.kwargs.update(duration=self._get_duration_sec())
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        # Cancels any pending timer and returns total elapsed whole seconds.
        duration = self._get_duration_sec()
        if self._timer is not None:
            self._timer.cancel()
        self.is_running = False
        return duration


def _eval_decimal(params):
    return Decimal(params)


def _eval_datetime(params):
    # params is the comma-separated argument list, e.g. "2021, 1, 1".
    params = f'({params})'
    params = literal_eval(params)
    return datetime.datetime(*params)


def _eval_date(params):
    params = f'({params})'
    params = literal_eval(params)
    return datetime.date(*params)


# (prefix, suffix, constructor) triples recognized by literal_eval_extended.
LITERAL_EVAL_PRE_PROCESS = [
    ('Decimal(', ')', _eval_decimal),
    ('datetime.datetime(', ')', _eval_datetime),
    ('datetime.date(', ')', _eval_date),
]


def literal_eval_extended(item):
    """
    An extended version of literal_eval
    """
    try:
        return literal_eval(item)
    except (SyntaxError, ValueError):
        for begin, end, func in LITERAL_EVAL_PRE_PROCESS:
            if item.startswith(begin) and item.endswith(end):
                # Extracting and removing extra quotes so for example "Decimal('10.1')" becomes "'10.1'" and then '10.1'
                params = item[len(begin): -len(end)].strip('\'\"')
                return func(params)
        raise


def time_to_seconds(t:datetime.time) -> int:
    # Seconds since midnight, ignoring microseconds.
    return (t.hour * 60 + t.minute) * 60 + t.second


def datetime_normalize(
    truncate_datetime:Union[str, None],
    obj:Union[datetime.datetime, datetime.time],
    default_timezone: Union[
        datetime.timezone, "BaseTzInfo"
    ] = datetime.timezone.utc,
) -> Any:
    # Truncates obj to the requested granularity, then normalizes datetimes to
    # default_timezone (aware ones are converted, naive ones are tagged).
    # datetime.time inputs are reduced to seconds-since-midnight.
    if truncate_datetime:
        if truncate_datetime == 'second':
            obj = obj.replace(microsecond=0)
        elif truncate_datetime == 'minute':
            obj = obj.replace(second=0, microsecond=0)
        elif truncate_datetime == 'hour':
            obj = obj.replace(minute=0, second=0, microsecond=0)
        elif truncate_datetime == 'day':
            obj = obj.replace(hour=0, minute=0, second=0, microsecond=0)
    if isinstance(obj, datetime.datetime):
        if has_timezone(obj):
            obj = obj.astimezone(default_timezone)
        else:
            obj = obj.replace(tzinfo=default_timezone)
    elif isinstance(obj, datetime.time):
        return time_to_seconds(obj)
    return obj


def has_timezone(dt):
    """
    Function to check if a datetime object has a timezone

    Checking dt.tzinfo.utcoffset(dt) ensures that the datetime object is truly timezone-aware
    because some datetime objects may have a tzinfo attribute that is not None but still
    doesn't provide a valid offset.

    Certain tzinfo objects, such as pytz.timezone(None), can exist but do not provide meaningful UTC offset information.
    If tzinfo is present but calling .utcoffset(dt) returns None, the datetime is not truly timezone-aware.
    """
    return dt.tzinfo is not None and dt.tzinfo.utcoffset(dt) is not None


def get_truncate_datetime(truncate_datetime) -> Union[str, None]:
    """
    Validates truncate_datetime value
    """
    if truncate_datetime not in {None, 'second', 'minute', 'hour', 'day'}:
        raise ValueError("truncate_datetime must be second, minute, hour or day")
    return truncate_datetime


def cartesian_product_numpy(*arrays):
    """
    Cartesian product of Numpy arrays by Paul Panzer
    https://stackoverflow.com/a/49445693/1497443
    """
    la = len(arrays)
    dtype = np.result_type(*arrays)  # type: ignore
    arr = np.empty((la, *map(len, arrays)), dtype=dtype)  # type: ignore
    idx = slice(None), *repeat(None, la)
    for i, a in enumerate(arrays):
        arr[i, ...] = a[idx[:la - i]]
    return arr.reshape(la, -1).T


def diff_numpy_array(A, B):
    """
    Numpy Array A - B
    return items in A that are not in B
    By Divakar
    https://stackoverflow.com/a/52417967/1497443
    """
    return A[~np.isin(A, B)]  # type: ignore


# Maps builtin numeric types to the numpy dtype used when homogenizing sequences.
PYTHON_TYPE_TO_NUMPY_TYPE = {
    int: np_int64,
    float: np_float64,
    Decimal: np_float64
}
def get_homogeneous_numpy_compatible_type_of_seq(seq):
    """
    Return with the numpy dtype if the array can be converted to a non-object numpy array.
    Originally written by mgilson https://stackoverflow.com/a/13252348/1497443
    This is the modified version.
    """
    iseq = iter(seq)
    first_type = type(next(iseq))
    if first_type in {int, float, Decimal}:
        # Only homogeneous sequences map to a dtype; mixed types return False.
        type_ = first_type if all((type(x) is first_type) for x in iseq) else False
        return PYTHON_TYPE_TO_NUMPY_TYPE.get(type_, False)
    else:
        return False


def detailed__dict__(obj, ignore_private_variables=True, ignore_keys=frozenset(), include_keys=None):
    """
    Get the detailed dictionary of an object.

    This is used so we retrieve object properties too.
    """
    if include_keys:
        result = {}
        for key in include_keys:
            try:
                value = getattr(obj, key)
            except Exception:
                pass
            else:
                if not callable(value) or key == '__objclass__':  # We don't want to compare functions, however for backward compatibility, __objclass__ needs to be reported.
                    result[key] = value
    else:
        result = obj.__dict__.copy()  # A shallow copy
        private_var_prefix = f"_{obj.__class__.__name__}__"  # The semi private variables in Python get this prefix
        for key in ignore_keys:
            if key in result or (
                ignore_private_variables and key.startswith('__') and not key.startswith(private_var_prefix)
            ):
                del result[key]
        # Also pick up non-callable attributes reachable via dir() (properties etc.).
        for key in dir(obj):
            if key not in result and key not in ignore_keys and (
                not ignore_private_variables or (
                    ignore_private_variables and not key.startswith('__') and not key.startswith(private_var_prefix)
                )
            ):
                value = getattr(obj, key)
                if not callable(value):
                    result[key] = value
    return result


def named_tuple_repr(self):
    # Compact NamedTuple repr: fields still at their default value are omitted.
    fields = []
    for field, value in self._asdict().items():
        # Only include fields that do not have their default value
        if field in self._field_defaults:
            if value != self._field_defaults[field]:
                fields.append(f"{field}={value!r}")
        else:
            fields.append(f"{field}={value!r}")

    return f"{self.__class__.__name__}({', '.join(fields)})"


class OpcodeTag(EnumBase):
    # difflib-style opcode tags.
    insert = 'insert'
    delete = 'delete'
    equal = 'equal'
    replace = 'replace'  # type: ignore
    # swapped = 'swapped' # in the future we should support reporting of items swapped with each other


class Opcode(NamedTuple):
    # One difflib-style edit operation over two sequences t1 and t2.
    tag: str
    t1_from_index: int
    t1_to_index: int
    t2_from_index: int
    t2_to_index: int
    old_values: Optional[List[Any]] = None
    new_values: Optional[List[Any]] = None

    __repr__ = __str__ = named_tuple_repr


class FlatDataAction(EnumBase):
    # Every action kind that can appear in a flattened delta row.
    values_changed = 'values_changed'
    type_changes = 'type_changes'
    set_item_added = 'set_item_added'
    set_item_removed = 'set_item_removed'
    dictionary_item_added = 'dictionary_item_added'
    dictionary_item_removed = 'dictionary_item_removed'
    iterable_item_added = 'iterable_item_added'
    iterable_item_removed = 'iterable_item_removed'
    iterable_item_moved = 'iterable_item_moved'
    iterable_items_inserted = 'iterable_items_inserted'  # opcode
    iterable_items_deleted = 'iterable_items_deleted'  # opcode
    iterable_items_replaced = 'iterable_items_replaced'  # opcode
    iterable_items_equal = 'iterable_items_equal'  # opcode
    attribute_removed = 'attribute_removed'
    attribute_added = 'attribute_added'
    unordered_iterable_item_added = 'unordered_iterable_item_added'
    unordered_iterable_item_removed = 'unordered_iterable_item_removed'
    initiated = "initiated"


OPCODE_TAG_TO_FLAT_DATA_ACTION = {
    OpcodeTag.insert: FlatDataAction.iterable_items_inserted,
    OpcodeTag.delete: FlatDataAction.iterable_items_deleted,
    OpcodeTag.replace: FlatDataAction.iterable_items_replaced,
    OpcodeTag.equal: FlatDataAction.iterable_items_equal,
}

FLAT_DATA_ACTION_TO_OPCODE_TAG = {v: i for i, v in OPCODE_TAG_TO_FLAT_DATA_ACTION.items()}


# Sentinel string marking "value unknown" in FlatDeltaRow fields.
UnkownValueCode: str = 'unknown___'


class FlatDeltaRow(NamedTuple):
    # One row of a flattened delta; unknown fields default to UnkownValueCode.
    path: List
    action: FlatDataAction
    value: Optional[Any] = UnkownValueCode
    old_value: Optional[Any] = UnkownValueCode
    type: Optional[Any] = UnkownValueCode
    old_type: Optional[Any] = UnkownValueCode
    new_path: Optional[List] = None
    t1_from_index: Optional[int] = None
    t1_to_index: Optional[int] = None
    t2_from_index: Optional[int] = None
    t2_to_index: Optional[int] = None

    __repr__ = __str__ = named_tuple_repr


# Recursive alias for JSON-serializable values.
JSON = Union[Dict[str, str], List[str], List[int], Dict[str, "JSON"], List["JSON"], str, int, float, bool, None]


class SummaryNodeType(EnumBase):
    dict = 'dict'
    list = 'list'
    leaf = 'leaf'