diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/deepdiff/deephash.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/deepdiff/deephash.py | 627 |
1 files changed, 627 insertions, 0 deletions
#!/usr/bin/env python
"""Content-based hashing of arbitrary Python objects.

``DeepHash`` walks an object recursively, turns every (sub)object into a
canonical string and stores ``(hash, item count)`` for each of them in
``DeepHash.hashes``.
"""
import logging
import datetime
from typing import Union, Optional, Any, List, TYPE_CHECKING
from collections.abc import Iterable, MutableMapping
from collections import defaultdict
from hashlib import sha1, sha256
from pathlib import Path
from enum import Enum
from deepdiff.helper import (strings, numbers, times, unprocessed, not_hashed, add_to_frozen_set,
                             convert_item_or_items_into_set_else_none, get_doc, ipranges,
                             convert_item_or_items_into_compiled_regexes_else_none,
                             get_id, type_is_subclass_of_type_group, type_in_type_group,
                             number_to_string, datetime_normalize, KEY_TO_VAL_STR,
                             get_truncate_datetime, dict_, add_root_to_paths, PydanticBaseModel)

from deepdiff.base import Base

if TYPE_CHECKING:
    from pytz.tzinfo import BaseTzInfo


# Optional third-party integrations: each name is the module when it is
# installed and ``False`` otherwise, so ``pandas and isinstance(...)`` is safe.
try:
    import pandas
except ImportError:
    pandas = False

try:
    import polars
except ImportError:
    polars = False
try:
    import numpy as np
    booleanTypes = (bool, np.bool_)
except ImportError:
    booleanTypes = bool

logger = logging.getLogger(__name__)

# Sentinel key under which the list of objects that could not be hashed is kept.
UNPROCESSED_KEY = object()

EMPTY_FROZENSET = frozenset()

# Indexed with a bool: False -> item access ("[key]"), True -> attribute access (".key").
INDEX_VS_ATTRIBUTE = ('[%s]', '.%s')


HASH_LOOKUP_ERR_MSG = '{} is not one of the hashed items.'


def sha256hex(obj):
    """Return the SHA-256 hex digest of ``obj`` (``str`` is UTF-8 encoded first)."""
    if isinstance(obj, str):
        obj = obj.encode('utf-8')
    return sha256(obj).hexdigest()


def sha1hex(obj):
    """Return the SHA-1 hex digest of ``obj`` (``str`` is UTF-8 encoded first)."""
    if isinstance(obj, str):
        obj = obj.encode('utf-8')
    return sha1(obj).hexdigest()


default_hasher = sha256hex


def combine_hashes_lists(items, prefix):
    """
    Combine lists of hashes into one hash.

    It needs to work with both murmur3 hashes (int) and sha256 (str),
    although murmur3 is not used anymore.

    :param items: iterable of iterables of hashes.
    :param prefix: ``str`` or UTF-8 ``bytes`` prepended to the resulting digest.
    :return: ``prefix`` followed by the ``default_hasher`` digest of the combined hashes.
    """
    if isinstance(prefix, bytes):
        prefix = prefix.decode('utf-8')
    # Each item is sorted so that the order of the hashes inside an item does
    # not affect the result. Joining once avoids quadratic bytes concatenation.
    hashes_bytes = b''.join(
        (''.join(map(str, sorted(item))) + '--').encode('utf-8')
        for item in items
    )
    return prefix + str(default_hasher(hashes_bytes))


class BoolObj(Enum):
    """Stand-in keys for ``True``/``False`` used by ``DeepHash``."""
    TRUE = 1
    FALSE = 0


def prepare_string_for_hashing(
    obj,
    ignore_string_type_changes=False,
    ignore_string_case=False,
    encodings=None,
    ignore_encoding_errors=False,
):
    """
    Normalise a ``str``/``bytes`` object into the string that gets hashed.

    :param obj: the string-like object.
    :param ignore_string_type_changes: when False the original type name is
        prepended (via ``KEY_TO_VAL_STR``) so ``str`` and ``bytes`` differ.
    :param ignore_string_case: lower-case the result.
    :param encodings: encodings tried in order to decode ``bytes``;
        defaults to ``['utf-8']``.
    :param ignore_encoding_errors: decode with ``errors='ignore'`` instead of ``'strict'``.
    :raises UnicodeDecodeError: when ``bytes`` cannot be decoded with any of
        the given encodings; the reason includes the bytes around the failure.
    """
    original_type = obj.__class__.__name__
    # https://docs.python.org/3/library/codecs.html#codecs.decode
    errors_mode = 'ignore' if ignore_encoding_errors else 'strict'
    if isinstance(obj, bytes):
        err = None
        encodings = ['utf-8'] if encodings is None else encodings
        encoded = False
        for encoding in encodings:
            try:
                obj = obj.decode(encoding, errors=errors_mode)
                encoded = True
                break
            except UnicodeDecodeError as er:
                err = er
        if not encoded and err is not None:
            # Build a readable excerpt (20 positions of context on each side)
            # of the payload to include in the error reason.
            obj_decoded = obj.decode('utf-8', errors='ignore')  # type: ignore
            start = max(err.start - 20, 0)
            start_prefix = ''
            if start > 0:
                start_prefix = '...'
            end = err.end + 20
            end_suffix = '...'
            if end >= len(obj):
                end = len(obj)
                end_suffix = ''
            raise UnicodeDecodeError(
                err.encoding,
                err.object,
                err.start,
                err.end,
                f"{err.reason} in '{start_prefix}{obj_decoded[start:end]}{end_suffix}'. Please either pass ignore_encoding_errors=True or pass the encoding via encodings=['utf-8', '...']."
            ) from None
    if not ignore_string_type_changes:
        obj = KEY_TO_VAL_STR.format(original_type, obj)
    if ignore_string_case:
        obj = obj.lower()
    return obj


doc = get_doc('deephash_doc.rst')


class DeepHash(Base):
    __doc__ = doc

    def __init__(self,
                 obj: Any,
                 *,
                 apply_hash=True,
                 custom_operators: Optional[List[Any]] =None,
                 default_timezone:Union[datetime.timezone, "BaseTzInfo"]=datetime.timezone.utc,
                 encodings=None,
                 exclude_obj_callback=None,
                 exclude_paths=None,
                 exclude_regex_paths=None,
                 exclude_types=None,
                 hasher=None,
                 hashes=None,
                 ignore_encoding_errors=False,
                 ignore_iterable_order=True,
                 ignore_numeric_type_changes=False,
                 ignore_private_variables=True,
                 ignore_repetition=True,
                 ignore_string_case=False,
                 ignore_string_type_changes=False,
                 ignore_type_in_groups=None,
                 ignore_type_subclasses=False,
                 include_paths=None,
                 number_format_notation="f",
                 number_to_string_func=None,
                 parent="root",
                 significant_digits=None,
                 truncate_datetime=None,
                 use_enum_value=False,
                 **kwargs):
        """Store the options and immediately hash ``obj`` into ``self.hashes``.

        :raises ValueError: when an unknown keyword argument is passed.
        """
        if kwargs:
            raise ValueError(
                ("The following parameter(s) are not valid: %s\n"
                 "The valid parameters are obj, hashes, exclude_types, significant_digits, truncate_datetime, "
                 "exclude_paths, include_paths, exclude_regex_paths, hasher, ignore_repetition, "
                 "number_format_notation, apply_hash, ignore_type_in_groups, ignore_string_type_changes, "
                 "ignore_numeric_type_changes, ignore_type_subclasses, ignore_string_case, "
                 "number_to_string_func, ignore_private_variables, parent, use_enum_value, default_timezone, "
                 "encodings, ignore_encoding_errors, ignore_iterable_order, custom_operators, "
                 "exclude_obj_callback") % ', '.join(kwargs))
        # An existing mapping (or another DeepHash) can be passed in so that
        # already computed hashes are reused and new ones are added to it.
        if isinstance(hashes, MutableMapping):
            self.hashes = hashes
        elif isinstance(hashes, DeepHash):
            self.hashes = hashes.hashes
        else:
            self.hashes = dict_()
        exclude_types = set() if exclude_types is None else set(exclude_types)
        self.exclude_types_tuple = tuple(exclude_types)  # we need tuple for checking isinstance
        self.ignore_repetition = ignore_repetition
        self.exclude_paths = add_root_to_paths(convert_item_or_items_into_set_else_none(exclude_paths))
        self.include_paths = add_root_to_paths(convert_item_or_items_into_set_else_none(include_paths))
        self.exclude_regex_paths = convert_item_or_items_into_compiled_regexes_else_none(exclude_regex_paths)
        self.hasher = default_hasher if hasher is None else hasher
        self.hashes[UNPROCESSED_KEY] = []
        self.use_enum_value = use_enum_value
        self.default_timezone = default_timezone
        self.significant_digits = self.get_significant_digits(significant_digits, ignore_numeric_type_changes)
        self.truncate_datetime = get_truncate_datetime(truncate_datetime)
        self.number_format_notation = number_format_notation
        self.ignore_type_in_groups = self.get_ignore_types_in_groups(
            ignore_type_in_groups=ignore_type_in_groups,
            ignore_string_type_changes=ignore_string_type_changes,
            ignore_numeric_type_changes=ignore_numeric_type_changes,
            ignore_type_subclasses=ignore_type_subclasses)
        self.ignore_string_type_changes = ignore_string_type_changes
        self.ignore_numeric_type_changes = ignore_numeric_type_changes
        self.ignore_string_case = ignore_string_case
        self.exclude_obj_callback = exclude_obj_callback
        # makes the hash return constant size result if true
        # the only time it should be set to False is when
        # testing the individual hash functions for different types of objects.
        self.apply_hash = apply_hash
        self.type_check_func = type_in_type_group if ignore_type_subclasses else type_is_subclass_of_type_group
        # self.type_check_func = type_is_subclass_of_type_group if ignore_type_subclasses else type_in_type_group
        self.number_to_string = number_to_string_func or number_to_string
        self.ignore_private_variables = ignore_private_variables
        self.encodings = encodings
        self.ignore_encoding_errors = ignore_encoding_errors
        self.ignore_iterable_order = ignore_iterable_order
        self.custom_operators = custom_operators

        self._hash(obj, parent=parent, parents_ids=frozenset({get_id(obj)}))

        # Keep the unprocessed list only when something actually failed to hash.
        if self.hashes[UNPROCESSED_KEY]:
            logger.warning("Can not hash the following items: {}.".format(self.hashes[UNPROCESSED_KEY]))
        else:
            del self.hashes[UNPROCESSED_KEY]

    # Expose the module-level hashers on the class as well.
    sha256hex = sha256hex
    sha1hex = sha1hex

    def __getitem__(self, obj, extract_index=0):
        """Return the hash of ``obj`` (see ``_getitem`` for ``extract_index``)."""
        return self._getitem(self.hashes, obj, extract_index=extract_index, use_enum_value=self.use_enum_value)

    @staticmethod
    def _getitem(hashes, obj, extract_index=0, use_enum_value=False):
        """
        Look ``obj`` up in ``hashes``, falling back to its ``get_id`` key.

        extract_index is zero for hash and 1 for count and None to get them both.
        To keep it backward compatible, we only get the hash by default so it is set to zero by default.

        :raises KeyError: when neither ``obj`` nor its id is a key of ``hashes``.
        """

        key = obj
        # Booleans are stored under BoolObj members, see DeepHash._prep_bool.
        if obj is True:
            key = BoolObj.TRUE
        elif obj is False:
            key = BoolObj.FALSE
        elif use_enum_value and isinstance(obj, Enum):
            key = obj.value

        result_n_count = (None, 0)

        try:
            result_n_count = hashes[key]
        except (TypeError, KeyError):
            # Unhashable (TypeError) or unknown objects are stored by id.
            key = get_id(obj)
            try:
                result_n_count = hashes[key]
            except KeyError:
                raise KeyError(HASH_LOOKUP_ERR_MSG.format(obj)) from None

        # The unprocessed entry is a plain list, not a (hash, count) pair.
        if obj is UNPROCESSED_KEY:
            extract_index = None

        return result_n_count if extract_index is None else result_n_count[extract_index]

    def __contains__(self, obj):
        """Return True when ``obj`` itself or its ``get_id`` is a key of ``self.hashes``."""
        result = False
        try:
            result = obj in self.hashes
        except (TypeError, KeyError):
            result = False
        if not result:
            result = get_id(obj) in self.hashes
        return result

    def get(self, key, default=None, extract_index=0):
        """
        Get method for the hashes dictionary.
        It can extract the hash for a given key that is already calculated when extract_index=0
        or the count of items that went to building the object when extract_index=1.
        ``default`` is returned when the key has not been hashed.
        """
        # use_enum_value is forwarded so that this agrees with __getitem__.
        return self.get_key(self.hashes, key, default=default, extract_index=extract_index,
                            use_enum_value=self.use_enum_value)

    @staticmethod
    def get_key(hashes, key, default=None, extract_index=0, use_enum_value=False):
        """
        get_key method for the hashes dictionary.
        It can extract the hash for a given key that is already calculated when extract_index=0
        or the count of items that went to building the object when extract_index=1.
        ``default`` is returned when the key has not been hashed.
        """
        try:
            result = DeepHash._getitem(hashes, key, extract_index=extract_index, use_enum_value=use_enum_value)
        except KeyError:
            result = default
        return result

    def _get_objects_to_hashes_dict(self, extract_index=0):
        """
        A dictionary containing only the objects to hashes,
        or a dictionary of objects to the count of items that went to build them.
        extract_index=0 for hashes and extract_index=1 for counts.
        """
        result = dict_()
        for key, value in self.hashes.items():
            if key is UNPROCESSED_KEY:
                # The unprocessed entry is a list and is copied over as is.
                result[key] = value
            else:
                result[key] = value[extract_index]
        return result

    def __eq__(self, other):
        """Compare with another DeepHash, or with a plain object-to-hash mapping."""
        if isinstance(other, DeepHash):
            return self.hashes == other.hashes
        else:
            # We only care about the hashes
            return self._get_objects_to_hashes_dict() == other

    # NOTE: ``__req__`` is not a hook Python calls (equality has no reflected
    # variant); the alias is kept only so existing references keep working.
    __req__ = __eq__

    def __repr__(self):
        """
        Hide the counts since it will be confusing to see them when they are hidden everywhere else.
        """
        from deepdiff.summarize import summarize
        return summarize(self._get_objects_to_hashes_dict(extract_index=0), max_length=500)

    def __str__(self):
        return str(self._get_objects_to_hashes_dict(extract_index=0))

    def __bool__(self):
        return bool(self.hashes)

    def keys(self):
        """Return the keys of the underlying hashes mapping."""
        return self.hashes.keys()

    def values(self):
        """Yield the hash of every entry."""
        return (i[0] for i in self.hashes.values())  # Just grab the item and not its count

    def items(self):
        """Yield ``(key, hash)`` pairs, without the counts."""
        return ((i, v[0]) for i, v in self.hashes.items())

    def _prep_obj(self, obj, parent, parents_ids=EMPTY_FROZENSET, is_namedtuple=False, is_pydantic_object=False):
        """Convert a generic object into a dict of its attributes and hash that dict.

        Returns ``(unprocessed, 0)`` and records ``obj`` under UNPROCESSED_KEY
        when none of the attribute-extraction strategies works.
        """
        original_type = type(obj) if not isinstance(obj, type) else obj

        # Strategies are tried in order; the first one that does not raise
        # AttributeError wins.
        obj_to_dict_strategies = []
        if is_namedtuple:
            obj_to_dict_strategies.append(lambda o: o._asdict())
        elif is_pydantic_object:
            # Filter on the attribute *name*; comparing the value would drop
            # any field whose value happens to be the string "model_fields_set".
            obj_to_dict_strategies.append(lambda o: {k: v for (k, v) in o.__dict__.items() if k != "model_fields_set"})
        else:
            obj_to_dict_strategies.append(lambda o: o.__dict__)

        if hasattr(obj, "__slots__"):
            obj_to_dict_strategies.append(lambda o: {i: getattr(o, i) for i in o.__slots__})
        else:
            import inspect
            obj_to_dict_strategies.append(lambda o: dict(inspect.getmembers(o, lambda m: not inspect.isroutine(m))))

        for get_dict in obj_to_dict_strategies:
            try:
                d = get_dict(obj)
                break
            except AttributeError:
                pass
        else:
            self.hashes[UNPROCESSED_KEY].append(obj)
            return (unprocessed, 0)
        obj = d

        result, counts = self._prep_dict(obj, parent=parent, parents_ids=parents_ids,
                                         print_as_attribute=True, original_type=original_type)
        result = "nt{}".format(result) if is_namedtuple else "obj{}".format(result)
        return result, counts

    def _skip_this(self, obj, parent):
        """Return True when ``obj`` at path ``parent`` is excluded by the path/type/callback options."""
        skip = False
        if self.exclude_paths and parent in self.exclude_paths:
            skip = True
        if self.include_paths and parent != 'root':
            if parent not in self.include_paths:
                skip = True
                # Anything located below an included path is kept.
                for prefix in self.include_paths:
                    if parent.startswith(prefix):
                        skip = False
                        break
        elif self.exclude_regex_paths and any(
                exclude_regex_path.search(parent) for exclude_regex_path in self.exclude_regex_paths):  # type: ignore
            skip = True
        elif self.exclude_types_tuple and isinstance(obj, self.exclude_types_tuple):
            skip = True
        elif self.exclude_obj_callback and self.exclude_obj_callback(obj, parent):
            skip = True
        return skip

    def _prep_dict(self, obj, parent, parents_ids=EMPTY_FROZENSET, print_as_attribute=False, original_type=None):
        """Build the canonical ``type:{key:value;...}`` string of a mapping.

        :param print_as_attribute: True when the mapping holds the attributes
            of an object, so paths use ``.name`` and the type name comes from
            ``original_type`` instead of being ``dict``.
        :return: ``(string, counts)``.
        """

        result = []
        counts = 1

        key_text = "%s{}".format(INDEX_VS_ATTRIBUTE[print_as_attribute])
        for key, item in obj.items():
            counts += 1
            # ignore private variables
            if self.ignore_private_variables and isinstance(key, str) and key.startswith('__'):
                continue
            key_formatted = "'%s'" % key if not print_as_attribute and isinstance(key, strings) else key
            key_in_report = key_text % (parent, key_formatted)

            key_hash, _ = self._hash(key, parent=key_in_report, parents_ids=parents_ids)
            if not key_hash:
                continue
            item_id = get_id(item)
            # Items that are already among the parents are skipped so that
            # self-referencing structures do not recurse forever.
            if (parents_ids and item_id in parents_ids) or self._skip_this(item, parent=key_in_report):
                continue
            parents_ids_added = add_to_frozen_set(parents_ids, item_id)
            hashed, count = self._hash(item, parent=key_in_report, parents_ids=parents_ids_added)
            hashed = KEY_TO_VAL_STR.format(key_hash, hashed)
            result.append(hashed)
            counts += count

        # Sorted so that the insertion order of the keys does not matter.
        result.sort()
        result = ';'.join(result)
        if print_as_attribute:
            type_ = original_type or type(obj)
            type_str = type_.__name__
            # Types of the same ignore group share one combined type name.
            for type_group in self.ignore_type_in_groups:
                if self.type_check_func(type_, type_group):
                    type_str = ','.join(map(lambda x: x.__name__, type_group))
                    break
        else:
            type_str = 'dict'
        return "{}:{{{}}}".format(type_str, result), counts

    def _prep_iterable(self, obj, parent, parents_ids=EMPTY_FROZENSET):
        """Build the canonical string of an iterable from the hashes of its items.

        :return: ``(string, counts)``.
        """

        counts = 1
        result = defaultdict(int)

        for i, item in enumerate(obj):
            new_parent = "{}[{}]".format(parent, i)
            if self._skip_this(item, parent=new_parent):
                continue

            item_id = get_id(item)
            if parents_ids and item_id in parents_ids:
                continue

            parents_ids_added = add_to_frozen_set(parents_ids, item_id)
            hashed, count = self._hash(item, parent=new_parent, parents_ids=parents_ids_added)
            # counting repetitions
            result[hashed] += 1
            counts += count

        if self.ignore_repetition:
            result = list(result.keys())
        else:
            result = [
                '{}|{}'.format(i, v) for i, v in result.items()
            ]

        result = map(str, result)  # making sure the result items are string so join command works.
        if self.ignore_iterable_order:
            result = sorted(result)
        result = ','.join(result)
        result = KEY_TO_VAL_STR.format(type(obj).__name__, result)

        return result, counts

    def _prep_bool(self, obj):
        """Map a truthy/falsy boolean to the matching ``BoolObj`` member."""
        return BoolObj.TRUE if obj else BoolObj.FALSE


    def _prep_path(self, obj):
        """Canonical string of a path object: its class name and its string form."""
        type_ = obj.__class__.__name__
        return KEY_TO_VAL_STR.format(type_, obj)

    def _prep_number(self, obj):
        """Canonical string of a number, honouring the numeric-type and significant-digit options."""
        type_ = "number" if self.ignore_numeric_type_changes else obj.__class__.__name__
        if self.significant_digits is not None:
            obj = self.number_to_string(obj, significant_digits=self.significant_digits,
                                        number_format_notation=self.number_format_notation)
        return KEY_TO_VAL_STR.format(type_, obj)

    def _prep_ipranges(self, obj):
        """Canonical string of an ``ipranges`` object, based on ``str(obj)``."""
        type_ = 'iprange'
        obj = str(obj)
        return KEY_TO_VAL_STR.format(type_, obj)

    def _prep_datetime(self, obj):
        """Canonical string of a ``times`` object after ``datetime_normalize``."""
        type_ = 'datetime'
        obj = datetime_normalize(self.truncate_datetime, obj, default_timezone=self.default_timezone)
        return KEY_TO_VAL_STR.format(type_, obj)

    def _prep_date(self, obj):
        """Canonical string of a ``datetime.date``."""
        type_ = 'datetime'  # yes still datetime but it doesn't need normalization
        return KEY_TO_VAL_STR.format(type_, obj)

    def _prep_tuple(self, obj, parent, parents_ids):
        """Hash a tuple as an iterable, or as an object when it looks like a namedtuple."""
        # Checking to see if it has _asdict. Which probably means it is a named
        # tuple.
        try:
            obj._asdict
        # It must be a normal tuple
        except AttributeError:
            result, counts = self._prep_iterable(obj=obj, parent=parent, parents_ids=parents_ids)
        # We assume it is a namedtuple then
        else:
            result, counts = self._prep_obj(obj, parent, parents_ids=parents_ids, is_namedtuple=True)
        return result, counts

    def _hash(self, obj, parent, parents_ids=EMPTY_FROZENSET):
        """The main hash method.

        Dispatches on the type of ``obj``, stores ``(result, counts)`` in
        ``self.hashes`` (keyed by the object, or by its id when unhashable)
        and returns that pair. Returns ``(None, 0)`` for skipped objects.

        :raises NotImplementedError: when a custom operator lacks
            ``normalize_value_for_hashing``.
        """
        counts = 1
        if self.custom_operators is not None:
            for operator in self.custom_operators:
                func = getattr(operator, 'normalize_value_for_hashing', None)
                if func is None:
                    raise NotImplementedError(f"{operator.__class__.__name__} needs to define a normalize_value_for_hashing method to be compatible with ignore_order=True or iterable_compare_func.")
                else:
                    obj = func(parent, obj)

        if isinstance(obj, booleanTypes):
            obj = self._prep_bool(obj)
            result = None
        elif self.use_enum_value and isinstance(obj, Enum):
            obj = obj.value
        else:
            result = not_hashed
            # Reuse an already computed hash when the object is a known key.
            try:
                result, counts = self.hashes[obj]
            except (TypeError, KeyError):
                pass
            else:
                return result, counts

        if self._skip_this(obj, parent):
            return None, 0

        elif obj is None:
            result = 'NONE'

        elif isinstance(obj, strings):
            result = prepare_string_for_hashing(
                obj,
                ignore_string_type_changes=self.ignore_string_type_changes,
                ignore_string_case=self.ignore_string_case,
                encodings=self.encodings,
                ignore_encoding_errors=self.ignore_encoding_errors,
            )

        elif isinstance(obj, Path):
            result = self._prep_path(obj)

        elif isinstance(obj, times):
            result = self._prep_datetime(obj)

        elif isinstance(obj, datetime.date):
            result = self._prep_date(obj)

        elif isinstance(obj, numbers):  # type: ignore
            result = self._prep_number(obj)

        elif isinstance(obj, ipranges):
            result = self._prep_ipranges(obj)

        elif isinstance(obj, MutableMapping):
            result, counts = self._prep_dict(obj=obj, parent=parent, parents_ids=parents_ids)

        elif isinstance(obj, tuple):
            result, counts = self._prep_tuple(obj=obj, parent=parent, parents_ids=parents_ids)

        elif (pandas and isinstance(obj, pandas.DataFrame)):  # type: ignore
            def gen():  # type: ignore
                yield ('dtype', obj.dtypes)  # type: ignore
                yield ('index', obj.index)  # type: ignore
                yield from obj.items()  # type: ignore # which contains (column name, series tuples)
            result, counts = self._prep_iterable(obj=gen(), parent=parent, parents_ids=parents_ids)
        elif (polars and isinstance(obj, polars.DataFrame)):  # type: ignore
            def gen():
                yield from obj.columns  # type: ignore
                yield from list(obj.schema.items())  # type: ignore
                yield from obj.rows()  # type: ignore
            result, counts = self._prep_iterable(obj=gen(), parent=parent, parents_ids=parents_ids)

        elif isinstance(obj, Iterable):
            result, counts = self._prep_iterable(obj=obj, parent=parent, parents_ids=parents_ids)

        elif obj == BoolObj.TRUE or obj == BoolObj.FALSE:
            result = 'bool:true' if obj is BoolObj.TRUE else 'bool:false'
        elif isinstance(obj, PydanticBaseModel):
            result, counts = self._prep_obj(obj=obj, parent=parent, parents_ids=parents_ids, is_pydantic_object=True)
        else:
            result, counts = self._prep_obj(obj=obj, parent=parent, parents_ids=parents_ids)

        if result is not_hashed:  # pragma: no cover
            self.hashes[UNPROCESSED_KEY].append(obj)

        elif result is unprocessed:
            pass

        elif self.apply_hash:
            if isinstance(obj, strings):
                # Strings were already cleaned by prepare_string_for_hashing above.
                result_cleaned = result
            else:
                result_cleaned = prepare_string_for_hashing(
                    result, ignore_string_type_changes=self.ignore_string_type_changes,
                    ignore_string_case=self.ignore_string_case)
            result = self.hasher(result_cleaned)

        # It is important to keep the hash of all objects.
        # The hashes will be later used for comparing the objects.
        # Object to hash when possible otherwise ObjectID to hash
        try:
            self.hashes[obj] = (result, counts)
        except TypeError:
            obj_id = get_id(obj)
            self.hashes[obj_id] = (result, counts)

        return result, counts


if __name__ == "__main__":  # pragma: no cover
    import doctest
    doctest.testmod()