import sys
import re
import os
import datetime
import uuid
import logging
import warnings
import string
import time
import enum
import ipaddress
from typing import NamedTuple, Any, List, Optional, Dict, Union, TYPE_CHECKING, Tuple
from ast import literal_eval
from decimal import Decimal, localcontext, InvalidOperation as InvalidDecimalOperation
from itertools import repeat
from orderly_set import StableSetEq as SetOrderedBase  # median: 1.0867 s for cache test, 5.63s for all tests
from threading import Timer

if TYPE_CHECKING:
    from pytz.tzinfo import BaseTzInfo


class np_type:
    """Placeholder bound to every ``np_*`` alias below when numpy cannot be imported."""
    pass


class pydantic_base_model_type:
    """Placeholder bound to ``PydanticBaseModel`` when pydantic cannot be imported."""
    pass


class SetOrdered(SetOrderedBase):
    """``StableSetEq`` subclass whose repr is the repr of the list of its items."""

    def __repr__(self):
        return str(list(self))


try:
    import numpy as np
except ImportError:  # pragma: no cover. The case without Numpy is tested locally only.
    np = None  # pragma: no cover.
    np_array_factory = 'numpy not available'  # pragma: no cover.
    np_ndarray = np_type  # pragma: no cover.
    np_bool_ = np_type  # pragma: no cover.
    np_int8 = np_type  # pragma: no cover.
    np_int16 = np_type  # pragma: no cover.
    np_int32 = np_type  # pragma: no cover.
    np_int64 = np_type  # pragma: no cover.
    np_uint8 = np_type  # pragma: no cover.
    np_uint16 = np_type  # pragma: no cover.
    np_uint32 = np_type  # pragma: no cover.
    np_uint64 = np_type  # pragma: no cover.
    np_intp = np_type  # pragma: no cover.
    np_uintp = np_type  # pragma: no cover.
    np_float32 = np_type  # pragma: no cover.
    np_float64 = np_type  # pragma: no cover.
    np_double = np_type  # pragma: no cover.
    np_floating = np_type  # pragma: no cover.
    np_complex64 = np_type  # pragma: no cover.
    np_complex128 = np_type  # pragma: no cover.
    np_cdouble = np_type  # pragma: no cover.
    np_complexfloating = np_type  # pragma: no cover.
else:
    np_array_factory = np.array
    np_ndarray = np.ndarray
    np_bool_ = np.bool_
    np_int8 = np.int8
    np_int16 = np.int16
    np_int32 = np.int32
    np_int64 = np.int64
    np_uint8 = np.uint8
    np_uint16 = np.uint16
    np_uint32 = np.uint32
    np_uint64 = np.uint64
    np_intp = np.intp
    np_uintp = np.uintp
    np_float32 = np.float32
    np_float64 = np.float64
    np_double = np.double  # np.float_ is an alias for np.double and is being removed by NumPy 2.0
    np_floating = np.floating
    np_complex64 = np.complex64
    np_complex128 = np.complex128
    np_cdouble = np.cdouble  # np.complex_ is an alias for np.cdouble and is being removed by NumPy 2.0
    np_complexfloating = np.complexfloating

numpy_numbers = (
    np_int8, np_int16, np_int32, np_int64, np_uint8,
    np_uint16, np_uint32, np_uint64, np_intp, np_uintp,
    np_float32, np_float64, np_double, np_floating, np_complex64,
    np_complex128, np_cdouble,)

numpy_complex_numbers = (
    np_complexfloating, np_complex64, np_complex128, np_cdouble,
)

numpy_dtypes = set(numpy_numbers)
numpy_dtypes.add(np_bool_)  # type: ignore

# Maps a type's ``__name__`` to the type itself, for every entry of numpy_dtypes.
numpy_dtype_str_to_type = {
    item.__name__: item for item in numpy_dtypes
}

try:
    from pydantic.main import BaseModel as PydanticBaseModel  # type: ignore
except ImportError:
    PydanticBaseModel = pydantic_base_model_type


logger = logging.getLogger(__name__)

py_major_version = sys.version_info.major
py_minor_version = sys.version_info.minor

py_current_version = Decimal("{}.{}".format(py_major_version, py_minor_version))

py2 = py_major_version == 2
py3 = py_major_version == 3
py4 = py_major_version == 4


NUMERICS = frozenset(string.digits)


class EnumBase(str, enum.Enum):
    """String-valued Enum base whose ``str()`` and ``repr()`` are derived from the member name."""

    def __repr__(self):
        """
        We need to add a single quotes so we can easily copy the value when we do ipdb.
        """
        return f"'{self.name}'"

    def __str__(self):
        return self.name


def _int_or_zero(value):
    """
    Tries to extract some number from a string.

    12c becomes 12

    If ``int(value)`` fails, the ASCII digits found in ``value`` are joined
    (in order) and converted; when there are no digits at all, 0 is returned.
    """
    try:
        return int(value)
    except Exception:
        result = []
        for char in value:
            if char in NUMERICS:
                result.append(char)
        if result:
            return int(''.join(result))
        return 0


def get_semvar_as_integer(version):
    """
    Converts:

    '1.23.5' to 1023005

    Only the first three dot-separated components are used; missing
    components are treated as '0'. Each component is weighted by a factor
    of 1000 relative to the one after it.
    """
    version = version.split('.')
    if len(version) > 3:
        version = version[:3]
    elif len(version) < 3:
        version.extend(['0'] * (3 - len(version)))

    return sum([10**(i * 3) * _int_or_zero(v) for i, v in enumerate(reversed(version))])


# we used to use OrderedDictPlus when dictionaries in Python were not ordered.
dict_ = dict

if py4:
    logger.warning('Python 4 is not supported yet. Switching logic to Python 3.')  # pragma: no cover
    py3 = True  # pragma: no cover

if py2:  # pragma: no cover
    sys.exit('Python 2 is not supported anymore. The last version of DeepDiff that supported Py2 was 3.3.0')

pypy3 = py3 and hasattr(sys, "pypy_translation_info")


if np and get_semvar_as_integer(np.__version__) < 1019000:
    sys.exit('The minimum required Numpy version is 1.19.0. Please upgrade your Numpy package.')

strings = (str, bytes)  # which are both basestring
unicode_type = str
bytes_type = bytes
only_complex_number = (complex,) + numpy_complex_numbers
only_numbers = (int, float, complex, Decimal) + numpy_numbers
datetimes = (datetime.datetime, datetime.date, datetime.timedelta, datetime.time)
ipranges = (ipaddress.IPv4Interface, ipaddress.IPv6Interface, ipaddress.IPv4Network, ipaddress.IPv6Network)
uuids = (uuid.UUID, )
times = (datetime.datetime, datetime.time)
numbers: Tuple = only_numbers + datetimes
booleans = (bool, np_bool_)

basic_types = strings + numbers + uuids + booleans + (type(None), )


class IndexedHash(NamedTuple):
    """Pair of a list of indexes and an item."""
    indexes: List
    item: Any


current_dir = os.path.dirname(os.path.abspath(__file__))

ID_PREFIX = '!>*id'

KEY_TO_VAL_STR = "{}:{}"

TREE_VIEW = 'tree'
TEXT_VIEW = 'text'
DELTA_VIEW = '_delta'

ENUM_INCLUDE_KEYS = ['__objclass__', 'name', 'value']


def short_repr(item, max_length=15):
    """Short representation of item if it is too long"""
    item = repr(item)
    if len(item) > max_length:
        # Keep the head of the repr plus its final character (e.g. the closing bracket or quote).
        item = '{}...{}'.format(item[:max_length - 3], item[-1])
    return item


class ListItemRemovedOrAdded:  # pragma: no cover
    """Class of conditions to be checked"""
    pass


class OtherTypes:
    """Base class of the marker types below; repr/str is ``"Error: <class name>"``."""

    def __repr__(self):
        return "Error: {}".format(self.__class__.__name__)  # pragma: no cover

    __str__ = __repr__


class Skipped(OtherTypes):
    pass


class Unprocessed(OtherTypes):
    pass


class NotHashed(OtherTypes):
    pass


class NotPresent:  # pragma: no cover
    """
    In a change tree, this indicated that a previously existing object has been removed -- or will only be added
    in the future.
    We previously used None for this but this caused problem when users actually added and removed None. Srsly guys? :D
    """

    def __repr__(self):
        return 'not present'  # pragma: no cover

    __str__ = __repr__


class CannotCompare(Exception):
    """
    Exception when two items cannot be compared in the compare function.
    """
    pass


unprocessed = Unprocessed()
skipped = Skipped()
not_hashed = NotHashed()
notpresent = NotPresent()

# Disabling remapping from old to new keys since the mapping is deprecated.
RemapDict = dict_


# class RemapDict(dict_):
#     """
#     DISABLED
#     Remap Dictionary.
#
#     For keys that have a new, longer name, remap the old key to the new key.
#     Other keys that don't have a new name are handled as before.
#     """
#
#     def __getitem__(self, old_key):
#         new_key = EXPANDED_KEY_MAP.get(old_key, old_key)
#         if new_key != old_key:
#             logger.warning(
#                 "DeepDiff Deprecation: %s is renamed to %s. Please start using "
#                 "the new unified naming convention.", old_key, new_key)
#         if new_key in self:
#             return self.get(new_key)
#         else:  # pragma: no cover
#             raise KeyError(new_key)


class indexed_set(set):
    """
    A plain ``set`` subclass that adds no behavior of its own.

    NOTE(review): earlier documentation showed index access (``a[0]``), but this
    class defines no ``__getitem__``, so indexing an instance raises ``TypeError``
    exactly as it does for a built-in ``set``.
    """


def add_to_frozen_set(parents_ids, item_id):
    """Return the union of ``parents_ids`` and ``{item_id}`` without mutating ``parents_ids``."""
    return parents_ids | {item_id}


def convert_item_or_items_into_set_else_none(items):
    """
    Normalize ``items`` into a set.

    A single str/bytes value becomes a one-element set, any other truthy
    iterable is passed to ``set()``, and a falsy value yields None.
    """
    if items:
        if isinstance(items, strings):
            items = {items}
        else:
            items = set(items)
    else:
        items = None
    return items


def add_root_to_paths(paths):
    """
    Sometimes the users want to just pass
    [key] instead of root[key] for example.
    Here we automatically add all sorts of variations that might match
    the path they were supposed to pass.

    Returns None when ``paths`` is None, otherwise a SetOrdered of path strings.
    """
    if paths is None:
        return
    result = SetOrdered()
    for path in paths:
        if path.startswith('root'):
            result.add(path)
        else:
            if path.isdigit():
                # Could be a string key made of digits or an integer index.
                result.add(f"root['{path}']")
                result.add(f"root[{path}]")
            elif path[0].isdigit():
                # Starts with a digit, so it cannot be an attribute name.
                result.add(f"root['{path}']")
            else:
                result.add(f"root.{path}")
                result.add(f"root['{path}']")
    return result


RE_COMPILED_TYPE = type(re.compile(''))


def convert_item_or_items_into_compiled_regexes_else_none(items):
    """
    Normalize ``items`` into a list of compiled regular expressions.

    A single str/bytes/compiled pattern is wrapped in a list; anything that
    is not already compiled is passed to ``re.compile``. A falsy value
    yields None.
    """
    if items:
        if isinstance(items, (strings, RE_COMPILED_TYPE)):
            items = [items]
        items = [i if isinstance(i, RE_COMPILED_TYPE) else re.compile(i) for i in items]
    else:
        items = None
    return items


def get_id(obj):
    """
    Adding some characters to id so they are not just integers to reduce the risk of collision.
    """
    return "{}{}".format(ID_PREFIX, id(obj))


def get_type(obj):
    """
    Get the type of object or if it is a class, return the class itself.

    For ``np_ndarray`` instances the array's ``dtype.type`` is returned instead.
    """
    if isinstance(obj, np_ndarray):
        return obj.dtype.type  # type: ignore
    return obj if type(obj) is type else type(obj)


def numpy_dtype_string_to_type(dtype_str):
    """Look up ``dtype_str`` in ``numpy_dtype_str_to_type``; raises KeyError for unknown names."""
    return numpy_dtype_str_to_type[dtype_str]


def type_in_type_group(item, type_group):
    """Return whether ``get_type(item)`` is a member of ``type_group``."""
    return get_type(item) in type_group


def type_is_subclass_of_type_group(item, type_group):
    """
    Return whether ``item`` is an instance of, a subclass of, or (via
    ``get_type``) directly a member of ``type_group``.
    """
    return isinstance(item, type_group) \
        or (isinstance(item, type) and issubclass(item, type_group)) \
        or type_in_type_group(item, type_group)


def get_doc(doc_filename):
    """
    Read ``doc_filename`` from the ``../docs/`` directory relative to this file.

    Any failure while opening or reading is swallowed and a fallback message
    pointing to the online documentation is returned instead.
    """
    try:
        with open(os.path.join(current_dir, '../docs/', doc_filename), 'r') as doc_file:
            doc = doc_file.read()
    except Exception:  # pragma: no cover
        doc = 'Failed to load the docstrings. Please visit: https://zepworks.com/deepdiff/current/'  # pragma: no cover
    return doc


number_formatting = {
    "f": r'{:.%sf}',
    "e": r'{:.%se}',
}


def number_to_string(number, significant_digits, number_format_notation="f"):
    """
    Convert numbers to string considering significant digits.

    **parameters**

    number: the value to format. Values that are not instances of ``numbers``
        are returned unchanged.
    significant_digits: number of digits after the decimal point.
    number_format_notation: 'f' for fixed point or 'e' for exponent notation.

    Raises ValueError when number_format_notation is neither 'f' nor 'e'.
    """
    try:
        using = number_formatting[number_format_notation]
    except KeyError:
        raise ValueError("number_format_notation got invalid value of {}. The valid values are 'f' and 'e'".format(number_format_notation)) from None

    if not isinstance(number, numbers):  # type: ignore
        return number
    elif isinstance(number, Decimal):
        with localcontext() as ctx:
            # Precision = number of integer digits + significant_digits
            # Using number//1 to get the integer part of the number
            ctx.prec = len(str(abs(number // 1))) + significant_digits
            try:
                number = number.quantize(Decimal('0.' + '0' * significant_digits))
            except InvalidDecimalOperation:
                # Sometimes rounding up causes a higher precision to be needed for the quantize operation
                # For example '999.99999999' will become '1000.000000' after quantize
                ctx.prec += 1
                number = number.quantize(Decimal('0.' + '0' * significant_digits))
    elif isinstance(number, only_complex_number):  # type: ignore
        # Case for complex numbers.
        real_str = number_to_string(
            number=number.real,  # type: ignore
            significant_digits=significant_digits,
            number_format_notation=number_format_notation
        )
        imag_str = number_to_string(
            number=number.imag,  # type: ignore
            significant_digits=significant_digits,
            number_format_notation=number_format_notation
        )
        # A negative imaginary part already carries its own '-' sign. Emitting
        # "1.00+-2.00j" would make the complex constructor raise ValueError,
        # so the '+' is only inserted when the imaginary part has no sign.
        joiner = '' if imag_str.startswith('-') else '+'
        number = number.__class__(
            "{real}{joiner}{imag}j".format(real=real_str, joiner=joiner, imag=imag_str)  # type: ignore
        )
    else:
        number = round(number=number, ndigits=significant_digits)  # type: ignore

        if significant_digits == 0:
            number = int(number)

    if number == 0.0:
        # Special case for 0: "-0.xx" should compare equal to "0.xx"
        number = abs(number)  # type: ignore

    # Cast number to string
    result = (using % significant_digits).format(number)
    # https://bugs.python.org/issue36622
    if number_format_notation == 'e':
        # Removing leading 0 for exponential part.
        result = re.sub(
            pattern=r'(?<=e(\+|\-))0(?=\d)+',
            repl=r'',
            string=result
        )
    return result


class DeepDiffDeprecationWarning(DeprecationWarning):
    """
    Use this warning instead of DeprecationWarning
    """
    pass


def cartesian_product(a, b):
    """
    Get the Cartesian product of two iterables

    **parameters**

    a: iterable of tuples
    b: iterable to do the Cartesian product

    Yields each tuple of ``a`` extended by one element of ``b``. ``b`` is
    iterated once per item of ``a``, so it must be re-iterable.
    """

    for i in a:
        for j in b:
            yield i + (j,)


def cartesian_product_of_shape(dimentions, result=None):
    """
    Cartesian product of a dimentions iterable.
    This is mainly used to traverse Numpy ndarrays.

    Each array has dimentions that are defined in ndarray.shape
    """
    if result is None:
        result = ((),)  # a tuple with an empty tuple
    for dimension in dimentions:
        result = cartesian_product(result, range(dimension))
    return result


def get_numpy_ndarray_rows(obj, shape=None):
    """
    Convert a multi dimensional numpy array to list of rows

    Yields ``(path_tuple, row)`` pairs where ``path_tuple`` indexes every
    dimension except the last one. ``shape`` defaults to ``obj.shape``.
    """
    if shape is None:
        shape = obj.shape

    dimentions = shape[:-1]
    for path_tuple in cartesian_product_of_shape(dimentions):
        result = obj
        for index in path_tuple:
            result = result[index]
        yield path_tuple, result


class _NotFound:
    """Sentinel type that never compares equal to anything (including itself)."""

    def __eq__(self, other):
        return False

    # NOTE(review): ``__req__`` is not a Python special method; kept as-is for backward compatibility.
    __req__ = __eq__

    def __repr__(self):
        return 'not found'

    __str__ = __repr__


not_found = _NotFound()

warnings.simplefilter('once', DeepDiffDeprecationWarning)


class RepeatedTimer:
    """
    Threaded Repeated Timer by MestreLion
    https://stackoverflow.com/a/38317060/1497443

    The timer starts on construction. Each time it fires it re-arms itself
    and calls ``function(*args, **kwargs)``; ``kwargs['duration']`` is
    refreshed with the whole seconds elapsed since construction whenever
    ``start`` runs.
    """

    def __init__(self, interval, function, *args, **kwargs):
        self._timer = None
        self.interval = interval
        self.function = function
        self.args = args
        self.start_time = time.time()
        self.kwargs = kwargs
        self.is_running = False
        self.start()

    def _get_duration_sec(self):
        """Whole seconds elapsed since this timer was constructed."""
        return int(time.time() - self.start_time)

    def _run(self):
        # Re-arm first so the next tick is scheduled before the callback runs.
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        """Refresh ``kwargs['duration']`` and schedule the next tick unless one is already pending."""
        self.kwargs.update(duration=self._get_duration_sec())
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        """Cancel the pending tick (if any) and return the elapsed whole seconds."""
        duration = self._get_duration_sec()
        if self._timer is not None:
            self._timer.cancel()
        self.is_running = False
        return duration


def _eval_decimal(params):
    """Build a Decimal from the text found between ``Decimal(`` and ``)``."""
    return Decimal(params)


def _eval_datetime(params):
    """Build a datetime.datetime from a comma-separated argument string."""
    params = f'({params})'
    params = literal_eval(params)
    return datetime.datetime(*params)


def _eval_date(params):
    """Build a datetime.date from a comma-separated argument string."""
    params = f'({params})'
    params = literal_eval(params)
    return datetime.date(*params)


# (prefix, suffix, builder) triples tried in order by literal_eval_extended.
LITERAL_EVAL_PRE_PROCESS = [
    ('Decimal(', ')', _eval_decimal),
    ('datetime.datetime(', ')', _eval_datetime),
    ('datetime.date(', ')', _eval_date),
]


def literal_eval_extended(item):
    """
    An extended version of literal_eval

    Falls back to the builders in LITERAL_EVAL_PRE_PROCESS when plain
    ``literal_eval`` raises SyntaxError or ValueError; if no prefix/suffix
    pair matches, the original exception is re-raised.
    """
    try:
        return literal_eval(item)
    except (SyntaxError, ValueError):
        for begin, end, func in LITERAL_EVAL_PRE_PROCESS:
            if item.startswith(begin) and item.endswith(end):
                # Extracting and removing extra quotes so for example "Decimal('10.1')" becomes "'10.1'" and then '10.1'
                params = item[len(begin): -len(end)].strip('\'\"')
                return func(params)
        raise


def time_to_seconds(t:datetime.time) -> int:
    """Seconds since midnight computed from hour, minute and second only (microseconds are not used)."""
    return (t.hour * 60 + t.minute) * 60 + t.second


def datetime_normalize(
    truncate_datetime:Union[str, None],
    obj:Union[datetime.datetime, datetime.time],
    default_timezone: Union[
        datetime.timezone, "BaseTzInfo"
    ] = datetime.timezone.utc,
) -> Any:
    """
    Truncate and normalize a datetime or time.

    ``truncate_datetime`` may be 'second', 'minute', 'hour' or 'day' (any
    other truthy value leaves ``obj`` untruncated). Afterwards a timezone
    aware datetime is converted to ``default_timezone``, a naive datetime
    gets ``default_timezone`` attached, and a ``datetime.time`` is returned
    as its number of seconds since midnight.
    """
    if truncate_datetime:
        if truncate_datetime == 'second':
            obj = obj.replace(microsecond=0)
        elif truncate_datetime == 'minute':
            obj = obj.replace(second=0, microsecond=0)
        elif truncate_datetime == 'hour':
            obj = obj.replace(minute=0, second=0, microsecond=0)
        elif truncate_datetime == 'day':
            obj = obj.replace(hour=0, minute=0, second=0, microsecond=0)
    if isinstance(obj, datetime.datetime):
        if has_timezone(obj):
            obj = obj.astimezone(default_timezone)
        else:
            obj = obj.replace(tzinfo=default_timezone)
    elif isinstance(obj, datetime.time):
        return time_to_seconds(obj)
    return obj


def has_timezone(dt):
    """
    Function to check if a datetime object has a timezone

    Checking dt.tzinfo.utcoffset(dt) ensures that the datetime object is truly timezone-aware
    because some datetime objects may have a tzinfo attribute that is not None but still
    doesn't provide a valid offset.

    Certain tzinfo objects, such as pytz.timezone(None), can exist but do not provide meaningful UTC offset information.
    If tzinfo is present but calling .utcoffset(dt) returns None, the datetime is not truly timezone-aware.
    """
    return dt.tzinfo is not None and dt.tzinfo.utcoffset(dt) is not None


def get_truncate_datetime(truncate_datetime) -> Union[str, None]:
    """
    Validates truncate_datetime value

    Returns the value unchanged; raises ValueError unless it is None,
    'second', 'minute', 'hour' or 'day'.
    """
    if truncate_datetime not in {None, 'second', 'minute', 'hour', 'day'}:
        raise ValueError("truncate_datetime must be second, minute, hour or day")
    return truncate_datetime


def cartesian_product_numpy(*arrays):
    """
    Cartesian product of Numpy arrays by Paul Panzer
    https://stackoverflow.com/a/49445693/1497443
    """
    la = len(arrays)
    dtype = np.result_type(*arrays)  # type: ignore
    arr = np.empty((la, *map(len, arrays)), dtype=dtype)  # type: ignore
    idx = slice(None), *repeat(None, la)
    for i, a in enumerate(arrays):
        arr[i, ...] = a[idx[:la - i]]
    return arr.reshape(la, -1).T


def diff_numpy_array(A, B):
    """
    Numpy Array A - B
    return items in A that are not in B
    By Divakar
    https://stackoverflow.com/a/52417967/1497443
    """
    return A[~np.isin(A, B)]  # type: ignore


PYTHON_TYPE_TO_NUMPY_TYPE = {
    int: np_int64,
    float: np_float64,
    Decimal: np_float64
}


def get_homogeneous_numpy_compatible_type_of_seq(seq):
    """
    Return with the numpy dtype if the array can be converted to a non-object numpy array.
    Originally written by mgilson https://stackoverflow.com/a/13252348/1497443
    This is the modified version.

    Returns False when ``seq`` is empty, when its items are not all of exactly
    the same type, or when that type is not int, float or Decimal.
    """
    iseq = iter(seq)
    try:
        first_type = type(next(iseq))
    except StopIteration:
        # An empty sequence has no element type to report.
        return False
    if first_type in {int, float, Decimal}:
        type_ = first_type if all((type(x) is first_type) for x in iseq) else False
        return PYTHON_TYPE_TO_NUMPY_TYPE.get(type_, False)
    else:
        return False


def detailed__dict__(obj, ignore_private_variables=True, ignore_keys=frozenset(), include_keys=None):
    """
    Get the detailed dictionary of an object.

    This is used so we retrieve object properties too.

    **parameters**

    obj: the object to inspect.
    ignore_private_variables: when True, names starting with ``__`` or with
        the ``_<ClassName>__`` prefix are not collected from ``dir(obj)``.
    ignore_keys: names to leave out of the result.
    include_keys: when truthy, ONLY these attributes are looked up (attributes
        whose lookup raises are skipped) and the other options are not consulted.
    """
    if include_keys:
        result = {}
        for key in include_keys:
            try:
                value = getattr(obj, key)
            except Exception:
                pass
            else:
                if not callable(value) or key == '__objclass__':  # We don't want to compare functions, however for backward compatibility, __objclass__ needs to be reported.
                    result[key] = value
    else:
        result = obj.__dict__.copy()  # A shallow copy
        private_var_prefix = f"_{obj.__class__.__name__}__"  # The semi private variables in Python get this prefix
        for key in ignore_keys:
            # pop() instead of del: an ignored key is not necessarily present in
            # the instance __dict__, and a missing key must not raise KeyError.
            result.pop(key, None)
        for key in dir(obj):
            if key not in result and key not in ignore_keys and (
                    not ignore_private_variables or (
                        ignore_private_variables and not key.startswith('__') and not key.startswith(private_var_prefix)
                    )
            ):
                value = getattr(obj, key)
                if not callable(value):
                    result[key] = value
    return result


def named_tuple_repr(self):
    """
    repr for NamedTuple classes that omits fields still holding their default value.

    Fields without a default are always included.
    """
    fields = []
    for field, value in self._asdict().items():
        # Only include fields that do not have their default value
        if field in self._field_defaults:
            if value != self._field_defaults[field]:
                fields.append(f"{field}={value!r}")
        else:
            fields.append(f"{field}={value!r}")

    return f"{self.__class__.__name__}({', '.join(fields)})"


class OpcodeTag(EnumBase):
    insert = 'insert'
    delete = 'delete'
    equal = 'equal'
    replace = 'replace'  # type: ignore
    # swapped = 'swapped'  # in the future we should support reporting of items swapped with each other


class Opcode(NamedTuple):
    tag: str
    t1_from_index: int
    t1_to_index: int
    t2_from_index: int
    t2_to_index: int
    old_values: Optional[List[Any]] = None
    new_values: Optional[List[Any]] = None

    __repr__ = __str__ = named_tuple_repr


class FlatDataAction(EnumBase):
    values_changed = 'values_changed'
    type_changes = 'type_changes'
    set_item_added = 'set_item_added'
    set_item_removed = 'set_item_removed'
    dictionary_item_added = 'dictionary_item_added'
    dictionary_item_removed = 'dictionary_item_removed'
    iterable_item_added = 'iterable_item_added'
    iterable_item_removed = 'iterable_item_removed'
    iterable_item_moved = 'iterable_item_moved'
    iterable_items_inserted = 'iterable_items_inserted'  # opcode
    iterable_items_deleted = 'iterable_items_deleted'  # opcode
    iterable_items_replaced = 'iterable_items_replaced'  # opcode
    iterable_items_equal = 'iterable_items_equal'  # opcode
    attribute_removed = 'attribute_removed'
    attribute_added = 'attribute_added'
    unordered_iterable_item_added = 'unordered_iterable_item_added'
    unordered_iterable_item_removed = 'unordered_iterable_item_removed'
    initiated = "initiated"


OPCODE_TAG_TO_FLAT_DATA_ACTION = {
    OpcodeTag.insert: FlatDataAction.iterable_items_inserted,
    OpcodeTag.delete: FlatDataAction.iterable_items_deleted,
    OpcodeTag.replace: FlatDataAction.iterable_items_replaced,
    OpcodeTag.equal: FlatDataAction.iterable_items_equal,
}

# Inverse of OPCODE_TAG_TO_FLAT_DATA_ACTION.
FLAT_DATA_ACTION_TO_OPCODE_TAG = {v: i for i, v in OPCODE_TAG_TO_FLAT_DATA_ACTION.items()}


UnkownValueCode: str = 'unknown___'


class FlatDeltaRow(NamedTuple):
    path: List
    action: FlatDataAction
    value: Optional[Any] = UnkownValueCode
    old_value: Optional[Any] = UnkownValueCode
    type: Optional[Any] = UnkownValueCode
    old_type: Optional[Any] = UnkownValueCode
    new_path: Optional[List] = None
    t1_from_index: Optional[int] = None
    t1_to_index: Optional[int] = None
    t2_from_index: Optional[int] = None
    t2_to_index: Optional[int] = None

    __repr__ = __str__ = named_tuple_repr


JSON = Union[Dict[str, str], List[str], List[int], Dict[str, "JSON"], List["JSON"], str, int, float, bool, None]


class SummaryNodeType(EnumBase):
    dict = 'dict'
    list = 'list'
    leaf = 'leaf'