import sys
import re
import os
import datetime
import uuid
import logging
import warnings
import string
import time
import enum
import ipaddress
from typing import NamedTuple, Any, List, Optional, Dict, Union, TYPE_CHECKING, Tuple
from ast import literal_eval
from decimal import Decimal, localcontext, InvalidOperation as InvalidDecimalOperation
from itertools import repeat
from orderly_set import StableSetEq as SetOrderedBase # median: 1.0867 s for cache test, 5.63s for all tests
from threading import Timer
if TYPE_CHECKING:
from pytz.tzinfo import BaseTzInfo
class np_type:
pass
class pydantic_base_model_type:
pass
class SetOrdered(SetOrderedBase):
def __repr__(self):
return str(list(self))
try:
import numpy as np
except ImportError: # pragma: no cover. The case without Numpy is tested locally only.
np = None # pragma: no cover.
np_array_factory = 'numpy not available' # pragma: no cover.
np_ndarray = np_type # pragma: no cover.
np_bool_ = np_type # pragma: no cover.
np_int8 = np_type # pragma: no cover.
np_int16 = np_type # pragma: no cover.
np_int32 = np_type # pragma: no cover.
np_int64 = np_type # pragma: no cover.
np_uint8 = np_type # pragma: no cover.
np_uint16 = np_type # pragma: no cover.
np_uint32 = np_type # pragma: no cover.
np_uint64 = np_type # pragma: no cover.
np_intp = np_type # pragma: no cover.
np_uintp = np_type # pragma: no cover.
np_float32 = np_type # pragma: no cover.
np_float64 = np_type # pragma: no cover.
np_double = np_type # pragma: no cover.
np_floating = np_type # pragma: no cover.
np_complex64 = np_type # pragma: no cover.
np_complex128 = np_type # pragma: no cover.
np_cdouble = np_type # pragma: no cover.
np_complexfloating = np_type # pragma: no cover.
else:
np_array_factory = np.array
np_ndarray = np.ndarray
np_bool_ = np.bool_
np_int8 = np.int8
np_int16 = np.int16
np_int32 = np.int32
np_int64 = np.int64
np_uint8 = np.uint8
np_uint16 = np.uint16
np_uint32 = np.uint32
np_uint64 = np.uint64
np_intp = np.intp
np_uintp = np.uintp
np_float32 = np.float32
np_float64 = np.float64
np_double = np.double # np.float_ is an alias for np.double and is being removed by NumPy 2.0
np_floating = np.floating
np_complex64 = np.complex64
np_complex128 = np.complex128
np_cdouble = np.cdouble # np.complex_ is an alias for np.cdouble and is being removed by NumPy 2.0
np_complexfloating = np.complexfloating
numpy_numbers = (
np_int8, np_int16, np_int32, np_int64, np_uint8,
np_uint16, np_uint32, np_uint64, np_intp, np_uintp,
np_float32, np_float64, np_double, np_floating, np_complex64,
np_complex128, np_cdouble,)
numpy_complex_numbers = (
np_complexfloating, np_complex64, np_complex128, np_cdouble,
)
numpy_dtypes = set(numpy_numbers)
numpy_dtypes.add(np_bool_) # type: ignore
numpy_dtype_str_to_type = {
item.__name__: item for item in numpy_dtypes
}
try:
from pydantic.main import BaseModel as PydanticBaseModel # type: ignore
except ImportError:
PydanticBaseModel = pydantic_base_model_type
logger = logging.getLogger(__name__)
py_major_version = sys.version_info.major
py_minor_version = sys.version_info.minor
py_current_version = Decimal("{}.{}".format(py_major_version, py_minor_version))
py2 = py_major_version == 2
py3 = py_major_version == 3
py4 = py_major_version == 4
NUMERICS = frozenset(string.digits)
class EnumBase(str, enum.Enum):
def __repr__(self):
"""
We need to add a single quotes so we can easily copy the value when we do ipdb.
"""
return f"'{self.name}'"
def __str__(self):
return self.name
def _int_or_zero(value):
"""
Tries to extract some number from a string.
12c becomes 12
"""
try:
return int(value)
except Exception:
result = []
for char in value:
if char in NUMERICS:
result.append(char)
if result:
return int(''.join(result))
return 0
def get_semvar_as_integer(version):
"""
Converts:
'1.23.5' to 1023005
"""
version = version.split('.')
if len(version) > 3:
version = version[:3]
elif len(version) < 3:
version.extend(['0'] * (3 - len(version)))
return sum([10**(i * 3) * _int_or_zero(v) for i, v in enumerate(reversed(version))])
# we used to use OrderedDictPlus when dictionaries in Python were not ordered.
dict_ = dict
if py4:
logger.warning('Python 4 is not supported yet. Switching logic to Python 3.') # pragma: no cover
py3 = True # pragma: no cover
if py2: # pragma: no cover
sys.exit('Python 2 is not supported anymore. The last version of DeepDiff that supported Py2 was 3.3.0')
pypy3 = py3 and hasattr(sys, "pypy_translation_info")
if np and get_semvar_as_integer(np.__version__) < 1019000:
sys.exit('The minimum required Numpy version is 1.19.0. Please upgrade your Numpy package.')
strings = (str, bytes) # which are both basestring
unicode_type = str
bytes_type = bytes
only_complex_number = (complex,) + numpy_complex_numbers
only_numbers = (int, float, complex, Decimal) + numpy_numbers
datetimes = (datetime.datetime, datetime.date, datetime.timedelta, datetime.time)
ipranges = (ipaddress.IPv4Interface, ipaddress.IPv6Interface, ipaddress.IPv4Network, ipaddress.IPv6Network)
uuids = (uuid.UUID, )
times = (datetime.datetime, datetime.time)
numbers: Tuple = only_numbers + datetimes
booleans = (bool, np_bool_)
basic_types = strings + numbers + uuids + booleans + (type(None), )
class IndexedHash(NamedTuple):
indexes: List
item: Any
current_dir = os.path.dirname(os.path.abspath(__file__))
ID_PREFIX = '!>*id'
KEY_TO_VAL_STR = "{}:{}"
TREE_VIEW = 'tree'
TEXT_VIEW = 'text'
DELTA_VIEW = '_delta'
ENUM_INCLUDE_KEYS = ['__objclass__', 'name', 'value']
def short_repr(item, max_length=15):
"""Short representation of item if it is too long"""
item = repr(item)
if len(item) > max_length:
item = '{}...{}'.format(item[:max_length - 3], item[-1])
return item
class ListItemRemovedOrAdded: # pragma: no cover
"""Class of conditions to be checked"""
pass
class OtherTypes:
def __repr__(self):
return "Error: {}".format(self.__class__.__name__) # pragma: no cover
__str__ = __repr__
class Skipped(OtherTypes):
pass
class Unprocessed(OtherTypes):
pass
class NotHashed(OtherTypes):
pass
class NotPresent: # pragma: no cover
"""
In a change tree, this indicated that a previously existing object has been removed -- or will only be added
in the future.
We previously used None for this but this caused problem when users actually added and removed None. Srsly guys? :D
"""
def __repr__(self):
return 'not present' # pragma: no cover
__str__ = __repr__
class CannotCompare(Exception):
"""
Exception when two items cannot be compared in the compare function.
"""
pass
unprocessed = Unprocessed()
skipped = Skipped()
not_hashed = NotHashed()
notpresent = NotPresent()
# Disabling remapping from old to new keys since the mapping is deprecated.
RemapDict = dict_
# class RemapDict(dict_):
# """
# DISABLED
# Remap Dictionary.
# For keys that have a new, longer name, remap the old key to the new key.
# Other keys that don't have a new name are handled as before.
# """
# def __getitem__(self, old_key):
# new_key = EXPANDED_KEY_MAP.get(old_key, old_key)
# if new_key != old_key:
# logger.warning(
# "DeepDiff Deprecation: %s is renamed to %s. Please start using "
# "the new unified naming convention.", old_key, new_key)
# if new_key in self:
# return self.get(new_key)
# else: # pragma: no cover
# raise KeyError(new_key)
class indexed_set(set):
"""
A set class that lets you get an item by index
>>> a = indexed_set()
>>> a.add(10)
>>> a.add(20)
>>> a[0]
10
"""
def add_to_frozen_set(parents_ids, item_id):
return parents_ids | {item_id}
def convert_item_or_items_into_set_else_none(items):
if items:
if isinstance(items, strings):
items = {items}
else:
items = set(items)
else:
items = None
return items
def add_root_to_paths(paths):
"""
Sometimes the users want to just pass
[key] instead of root[key] for example.
Here we automatically add all sorts of variations that might match
the path they were supposed to pass.
"""
if paths is None:
return
result = SetOrdered()
for path in paths:
if path.startswith('root'):
result.add(path)
else:
if path.isdigit():
result.add(f"root['{path}']")
result.add(f"root[{path}]")
elif path[0].isdigit():
result.add(f"root['{path}']")
else:
result.add(f"root.{path}")
result.add(f"root['{path}']")
return result
RE_COMPILED_TYPE = type(re.compile(''))
def convert_item_or_items_into_compiled_regexes_else_none(items):
if items:
if isinstance(items, (strings, RE_COMPILED_TYPE)):
items = [items]
items = [i if isinstance(i, RE_COMPILED_TYPE) else re.compile(i) for i in items]
else:
items = None
return items
def get_id(obj):
"""
Adding some characters to id so they are not just integers to reduce the risk of collision.
"""
return "{}{}".format(ID_PREFIX, id(obj))
def get_type(obj):
"""
Get the type of object or if it is a class, return the class itself.
"""
if isinstance(obj, np_ndarray):
return obj.dtype.type # type: ignore
return obj if type(obj) is type else type(obj)
def numpy_dtype_string_to_type(dtype_str):
return numpy_dtype_str_to_type[dtype_str]
def type_in_type_group(item, type_group):
return get_type(item) in type_group
def type_is_subclass_of_type_group(item, type_group):
return isinstance(item, type_group) \
or (isinstance(item, type) and issubclass(item, type_group)) \
or type_in_type_group(item, type_group)
def get_doc(doc_filename):
try:
with open(os.path.join(current_dir, '../docs/', doc_filename), 'r') as doc_file:
doc = doc_file.read()
except Exception: # pragma: no cover
doc = 'Failed to load the docstrings. Please visit: https://zepworks.com/deepdiff/current/' # pragma: no cover
return doc
number_formatting = {
"f": r'{:.%sf}',
"e": r'{:.%se}',
}
def number_to_string(number, significant_digits, number_format_notation="f"):
"""
Convert numbers to string considering significant digits.
"""
try:
using = number_formatting[number_format_notation]
except KeyError:
raise ValueError("number_format_notation got invalid value of {}. The valid values are 'f' and 'e'".format(number_format_notation)) from None
if not isinstance(number, numbers): # type: ignore
return number
elif isinstance(number, Decimal):
with localcontext() as ctx:
# Precision = number of integer digits + significant_digits
# Using number//1 to get the integer part of the number
ctx.prec = len(str(abs(number // 1))) + significant_digits
try:
number = number.quantize(Decimal('0.' + '0' * significant_digits))
except InvalidDecimalOperation:
# Sometimes rounding up causes a higher precision to be needed for the quantize operation
# For example '999.99999999' will become '1000.000000' after quantize
ctx.prec += 1
number = number.quantize(Decimal('0.' + '0' * significant_digits))
elif isinstance(number, only_complex_number): # type: ignore
# Case for complex numbers.
number = number.__class__(
"{real}+{imag}j".format( # type: ignore
real=number_to_string(
number=number.real, # type: ignore
significant_digits=significant_digits,
number_format_notation=number_format_notation
),
imag=number_to_string(
number=number.imag, # type: ignore
significant_digits=significant_digits,
number_format_notation=number_format_notation
)
) # type: ignore
)
else:
number = round(number=number, ndigits=significant_digits) # type: ignore
if significant_digits == 0:
number = int(number)
if number == 0.0:
# Special case for 0: "-0.xx" should compare equal to "0.xx"
number = abs(number) # type: ignore
# Cast number to string
result = (using % significant_digits).format(number)
# https://bugs.python.org/issue36622
if number_format_notation == 'e':
# Removing leading 0 for exponential part.
result = re.sub(
pattern=r'(?<=e(\+|\-))0(?=\d)+',
repl=r'',
string=result
)
return result
class DeepDiffDeprecationWarning(DeprecationWarning):
"""
Use this warning instead of DeprecationWarning
"""
pass
def cartesian_product(a, b):
"""
Get the Cartesian product of two iterables
**parameters**
a: list of lists
b: iterable to do the Cartesian product
"""
for i in a:
for j in b:
yield i + (j,)
def cartesian_product_of_shape(dimentions, result=None):
"""
Cartesian product of a dimentions iterable.
This is mainly used to traverse Numpy ndarrays.
Each array has dimentions that are defines in ndarray.shape
"""
if result is None:
result = ((),) # a tuple with an empty tuple
for dimension in dimentions:
result = cartesian_product(result, range(dimension))
return result
def get_numpy_ndarray_rows(obj, shape=None):
"""
Convert a multi dimensional numpy array to list of rows
"""
if shape is None:
shape = obj.shape
dimentions = shape[:-1]
for path_tuple in cartesian_product_of_shape(dimentions):
result = obj
for index in path_tuple:
result = result[index]
yield path_tuple, result
class _NotFound:
def __eq__(self, other):
return False
__req__ = __eq__
def __repr__(self):
return 'not found'
__str__ = __repr__
not_found = _NotFound()
warnings.simplefilter('once', DeepDiffDeprecationWarning)
class RepeatedTimer:
"""
Threaded Repeated Timer by MestreLion
https://stackoverflow.com/a/38317060/1497443
"""
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.start_time = time.time()
self.kwargs = kwargs
self.is_running = False
self.start()
def _get_duration_sec(self):
return int(time.time() - self.start_time)
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
self.kwargs.update(duration=self._get_duration_sec())
if not self.is_running:
self._timer = Timer(self.interval, self._run)
self._timer.start()
self.is_running = True
def stop(self):
duration = self._get_duration_sec()
if self._timer is not None:
self._timer.cancel()
self.is_running = False
return duration
def _eval_decimal(params):
return Decimal(params)
def _eval_datetime(params):
params = f'({params})'
params = literal_eval(params)
return datetime.datetime(*params)
def _eval_date(params):
params = f'({params})'
params = literal_eval(params)
return datetime.date(*params)
LITERAL_EVAL_PRE_PROCESS = [
('Decimal(', ')', _eval_decimal),
('datetime.datetime(', ')', _eval_datetime),
('datetime.date(', ')', _eval_date),
]
def literal_eval_extended(item):
"""
An extended version of literal_eval
"""
try:
return literal_eval(item)
except (SyntaxError, ValueError):
for begin, end, func in LITERAL_EVAL_PRE_PROCESS:
if item.startswith(begin) and item.endswith(end):
# Extracting and removing extra quotes so for example "Decimal('10.1')" becomes "'10.1'" and then '10.1'
params = item[len(begin): -len(end)].strip('\'\"')
return func(params)
raise
def time_to_seconds(t:datetime.time) -> int:
return (t.hour * 60 + t.minute) * 60 + t.second
def datetime_normalize(
truncate_datetime:Union[str, None],
obj:Union[datetime.datetime, datetime.time],
default_timezone: Union[
datetime.timezone, "BaseTzInfo"
] = datetime.timezone.utc,
) -> Any:
if truncate_datetime:
if truncate_datetime == 'second':
obj = obj.replace(microsecond=0)
elif truncate_datetime == 'minute':
obj = obj.replace(second=0, microsecond=0)
elif truncate_datetime == 'hour':
obj = obj.replace(minute=0, second=0, microsecond=0)
elif truncate_datetime == 'day':
obj = obj.replace(hour=0, minute=0, second=0, microsecond=0)
if isinstance(obj, datetime.datetime):
if has_timezone(obj):
obj = obj.astimezone(default_timezone)
else:
obj = obj.replace(tzinfo=default_timezone)
elif isinstance(obj, datetime.time):
return time_to_seconds(obj)
return obj
def has_timezone(dt):
"""
Function to check if a datetime object has a timezone
Checking dt.tzinfo.utcoffset(dt) ensures that the datetime object is truly timezone-aware
because some datetime objects may have a tzinfo attribute that is not None but still
doesn't provide a valid offset.
Certain tzinfo objects, such as pytz.timezone(None), can exist but do not provide meaningful UTC offset information.
If tzinfo is present but calling .utcoffset(dt) returns None, the datetime is not truly timezone-aware.
"""
return dt.tzinfo is not None and dt.tzinfo.utcoffset(dt) is not None
def get_truncate_datetime(truncate_datetime) -> Union[str, None]:
"""
Validates truncate_datetime value
"""
if truncate_datetime not in {None, 'second', 'minute', 'hour', 'day'}:
raise ValueError("truncate_datetime must be second, minute, hour or day")
return truncate_datetime
def cartesian_product_numpy(*arrays):
"""
Cartesian product of Numpy arrays by Paul Panzer
https://stackoverflow.com/a/49445693/1497443
"""
la = len(arrays)
dtype = np.result_type(*arrays) # type: ignore
arr = np.empty((la, *map(len, arrays)), dtype=dtype) # type: ignore
idx = slice(None), *repeat(None, la)
for i, a in enumerate(arrays):
arr[i, ...] = a[idx[:la - i]]
return arr.reshape(la, -1).T
def diff_numpy_array(A, B):
"""
Numpy Array A - B
return items in A that are not in B
By Divakar
https://stackoverflow.com/a/52417967/1497443
"""
return A[~np.isin(A, B)] # type: ignore
PYTHON_TYPE_TO_NUMPY_TYPE = {
int: np_int64,
float: np_float64,
Decimal: np_float64
}
def get_homogeneous_numpy_compatible_type_of_seq(seq):
"""
Return with the numpy dtype if the array can be converted to a non-object numpy array.
Originally written by mgilson https://stackoverflow.com/a/13252348/1497443
This is the modified version.
"""
iseq = iter(seq)
first_type = type(next(iseq))
if first_type in {int, float, Decimal}:
type_ = first_type if all((type(x) is first_type) for x in iseq) else False
return PYTHON_TYPE_TO_NUMPY_TYPE.get(type_, False)
else:
return False
def detailed__dict__(obj, ignore_private_variables=True, ignore_keys=frozenset(), include_keys=None):
"""
Get the detailed dictionary of an object.
This is used so we retrieve object properties too.
"""
if include_keys:
result = {}
for key in include_keys:
try:
value = getattr(obj, key)
except Exception:
pass
else:
if not callable(value) or key == '__objclass__': # We don't want to compare functions, however for backward compatibility, __objclass__ needs to be reported.
result[key] = value
else:
result = obj.__dict__.copy() # A shallow copy
private_var_prefix = f"_{obj.__class__.__name__}__" # The semi private variables in Python get this prefix
for key in ignore_keys:
if key in result or (
ignore_private_variables and key.startswith('__') and not key.startswith(private_var_prefix)
):
del result[key]
for key in dir(obj):
if key not in result and key not in ignore_keys and (
not ignore_private_variables or (
ignore_private_variables and not key.startswith('__') and not key.startswith(private_var_prefix)
)
):
value = getattr(obj, key)
if not callable(value):
result[key] = value
return result
def named_tuple_repr(self):
fields = []
for field, value in self._asdict().items():
# Only include fields that do not have their default value
if field in self._field_defaults:
if value != self._field_defaults[field]:
fields.append(f"{field}={value!r}")
else:
fields.append(f"{field}={value!r}")
return f"{self.__class__.__name__}({', '.join(fields)})"
class OpcodeTag(EnumBase):
insert = 'insert'
delete = 'delete'
equal = 'equal'
replace = 'replace' # type: ignore
# swapped = 'swapped' # in the future we should support reporting of items swapped with each other
class Opcode(NamedTuple):
tag: str
t1_from_index: int
t1_to_index: int
t2_from_index: int
t2_to_index: int
old_values: Optional[List[Any]] = None
new_values: Optional[List[Any]] = None
__repr__ = __str__ = named_tuple_repr
class FlatDataAction(EnumBase):
values_changed = 'values_changed'
type_changes = 'type_changes'
set_item_added = 'set_item_added'
set_item_removed = 'set_item_removed'
dictionary_item_added = 'dictionary_item_added'
dictionary_item_removed = 'dictionary_item_removed'
iterable_item_added = 'iterable_item_added'
iterable_item_removed = 'iterable_item_removed'
iterable_item_moved = 'iterable_item_moved'
iterable_items_inserted = 'iterable_items_inserted' # opcode
iterable_items_deleted = 'iterable_items_deleted' # opcode
iterable_items_replaced = 'iterable_items_replaced' # opcode
iterable_items_equal = 'iterable_items_equal' # opcode
attribute_removed = 'attribute_removed'
attribute_added = 'attribute_added'
unordered_iterable_item_added = 'unordered_iterable_item_added'
unordered_iterable_item_removed = 'unordered_iterable_item_removed'
initiated = "initiated"
OPCODE_TAG_TO_FLAT_DATA_ACTION = {
OpcodeTag.insert: FlatDataAction.iterable_items_inserted,
OpcodeTag.delete: FlatDataAction.iterable_items_deleted,
OpcodeTag.replace: FlatDataAction.iterable_items_replaced,
OpcodeTag.equal: FlatDataAction.iterable_items_equal,
}
FLAT_DATA_ACTION_TO_OPCODE_TAG = {v: i for i, v in OPCODE_TAG_TO_FLAT_DATA_ACTION.items()}
UnkownValueCode: str = 'unknown___'
class FlatDeltaRow(NamedTuple):
path: List
action: FlatDataAction
value: Optional[Any] = UnkownValueCode
old_value: Optional[Any] = UnkownValueCode
type: Optional[Any] = UnkownValueCode
old_type: Optional[Any] = UnkownValueCode
new_path: Optional[List] = None
t1_from_index: Optional[int] = None
t1_to_index: Optional[int] = None
t2_from_index: Optional[int] = None
t2_to_index: Optional[int] = None
__repr__ = __str__ = named_tuple_repr
JSON = Union[Dict[str, str], List[str], List[int], Dict[str, "JSON"], List["JSON"], str, int, float, bool, None]
class SummaryNodeType(EnumBase):
dict = 'dict'
list = 'list'
leaf = 'leaf'