#!/usr/bin/env python
import logging
import datetime
from typing import Union, Optional, Any, List, TYPE_CHECKING
from collections.abc import Iterable, MutableMapping
from collections import defaultdict
from hashlib import sha1, sha256
from pathlib import Path
from enum import Enum
from deepdiff.helper import (strings, numbers, times, unprocessed, not_hashed, add_to_frozen_set,
                             convert_item_or_items_into_set_else_none, get_doc, ipranges,
                             convert_item_or_items_into_compiled_regexes_else_none,
                             get_id, type_is_subclass_of_type_group, type_in_type_group,
                             number_to_string, datetime_normalize, KEY_TO_VAL_STR,
                             get_truncate_datetime, dict_, add_root_to_paths, PydanticBaseModel)

from deepdiff.base import Base

if TYPE_CHECKING:
    from pytz.tzinfo import BaseTzInfo


# Optional third-party integrations: each name is falsy when the package is
# missing so the `pandas and isinstance(...)` checks in `_hash` short-circuit.
try:
    import pandas
except ImportError:
    pandas = False

try:
    import polars
except ImportError:
    polars = False
try:
    import numpy as np
    booleanTypes = (bool, np.bool_)
except ImportError:
    booleanTypes = bool

logger = logging.getLogger(__name__)

# Key under which objects that could not be hashed are collected in `hashes`.
UNPROCESSED_KEY = object()

EMPTY_FROZENSET = frozenset()

# Indexed by the boolean `print_as_attribute`: False -> item access, True -> attribute access.
INDEX_VS_ATTRIBUTE = ('[%s]', '.%s')


HASH_LOOKUP_ERR_MSG = '{} is not one of the hashed items.'


def sha256hex(obj):
    """Use Sha256 as a cryptographic hash.

    `str` input is encoded as UTF-8 first; anything else is passed to
    `hashlib.sha256` as-is. Returns the hexadecimal digest string.
    """
    if isinstance(obj, str):
        obj = obj.encode('utf-8')
    return sha256(obj).hexdigest()


def sha1hex(obj):
    """Use Sha1 as a cryptographic hash.

    `str` input is encoded as UTF-8 first; anything else is passed to
    `hashlib.sha1` as-is. Returns the hexadecimal digest string.
    """
    if isinstance(obj, str):
        obj = obj.encode('utf-8')
    return sha1(obj).hexdigest()


default_hasher = sha256hex


def combine_hashes_lists(items, prefix):
    """
    Combines lists of hashes into one hash
    This can be optimized in future.
    It needs to work with both murmur3 hashes (int) and sha256 (str)
    Although murmur3 is not used anymore.

    `items` is an iterable of sortable collections of hashes and `prefix`
    (str or UTF-8 bytes) is prepended to the resulting digest string.
    """
    if isinstance(prefix, bytes):
        prefix = prefix.decode('utf-8')
    # In order to make sure the order of hashes in each item does not affect the hash
    # we resort them. Each group is terminated with '--' before being hashed.
    hashes_bytes = b''.join(
        (''.join(map(str, sorted(item))) + '--').encode('utf-8')
        for item in items
    )
    return prefix + str(default_hasher(hashes_bytes))


class BoolObj(Enum):
    """Stand-in used as the hashes-dictionary key for booleans (see `_prep_bool`)."""
    TRUE = 1
    FALSE = 0


def prepare_string_for_hashing(
        obj,
        ignore_string_type_changes=False,
        ignore_string_case=False,
        encodings=None,
        ignore_encoding_errors=False,
):
    """
    Clean type conversions

    Bytes are decoded with the first encoding in `encodings` (default
    ``['utf-8']``) that succeeds. Unless `ignore_string_type_changes` is set,
    the original type name is folded into the result via `KEY_TO_VAL_STR`.
    With `ignore_string_case` the result is lower-cased.

    Raises:
        UnicodeDecodeError: when none of the encodings can decode the bytes.
            The reason is extended with an excerpt around the failing
            position and a hint about the available options.
    """
    original_type = obj.__class__.__name__
    # https://docs.python.org/3/library/codecs.html#codecs.decode
    errors_mode = 'ignore' if ignore_encoding_errors else 'strict'
    if isinstance(obj, bytes):
        err = None
        encodings = ['utf-8'] if encodings is None else encodings
        encoded = False
        for encoding in encodings:
            try:
                obj = obj.decode(encoding, errors=errors_mode)
                encoded = True
                break
            except UnicodeDecodeError as er:
                # Remember the most recent failure; it is reported if every encoding fails.
                err = er
        if not encoded and err is not None:
            obj_decoded = obj.decode('utf-8', errors='ignore')  # type: ignore
            # Show up to 20 positions of context on each side of the failure.
            start = max(err.start - 20, 0)
            start_prefix = ''
            if start > 0:
                start_prefix = '...'
            end = err.end + 20
            end_suffix = '...'
            if end >= len(obj):
                end = len(obj)
                end_suffix = ''
            raise UnicodeDecodeError(
                err.encoding,
                err.object,
                err.start,
                err.end,
                f"{err.reason} in '{start_prefix}{obj_decoded[start:end]}{end_suffix}'. Please either pass ignore_encoding_errors=True or pass the encoding via encodings=['utf-8', '...']."
            ) from None
    if not ignore_string_type_changes:
        obj = KEY_TO_VAL_STR.format(original_type, obj)
    if ignore_string_case:
        obj = obj.lower()
    return obj


doc = get_doc('deephash_doc.rst')


class DeepHash(Base):
    # The user-facing documentation is loaded from deephash_doc.rst.
    __doc__ = doc

    def __init__(self,
                 obj: Any,
                 *,
                 apply_hash=True,
                 custom_operators: Optional[List[Any]] =None,
                 default_timezone:Union[datetime.timezone, "BaseTzInfo"]=datetime.timezone.utc,
                 encodings=None,
                 exclude_obj_callback=None,
                 exclude_paths=None,
                 exclude_regex_paths=None,
                 exclude_types=None,
                 hasher=None,
                 hashes=None,
                 ignore_encoding_errors=False,
                 ignore_iterable_order=True,
                 ignore_numeric_type_changes=False,
                 ignore_private_variables=True,
                 ignore_repetition=True,
                 ignore_string_case=False,
                 ignore_string_type_changes=False,
                 ignore_type_in_groups=None,
                 ignore_type_subclasses=False,
                 include_paths=None,
                 number_format_notation="f",
                 number_to_string_func=None,
                 parent="root",
                 significant_digits=None,
                 truncate_datetime=None,
                 use_enum_value=False,
                 **kwargs):
        """Hash `obj` and everything reachable from it.

        The options are stored on the instance and `obj` is hashed
        immediately; results land in `self.hashes`, which maps each object
        (or its id when the object is unhashable) to a ``(hash, count)`` tuple.

        `hashes` may be an existing mutable mapping or another `DeepHash`
        whose mapping is then reused and extended in place.

        Raises:
            ValueError: if any unknown keyword argument is passed.
        """
        if kwargs:
            raise ValueError(
                ("The following parameter(s) are not valid: %s\n"
                 "The valid parameters are obj, hashes, exclude_types, significant_digits, truncate_datetime, "
                 "exclude_paths, include_paths, exclude_regex_paths, hasher, ignore_repetition, "
                 "number_format_notation, apply_hash, ignore_type_in_groups, ignore_string_type_changes, "
                 "ignore_numeric_type_changes, ignore_type_subclasses, ignore_string_case, "
                 "number_to_string_func, ignore_private_variables, parent, use_enum_value, default_timezone, "
                 "encodings, ignore_encoding_errors, ignore_iterable_order, exclude_obj_callback, "
                 "custom_operators") % ', '.join(kwargs.keys()))
        if isinstance(hashes, MutableMapping):
            self.hashes = hashes
        elif isinstance(hashes, DeepHash):
            self.hashes = hashes.hashes
        else:
            self.hashes = dict_()
        exclude_types = set() if exclude_types is None else set(exclude_types)
        self.exclude_types_tuple = tuple(exclude_types)  # we need tuple for checking isinstance
        self.ignore_repetition = ignore_repetition
        self.exclude_paths = add_root_to_paths(convert_item_or_items_into_set_else_none(exclude_paths))
        self.include_paths = add_root_to_paths(convert_item_or_items_into_set_else_none(include_paths))
        self.exclude_regex_paths = convert_item_or_items_into_compiled_regexes_else_none(exclude_regex_paths)
        self.hasher = default_hasher if hasher is None else hasher
        self.hashes[UNPROCESSED_KEY] = []
        self.use_enum_value = use_enum_value
        self.default_timezone = default_timezone
        self.significant_digits = self.get_significant_digits(significant_digits, ignore_numeric_type_changes)
        self.truncate_datetime = get_truncate_datetime(truncate_datetime)
        self.number_format_notation = number_format_notation
        self.ignore_type_in_groups = self.get_ignore_types_in_groups(
            ignore_type_in_groups=ignore_type_in_groups,
            ignore_string_type_changes=ignore_string_type_changes,
            ignore_numeric_type_changes=ignore_numeric_type_changes,
            ignore_type_subclasses=ignore_type_subclasses)
        self.ignore_string_type_changes = ignore_string_type_changes
        self.ignore_numeric_type_changes = ignore_numeric_type_changes
        self.ignore_string_case = ignore_string_case
        self.exclude_obj_callback = exclude_obj_callback
        # makes the hash return constant size result if true
        # the only time it should be set to False is when
        # testing the individual hash functions for different types of objects.
        self.apply_hash = apply_hash
        self.type_check_func = type_in_type_group if ignore_type_subclasses else type_is_subclass_of_type_group
        # self.type_check_func = type_is_subclass_of_type_group if ignore_type_subclasses else type_in_type_group
        self.number_to_string = number_to_string_func or number_to_string
        self.ignore_private_variables = ignore_private_variables
        self.encodings = encodings
        self.ignore_encoding_errors = ignore_encoding_errors
        self.ignore_iterable_order = ignore_iterable_order
        self.custom_operators = custom_operators

        self._hash(obj, parent=parent, parents_ids=frozenset({get_id(obj)}))

        # Keep the unprocessed bucket only when something actually landed in it.
        if self.hashes[UNPROCESSED_KEY]:
            logger.warning("Can not hash the following items: {}.".format(self.hashes[UNPROCESSED_KEY]))
        else:
            del self.hashes[UNPROCESSED_KEY]

    # Expose the module-level hashers as class attributes.
    sha256hex = sha256hex
    sha1hex = sha1hex

    def __getitem__(self, obj, extract_index=0):
        """Return the stored hash of `obj` (see `_getitem` for `extract_index`)."""
        return self._getitem(self.hashes, obj, extract_index=extract_index, use_enum_value=self.use_enum_value)

    @staticmethod
    def _getitem(hashes, obj, extract_index=0, use_enum_value=False):
        """
        extract_index is zero for hash and 1 for count and None to get them both.
        To keep it backward compatible, we only get the hash by default so it is set to zero by default.

        Raises:
            KeyError: if neither `obj` nor its id is a key of `hashes`.
        """

        # Booleans and (optionally) enum members are stored under substitute keys.
        key = obj
        if obj is True:
            key = BoolObj.TRUE
        elif obj is False:
            key = BoolObj.FALSE
        elif use_enum_value and isinstance(obj, Enum):
            key = obj.value

        result_n_count = (None, 0)

        try:
            result_n_count = hashes[key]
        except (TypeError, KeyError):
            # Unhashable or unknown key: fall back to the object's id.
            key = get_id(obj)
            try:
                result_n_count = hashes[key]
            except KeyError:
                raise KeyError(HASH_LOOKUP_ERR_MSG.format(obj)) from None

        # The unprocessed bucket is a plain list, not a (hash, count) tuple.
        if obj is UNPROCESSED_KEY:
            extract_index = None

        return result_n_count if extract_index is None else result_n_count[extract_index]

    def __contains__(self, obj):
        """Return True if `obj` itself or its id is a key of the hashes dictionary."""
        result = False
        try:
            result = obj in self.hashes
        except (TypeError, KeyError):
            result = False
        if not result:
            result = get_id(obj) in self.hashes
        return result

    def get(self, key, default=None, extract_index=0):
        """
        Get method for the hashes dictionary.
        It can extract the hash for a given key that is already calculated when extract_index=0
        or the count of items that went to building the object when extract_index=1.

        Returns `default` when the key has not been hashed.
        """
        # Forward use_enum_value so that lookups here resolve enum members
        # the same way `__getitem__` does.
        return self.get_key(self.hashes, key, default=default, extract_index=extract_index,
                            use_enum_value=self.use_enum_value)

    @staticmethod
    def get_key(hashes, key, default=None, extract_index=0, use_enum_value=False):
        """
        get_key method for the hashes dictionary.
        It can extract the hash for a given key that is already calculated when extract_index=0
        or the count of items that went to building the object when extract_index=1.

        Returns `default` when the lookup raises KeyError.
        """
        try:
            result = DeepHash._getitem(hashes, key, extract_index=extract_index, use_enum_value=use_enum_value)
        except KeyError:
            result = default
        return result

    def _get_objects_to_hashes_dict(self, extract_index=0):
        """
        A dictionary containing only the objects to hashes,
        or a dictionary of objects to the count of items that went to build them.
        extract_index=0 for hashes and extract_index=1 for counts.
        """
        result = dict_()
        for key, value in self.hashes.items():
            if key is UNPROCESSED_KEY:
                # The unprocessed bucket is copied over untouched.
                result[key] = value
            else:
                result[key] = value[extract_index]
        return result

    def __eq__(self, other):
        """Compare full hash tables with another DeepHash, or object->hash pairs with anything else."""
        if isinstance(other, DeepHash):
            return self.hashes == other.hashes
        else:
            # We only care about the hashes
            return self._get_objects_to_hashes_dict() == other

    # NOTE: `__req__` is not a Python special method (equality has no
    # reflected variant); the alias is kept only for backward compatibility.
    __req__ = __eq__

    def __repr__(self):
        """
        Hide the counts since it will be confusing to see them when they are hidden everywhere else.
        """
        from deepdiff.summarize import summarize
        return summarize(self._get_objects_to_hashes_dict(extract_index=0), max_length=500)

    def __str__(self):
        """Return the string form of the object->hash dictionary (counts hidden)."""
        return str(self._get_objects_to_hashes_dict(extract_index=0))

    def __bool__(self):
        """A DeepHash is truthy when its hashes dictionary is non-empty."""
        return bool(self.hashes)

    def keys(self):
        """Return the keys view of the hashes dictionary."""
        return self.hashes.keys()

    def values(self):
        """Yield the first element of every stored value."""
        return (i[0] for i in self.hashes.values())  # Just grab the item and not its count

    def items(self):
        """Yield ``(key, first element of the stored value)`` pairs."""
        return ((i, v[0]) for i, v in self.hashes.items())

    def _prep_obj(self, obj, parent, parents_ids=EMPTY_FROZENSET, is_namedtuple=False, is_pydantic_object=False):
        """prepping objects

        Turns `obj` into a dictionary of its attributes and hashes that
        dictionary via `_prep_dict`. Returns ``(unprocessed, 0)`` and records
        the object under UNPROCESSED_KEY when no strategy can produce a dict.
        """
        original_type = type(obj) if not isinstance(obj, type) else obj

        # Strategies are tried in order; the first one that does not raise
        # AttributeError wins.
        obj_to_dict_strategies = []
        if is_namedtuple:
            obj_to_dict_strategies.append(lambda o: o._asdict())
        elif is_pydantic_object:
            # Filter on the attribute *name*: drop a "model_fields_set" entry
            # rather than any attribute whose value happens to equal that string.
            obj_to_dict_strategies.append(
                lambda o: {k: v for (k, v) in o.__dict__.items() if k != "model_fields_set"})
        else:
            obj_to_dict_strategies.append(lambda o: o.__dict__)

        if hasattr(obj, "__slots__"):
            obj_to_dict_strategies.append(lambda o: {i: getattr(o, i) for i in o.__slots__})
        else:
            import inspect
            obj_to_dict_strategies.append(lambda o: dict(inspect.getmembers(o, lambda m: not inspect.isroutine(m))))

        # NOTE(review): the `else` below is read as a for/else (runs only when
        # no strategy succeeded) — confirm against the original layout.
        for get_dict in obj_to_dict_strategies:
            try:
                d = get_dict(obj)
                break
            except AttributeError:
                pass
        else:
            self.hashes[UNPROCESSED_KEY].append(obj)
            return (unprocessed, 0)
        obj = d

        result, counts = self._prep_dict(obj, parent=parent, parents_ids=parents_ids,
                                         print_as_attribute=True, original_type=original_type)
        result = "nt{}".format(result) if is_namedtuple else "obj{}".format(result)
        return result, counts

    def _skip_this(self, obj, parent):
        """Return True when `obj` at path `parent` must be left out of the hash.

        Checks, in this order: exclude_paths, include_paths (a path is kept
        when it starts with any included path), exclude_regex_paths,
        exclude_types and finally exclude_obj_callback.
        """
        skip = False
        if self.exclude_paths and parent in self.exclude_paths:
            skip = True
        if self.include_paths and parent != 'root':
            if parent not in self.include_paths:
                skip = True
                for prefix in self.include_paths:
                    if parent.startswith(prefix):
                        skip = False
                        break
        elif self.exclude_regex_paths and any(
                exclude_regex_path.search(parent) for exclude_regex_path in self.exclude_regex_paths):  # type: ignore
            skip = True
        elif self.exclude_types_tuple and isinstance(obj, self.exclude_types_tuple):
            skip = True
        elif self.exclude_obj_callback and self.exclude_obj_callback(obj, parent):
            skip = True
        return skip

    def _prep_dict(self, obj, parent, parents_ids=EMPTY_FROZENSET, print_as_attribute=False, original_type=None):
        """Build the pre-hash string and item count for a mapping.

        With `print_as_attribute` the entries are reported as attributes
        (``parent.key``) and the result is labelled with the type name (or the
        names of the matching ignore_type_in_groups group); otherwise entries
        are reported as items (``parent['key']``) and the label is ``dict``.
        """
        result = []
        counts = 1

        key_text = "%s{}".format(INDEX_VS_ATTRIBUTE[print_as_attribute])
        for key, item in obj.items():
            counts += 1
            # ignore private variables
            if self.ignore_private_variables and isinstance(key, str) and key.startswith('__'):
                continue
            key_formatted = "'%s'" % key if not print_as_attribute and isinstance(key, strings) else key
            key_in_report = key_text % (parent, key_formatted)

            key_hash, _ = self._hash(key, parent=key_in_report, parents_ids=parents_ids)
            if not key_hash:
                continue
            item_id = get_id(item)
            # Skip items already on the current path (cycles) and excluded items.
            if (parents_ids and item_id in parents_ids) or self._skip_this(item, parent=key_in_report):
                continue
            parents_ids_added = add_to_frozen_set(parents_ids, item_id)
            hashed, count = self._hash(item, parent=key_in_report, parents_ids=parents_ids_added)
            hashed = KEY_TO_VAL_STR.format(key_hash, hashed)
            result.append(hashed)
            counts += count

        # Sorting makes the result independent of the mapping's iteration order.
        result.sort()
        result = ';'.join(result)
        if print_as_attribute:
            type_ = original_type or type(obj)
            type_str = type_.__name__
            for type_group in self.ignore_type_in_groups:
                if self.type_check_func(type_, type_group):
                    type_str = ','.join(map(lambda x: x.__name__, type_group))
                    break
        else:
            type_str = 'dict'
        return "{}:{{{}}}".format(type_str, result), counts

    def _prep_iterable(self, obj, parent, parents_ids=EMPTY_FROZENSET):
        """Build the pre-hash string and item count for an iterable.

        Item hashes are de-duplicated when `ignore_repetition` is set
        (otherwise each is suffixed with ``|<occurrences>``) and sorted when
        `ignore_iterable_order` is set.
        """
        counts = 1
        result = defaultdict(int)

        for i, item in enumerate(obj):
            new_parent = "{}[{}]".format(parent, i)
            if self._skip_this(item, parent=new_parent):
                continue

            item_id = get_id(item)
            # Do not follow items already on the current path (cycles).
            if parents_ids and item_id in parents_ids:
                continue

            parents_ids_added = add_to_frozen_set(parents_ids, item_id)
            hashed, count = self._hash(item, parent=new_parent, parents_ids=parents_ids_added)
            # counting repetitions
            result[hashed] += 1
            counts += count

        if self.ignore_repetition:
            result = list(result.keys())
        else:
            result = [
                '{}|{}'.format(i, v) for i, v in result.items()
            ]

        result = map(str, result)  # making sure the result items are string so join command works.
        if self.ignore_iterable_order:
            result = sorted(result)
        result = ','.join(result)
        result = KEY_TO_VAL_STR.format(type(obj).__name__, result)

        return result, counts

    def _prep_bool(self, obj):
        """Map a truthy/falsy value to `BoolObj.TRUE` / `BoolObj.FALSE`."""
        return BoolObj.TRUE if obj else BoolObj.FALSE

    def _prep_path(self, obj):
        """Format a path object together with its class name."""
        type_ = obj.__class__.__name__
        return KEY_TO_VAL_STR.format(type_, obj)

    def _prep_number(self, obj):
        """Format a number, labelled "number" when numeric type changes are ignored.

        When significant_digits is set the number is first rendered through
        `self.number_to_string`.
        """
        type_ = "number" if self.ignore_numeric_type_changes else obj.__class__.__name__
        if self.significant_digits is not None:
            obj = self.number_to_string(obj, significant_digits=self.significant_digits,
                                        number_format_notation=self.number_format_notation)
        return KEY_TO_VAL_STR.format(type_, obj)

    def _prep_ipranges(self, obj):
        """Format an IP range object by its string form under the label 'iprange'."""
        type_ = 'iprange'
        obj = str(obj)
        return KEY_TO_VAL_STR.format(type_, obj)

    def _prep_datetime(self, obj):
        """Format a time value after passing it through `datetime_normalize`."""
        type_ = 'datetime'
        obj = datetime_normalize(self.truncate_datetime, obj, default_timezone=self.default_timezone)
        return KEY_TO_VAL_STR.format(type_, obj)

    def _prep_date(self, obj):
        """Format a date under the same 'datetime' label, without normalization."""
        type_ = 'datetime'  # yes still datetime but it doesn't need normalization
        return KEY_TO_VAL_STR.format(type_, obj)

    def _prep_tuple(self, obj, parent, parents_ids):
        """Hash a tuple as an iterable, or as an object when it has `_asdict`."""
        # Checking to see if it has _asdict. Which probably means it is a named
        # tuple.
        try:
            obj._asdict
        # It must be a normal tuple
        except AttributeError:
            result, counts = self._prep_iterable(obj=obj, parent=parent, parents_ids=parents_ids)
        # We assume it is a namedtuple then
        else:
            result, counts = self._prep_obj(obj, parent, parents_ids=parents_ids, is_namedtuple=True)
        return result, counts

    def _hash(self, obj, parent, parents_ids=EMPTY_FROZENSET):
        """The main hash method

        Dispatches on the type of `obj`, stores ``(result, counts)`` in
        `self.hashes` (keyed by the object, or by its id when unhashable) and
        returns that pair. Returns ``(None, 0)`` for skipped objects.

        Raises:
            NotImplementedError: if a custom operator does not define
                `normalize_value_for_hashing`.
        """
        counts = 1
        if self.custom_operators is not None:
            for operator in self.custom_operators:
                func = getattr(operator, 'normalize_value_for_hashing', None)
                if func is None:
                    raise NotImplementedError(f"{operator.__class__.__name__} needs to define a normalize_value_for_hashing method to be compatible with ignore_order=True or iterable_compare_func.")
                else:
                    obj = func(parent, obj)

        if isinstance(obj, booleanTypes):
            obj = self._prep_bool(obj)
            result = None
        elif self.use_enum_value and isinstance(obj, Enum):
            obj = obj.value
        else:
            result = not_hashed
        # Reuse a previously computed hash when the (possibly substituted)
        # object is already a key. TypeError covers unhashable objects.
        # NOTE(review): this lookup is assumed to sit at function level rather
        # than inside the `else` above — confirm against the original layout.
        try:
            result, counts = self.hashes[obj]
        except (TypeError, KeyError):
            pass
        else:
            return result, counts

        if self._skip_this(obj, parent):
            return None, 0

        elif obj is None:
            result = 'NONE'

        elif isinstance(obj, strings):
            result = prepare_string_for_hashing(
                obj,
                ignore_string_type_changes=self.ignore_string_type_changes,
                ignore_string_case=self.ignore_string_case,
                encodings=self.encodings,
                ignore_encoding_errors=self.ignore_encoding_errors,
            )

        elif isinstance(obj, Path):
            result = self._prep_path(obj)

        elif isinstance(obj, times):
            result = self._prep_datetime(obj)

        elif isinstance(obj, datetime.date):
            result = self._prep_date(obj)

        elif isinstance(obj, numbers):  # type: ignore
            result = self._prep_number(obj)

        elif isinstance(obj, ipranges):
            result = self._prep_ipranges(obj)

        elif isinstance(obj, MutableMapping):
            result, counts = self._prep_dict(obj=obj, parent=parent, parents_ids=parents_ids)

        elif isinstance(obj, tuple):
            result, counts = self._prep_tuple(obj=obj, parent=parent, parents_ids=parents_ids)

        elif (pandas and isinstance(obj, pandas.DataFrame)):  # type: ignore
            def gen():  # type: ignore
                yield ('dtype', obj.dtypes)  # type: ignore
                yield ('index', obj.index)  # type: ignore
                yield from obj.items()  # type: ignore # which contains (column name, series tuples)
            result, counts = self._prep_iterable(obj=gen(), parent=parent, parents_ids=parents_ids)

        elif (polars and isinstance(obj, polars.DataFrame)):  # type: ignore
            def gen():
                yield from obj.columns  # type: ignore
                yield from list(obj.schema.items())  # type: ignore
                yield from obj.rows()  # type: ignore
            result, counts = self._prep_iterable(obj=gen(), parent=parent, parents_ids=parents_ids)

        elif isinstance(obj, Iterable):
            result, counts = self._prep_iterable(obj=obj, parent=parent, parents_ids=parents_ids)

        elif obj == BoolObj.TRUE or obj == BoolObj.FALSE:
            result = 'bool:true' if obj is BoolObj.TRUE else 'bool:false'

        elif isinstance(obj, PydanticBaseModel):
            result, counts = self._prep_obj(obj=obj, parent=parent, parents_ids=parents_ids, is_pydantic_object=True)

        else:
            result, counts = self._prep_obj(obj=obj, parent=parent, parents_ids=parents_ids)

        if result is not_hashed:  # pragma: no cover
            self.hashes[UNPROCESSED_KEY].append(obj)

        elif result is unprocessed:
            pass

        elif self.apply_hash:
            if isinstance(obj, strings):
                # Strings were already cleaned by the dispatch above.
                result_cleaned = result
            else:
                result_cleaned = prepare_string_for_hashing(
                    result, ignore_string_type_changes=self.ignore_string_type_changes,
                    ignore_string_case=self.ignore_string_case)
            result = self.hasher(result_cleaned)

        # It is important to keep the hash of all objects.
        # The hashes will be later used for comparing the objects.
        # Object to hash when possible otherwise ObjectID to hash
        try:
            self.hashes[obj] = (result, counts)
        except TypeError:
            obj_id = get_id(obj)
            self.hashes[obj_id] = (result, counts)

        return result, counts


if __name__ == "__main__":  # pragma: no cover
    import doctest
    doctest.testmod()