author    S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer S. Solomon Darnell  2025-03-28 21:52:21 -0500
commit    4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree      ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/deepdiff/deephash.py
parent    cc961e04ba734dd72309fb548a2f97d67d578813 (diff)

    two version of R2R are here
Diffstat (limited to '.venv/lib/python3.12/site-packages/deepdiff/deephash.py')
 -rw-r--r--  .venv/lib/python3.12/site-packages/deepdiff/deephash.py  627
 1 file changed, 627 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/deepdiff/deephash.py b/.venv/lib/python3.12/site-packages/deepdiff/deephash.py
new file mode 100644
index 00000000..47b900e5
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/deepdiff/deephash.py
@@ -0,0 +1,627 @@
+#!/usr/bin/env python
+import logging
+import datetime
+from typing import Union, Optional, Any, List, TYPE_CHECKING
+from collections.abc import Iterable, MutableMapping
+from collections import defaultdict
+from hashlib import sha1, sha256
+from pathlib import Path
+from enum import Enum
+from deepdiff.helper import (strings, numbers, times, unprocessed, not_hashed, add_to_frozen_set,
+                             convert_item_or_items_into_set_else_none, get_doc, ipranges,
+                             convert_item_or_items_into_compiled_regexes_else_none,
+                             get_id, type_is_subclass_of_type_group, type_in_type_group,
+                             number_to_string, datetime_normalize, KEY_TO_VAL_STR,
+                             get_truncate_datetime, dict_, add_root_to_paths, PydanticBaseModel)
+
+from deepdiff.base import Base
+
+if TYPE_CHECKING:
+    from pytz.tzinfo import BaseTzInfo
+
+
+try:
+    import pandas
+except ImportError:
+    pandas = False
+
+try:
+    import polars
+except ImportError:
+    polars = False
+try:
+    import numpy as np
+    booleanTypes = (bool, np.bool_)
+except ImportError:
+    booleanTypes = bool
+
+logger = logging.getLogger(__name__)
+
+UNPROCESSED_KEY = object()
+
+EMPTY_FROZENSET = frozenset()
+
+INDEX_VS_ATTRIBUTE = ('[%s]', '.%s')
+
+
+HASH_LOOKUP_ERR_MSG = '{} is not one of the hashed items.'
+
+
+def sha256hex(obj):
+    """Use Sha256 as a cryptographic hash."""
+    if isinstance(obj, str):
+        obj = obj.encode('utf-8')
+    return sha256(obj).hexdigest()
+
+
+def sha1hex(obj):
+    """Use Sha1 as a cryptographic hash."""
+    if isinstance(obj, str):
+        obj = obj.encode('utf-8')
+    return sha1(obj).hexdigest()
+
+
+default_hasher = sha256hex
+
+
+def combine_hashes_lists(items, prefix):
+    """
+    Combine lists of hashes into one hash.
+    This can be optimized in the future.
+    It needs to work with both murmur3 hashes (int) and sha256 hashes (str),
+    although murmur3 is no longer used.
+    """
+    if isinstance(prefix, bytes):
+        prefix = prefix.decode('utf-8')
+    hashes_bytes = b''
+    for item in items:
+        # To make sure the order of hashes within each item does not
+        # affect the result, we sort them before concatenating.
+        hashes_bytes += (''.join(map(str, sorted(item))) + '--').encode('utf-8')
+    return prefix + str(default_hasher(hashes_bytes))
+
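+# Illustrative sketch of the combination step (exact digests depend on
+# default_hasher): each inner collection is sorted, joined, and suffixed with
+# '--' before the whole byte string is hashed once, e.g.
+#
+#     combine_hashes_lists([['b', 'a'], ['c']], prefix='ne')
+#     == 'ne' + default_hasher(b'ab--c--')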
+
+class BoolObj(Enum):
+    TRUE = 1
+    FALSE = 0
+
+
+def prepare_string_for_hashing(
+        obj,
+        ignore_string_type_changes=False,
+        ignore_string_case=False,
+        encodings=None,
+        ignore_encoding_errors=False,
+):
+    """
+    Prepare a string (or bytes) for hashing: decode bytes, optionally
+    prefix the value with its original type name, and normalize case.
+    """
+    original_type = obj.__class__.__name__
+    # https://docs.python.org/3/library/codecs.html#codecs.decode
+    errors_mode = 'ignore' if ignore_encoding_errors else 'strict'
+    if isinstance(obj, bytes):
+        err = None
+        encodings = ['utf-8'] if encodings is None else encodings
+        encoded = False
+        for encoding in encodings:
+            try:
+                obj = obj.decode(encoding, errors=errors_mode)
+                encoded = True
+                break
+            except UnicodeDecodeError as er:
+                err = er
+        if not encoded and err is not None:
+            obj_decoded = obj.decode('utf-8', errors='ignore')  # type: ignore
+            start = max(err.start - 20, 0)
+            start_prefix = ''
+            if start > 0:
+                start_prefix = '...'
+            end = err.end + 20
+            end_suffix = '...'
+            if end >= len(obj):
+                end = len(obj)
+                end_suffix = ''
+            raise UnicodeDecodeError(
+                err.encoding,
+                err.object,
+                err.start,
+                err.end,
+                f"{err.reason} in '{start_prefix}{obj_decoded[start:end]}{end_suffix}'. Please either pass ignore_encoding_errors=True or pass the encoding via encodings=['utf-8', '...']."
+            ) from None
+    if not ignore_string_type_changes:
+        obj = KEY_TO_VAL_STR.format(original_type, obj)
+    if ignore_string_case:
+        obj = obj.lower()
+    return obj
+
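+# Illustrative examples (assuming KEY_TO_VAL_STR is '{}:{}' as defined in
+# deepdiff.helper):
+#
+#     prepare_string_for_hashing('Hello')   -> 'str:Hello'
+#     prepare_string_for_hashing(b'Hello')  -> 'bytes:Hello'  (decoded, original type kept)
+#     prepare_string_for_hashing('Hello', ignore_string_type_changes=True) -> 'Hello'
+#     prepare_string_for_hashing('Hello', ignore_string_case=True)         -> 'str:hello'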
+
+doc = get_doc('deephash_doc.rst')
+
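+# A minimal usage sketch (see deephash_doc.rst for the authoritative
+# documentation; the digest below is an abbreviated placeholder):
+#
+#     >>> from deepdiff import DeepHash
+#     >>> obj = {1: 2, 'a': [3, 4]}
+#     >>> hashes = DeepHash(obj)
+#     >>> hashes[obj]                          # hex digest of the whole structure
+#     '...'
+#     >>> hashes.get(obj, extract_index=1)     # count of sub-items that built it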
+
+class DeepHash(Base):
+    __doc__ = doc
+
+    def __init__(self,
+                 obj: Any,
+                 *,
+                 apply_hash=True,
+                 custom_operators: Optional[List[Any]] = None,
+                 default_timezone: Union[datetime.timezone, "BaseTzInfo"] = datetime.timezone.utc,
+                 encodings=None,
+                 exclude_obj_callback=None,
+                 exclude_paths=None,
+                 exclude_regex_paths=None,
+                 exclude_types=None,
+                 hasher=None,
+                 hashes=None,
+                 ignore_encoding_errors=False,
+                 ignore_iterable_order=True,
+                 ignore_numeric_type_changes=False,
+                 ignore_private_variables=True,
+                 ignore_repetition=True,
+                 ignore_string_case=False,
+                 ignore_string_type_changes=False,
+                 ignore_type_in_groups=None,
+                 ignore_type_subclasses=False,
+                 include_paths=None,
+                 number_format_notation="f",
+                 number_to_string_func=None,
+                 parent="root",
+                 significant_digits=None,
+                 truncate_datetime=None,
+                 use_enum_value=False,
+                 **kwargs):
+        if kwargs:
+            raise ValueError(
+                ("The following parameter(s) are not valid: %s\n"
+                 "The valid parameters are obj, hashes, exclude_types, significant_digits, truncate_datetime,"
+                 "exclude_paths, include_paths, exclude_regex_paths, hasher, ignore_repetition, "
+                 "number_format_notation, apply_hash, ignore_type_in_groups, ignore_string_type_changes, "
+                 "ignore_numeric_type_changes, ignore_type_subclasses, ignore_string_case "
+                 "number_to_string_func, ignore_private_variables, parent, use_enum_value, default_timezone "
+                 "encodings, ignore_encoding_errors") % ', '.join(kwargs.keys()))
+        if isinstance(hashes, MutableMapping):
+            self.hashes = hashes
+        elif isinstance(hashes, DeepHash):
+            self.hashes = hashes.hashes
+        else:
+            self.hashes = dict_()
+        exclude_types = set() if exclude_types is None else set(exclude_types)
+        self.exclude_types_tuple = tuple(exclude_types)  # we need tuple for checking isinstance
+        self.ignore_repetition = ignore_repetition
+        self.exclude_paths = add_root_to_paths(convert_item_or_items_into_set_else_none(exclude_paths))
+        self.include_paths = add_root_to_paths(convert_item_or_items_into_set_else_none(include_paths))
+        self.exclude_regex_paths = convert_item_or_items_into_compiled_regexes_else_none(exclude_regex_paths)
+        self.hasher = default_hasher if hasher is None else hasher
+        self.hashes[UNPROCESSED_KEY] = []
+        self.use_enum_value = use_enum_value
+        self.default_timezone = default_timezone
+        self.significant_digits = self.get_significant_digits(significant_digits, ignore_numeric_type_changes)
+        self.truncate_datetime = get_truncate_datetime(truncate_datetime)
+        self.number_format_notation = number_format_notation
+        self.ignore_type_in_groups = self.get_ignore_types_in_groups(
+            ignore_type_in_groups=ignore_type_in_groups,
+            ignore_string_type_changes=ignore_string_type_changes,
+            ignore_numeric_type_changes=ignore_numeric_type_changes,
+            ignore_type_subclasses=ignore_type_subclasses)
+        self.ignore_string_type_changes = ignore_string_type_changes
+        self.ignore_numeric_type_changes = ignore_numeric_type_changes
+        self.ignore_string_case = ignore_string_case
+        self.exclude_obj_callback = exclude_obj_callback
+        # Makes the hash return a constant-size result if True.
+        # The only time it should be set to False is when testing
+        # the individual hash functions for different types of objects.
+        self.apply_hash = apply_hash
+        self.type_check_func = type_in_type_group if ignore_type_subclasses else type_is_subclass_of_type_group
+        # self.type_check_func = type_is_subclass_of_type_group if ignore_type_subclasses else type_in_type_group
+        self.number_to_string = number_to_string_func or number_to_string
+        self.ignore_private_variables = ignore_private_variables
+        self.encodings = encodings
+        self.ignore_encoding_errors = ignore_encoding_errors
+        self.ignore_iterable_order = ignore_iterable_order
+        self.custom_operators = custom_operators
+
+        self._hash(obj, parent=parent, parents_ids=frozenset({get_id(obj)}))
+
+        if self.hashes[UNPROCESSED_KEY]:
+            logger.warning("Can not hash the following items: {}.".format(self.hashes[UNPROCESSED_KEY]))
+        else:
+            del self.hashes[UNPROCESSED_KEY]
+
+    sha256hex = sha256hex
+    sha1hex = sha1hex
+
+    def __getitem__(self, obj, extract_index=0):
+        return self._getitem(self.hashes, obj, extract_index=extract_index, use_enum_value=self.use_enum_value)
+
+    @staticmethod
+    def _getitem(hashes, obj, extract_index=0, use_enum_value=False):
+        """
+        extract_index is 0 for the hash, 1 for the count, and None to get both.
+        To stay backward compatible, only the hash is returned by default, so extract_index defaults to 0.
+        """
+
+        key = obj
+        if obj is True:
+            key = BoolObj.TRUE
+        elif obj is False:
+            key = BoolObj.FALSE
+        elif use_enum_value and isinstance(obj, Enum):
+            key = obj.value
+
+        result_n_count = (None, 0)
+
+        try:
+            result_n_count = hashes[key]
+        except (TypeError, KeyError):
+            key = get_id(obj)
+            try:
+                result_n_count = hashes[key]
+            except KeyError:
+                raise KeyError(HASH_LOOKUP_ERR_MSG.format(obj)) from None
+
+        if obj is UNPROCESSED_KEY:
+            extract_index = None
+
+        return result_n_count if extract_index is None else result_n_count[extract_index]
+
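+    # Illustrative lookup semantics (hypothetical hashes mapping):
+    #
+    #     hashes = {'a': ('66a0...', 1)}
+    #     DeepHash._getitem(hashes, 'a')                     -> '66a0...'  (the hash)
+    #     DeepHash._getitem(hashes, 'a', extract_index=1)    -> 1          (the count)
+    #     DeepHash._getitem(hashes, 'a', extract_index=None) -> ('66a0...', 1)
+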
+    def __contains__(self, obj):
+        result = False
+        try:
+            result = obj in self.hashes
+        except (TypeError, KeyError):
+            result = False
+        if not result:
+            result = get_id(obj) in self.hashes
+        return result
+
+    def get(self, key, default=None, extract_index=0):
+        """
+        Get method for the hashes dictionary.
+        It can extract the hash for a given key that has already been calculated when extract_index=0,
+        or the count of items that went into building the object when extract_index=1.
+        """
+        return self.get_key(self.hashes, key, default=default, extract_index=extract_index)
+
+    @staticmethod
+    def get_key(hashes, key, default=None, extract_index=0, use_enum_value=False):
+        """
+        get_key method for the hashes dictionary.
+        It can extract the hash for a given key that has already been calculated when extract_index=0,
+        or the count of items that went into building the object when extract_index=1.
+        """
+        try:
+            result = DeepHash._getitem(hashes, key, extract_index=extract_index, use_enum_value=use_enum_value)
+        except KeyError:
+            result = default
+        return result
+
+    def _get_objects_to_hashes_dict(self, extract_index=0):
+        """
+        Return a dictionary of objects to their hashes (extract_index=0),
+        or of objects to the count of items that went into building them (extract_index=1).
+        """
+        result = dict_()
+        for key, value in self.hashes.items():
+            if key is UNPROCESSED_KEY:
+                result[key] = value
+            else:
+                result[key] = value[extract_index]
+        return result
+
+    def __eq__(self, other):
+        if isinstance(other, DeepHash):
+            return self.hashes == other.hashes
+        else:
+            # We only care about the hashes
+            return self._get_objects_to_hashes_dict() == other
+
+    __req__ = __eq__
+
+    def __repr__(self):
+        """
+        Hide the counts since it would be confusing to see them when they are hidden everywhere else.
+        """
+        from deepdiff.summarize import summarize
+        return summarize(self._get_objects_to_hashes_dict(extract_index=0), max_length=500)
+
+    def __str__(self):
+        return str(self._get_objects_to_hashes_dict(extract_index=0))
+
+    def __bool__(self):
+        return bool(self.hashes)
+
+    def keys(self):
+        return self.hashes.keys()
+
+    def values(self):
+        return (i[0] for i in self.hashes.values())  # Just grab the item and not its count
+
+    def items(self):
+        return ((i, v[0]) for i, v in self.hashes.items())
+
+    def _prep_obj(self, obj, parent, parents_ids=EMPTY_FROZENSET, is_namedtuple=False, is_pydantic_object=False):
+        """prepping objects"""
+        original_type = type(obj) if not isinstance(obj, type) else obj
+
+        obj_to_dict_strategies = []
+        if is_namedtuple:
+            obj_to_dict_strategies.append(lambda o: o._asdict())
+        elif is_pydantic_object:
+            obj_to_dict_strategies.append(lambda o: {k: v for (k, v) in o.__dict__.items() if k != "model_fields_set"})
+        else:
+            obj_to_dict_strategies.append(lambda o: o.__dict__)
+
+        if hasattr(obj, "__slots__"):
+            obj_to_dict_strategies.append(lambda o: {i: getattr(o, i) for i in o.__slots__})
+        else:
+            import inspect
+            obj_to_dict_strategies.append(lambda o: dict(inspect.getmembers(o, lambda m: not inspect.isroutine(m))))
+
+        for get_dict in obj_to_dict_strategies:
+            try:
+                d = get_dict(obj)
+                break
+            except AttributeError:
+                pass
+        else:
+            self.hashes[UNPROCESSED_KEY].append(obj)
+            return (unprocessed, 0)
+        obj = d
+
+        result, counts = self._prep_dict(obj, parent=parent, parents_ids=parents_ids,
+                                         print_as_attribute=True, original_type=original_type)
+        result = "nt{}".format(result) if is_namedtuple else "obj{}".format(result)
+        return result, counts
+
+    def _skip_this(self, obj, parent):
+        skip = False
+        if self.exclude_paths and parent in self.exclude_paths:
+            skip = True
+        if self.include_paths and parent != 'root':
+            if parent not in self.include_paths:
+                skip = True
+                for prefix in self.include_paths:
+                    if parent.startswith(prefix):
+                        skip = False
+                        break
+        elif self.exclude_regex_paths and any(
+                [exclude_regex_path.search(parent) for exclude_regex_path in self.exclude_regex_paths]):  # type: ignore
+            skip = True
+        elif self.exclude_types_tuple and isinstance(obj, self.exclude_types_tuple):
+            skip = True
+        elif self.exclude_obj_callback and self.exclude_obj_callback(obj, parent):
+            skip = True
+        return skip
+
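+    # Path-matching sketch (paths here are hypothetical): with
+    # exclude_paths={"root['x']"} the exact path "root['x']" is skipped; with
+    # include_paths={"root['x']"} a path like "root['y']" is skipped because it
+    # neither equals nor starts with an included path, while "root['x'][0]" is
+    # kept because it extends an included prefix.
+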
+    def _prep_dict(self, obj, parent, parents_ids=EMPTY_FROZENSET, print_as_attribute=False, original_type=None):
+
+        result = []
+        counts = 1
+
+        key_text = "%s{}".format(INDEX_VS_ATTRIBUTE[print_as_attribute])
+        for key, item in obj.items():
+            counts += 1
+            # ignore private variables
+            if self.ignore_private_variables and isinstance(key, str) and key.startswith('__'):
+                continue
+            key_formatted = "'%s'" % key if not print_as_attribute and isinstance(key, strings) else key
+            key_in_report = key_text % (parent, key_formatted)
+
+            key_hash, _ = self._hash(key, parent=key_in_report, parents_ids=parents_ids)
+            if not key_hash:
+                continue
+            item_id = get_id(item)
+            if (parents_ids and item_id in parents_ids) or self._skip_this(item, parent=key_in_report):
+                continue
+            parents_ids_added = add_to_frozen_set(parents_ids, item_id)
+            hashed, count = self._hash(item, parent=key_in_report, parents_ids=parents_ids_added)
+            hashed = KEY_TO_VAL_STR.format(key_hash, hashed)
+            result.append(hashed)
+            counts += count
+
+        result.sort()
+        result = ';'.join(result)
+        if print_as_attribute:
+            type_ = original_type or type(obj)
+            type_str = type_.__name__
+            for type_group in self.ignore_type_in_groups:
+                if self.type_check_func(type_, type_group):
+                    type_str = ','.join(map(lambda x: x.__name__, type_group))
+                    break
+        else:
+            type_str = 'dict'
+        return "{}:{{{}}}".format(type_str, result), counts
+
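+    # Shape of the canonical dict string (illustrative, assuming KEY_TO_VAL_STR
+    # is '{}:{}' as defined in deepdiff.helper): for {'a': 1} the unhashed form
+    # looks like "dict:{<hash of 'a'>:<hash of 1>}"; multiple entries are
+    # sorted and joined with ';' so that key order never affects the result.
+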
+    def _prep_iterable(self, obj, parent, parents_ids=EMPTY_FROZENSET):
+
+        counts = 1
+        result = defaultdict(int)
+
+        for i, item in enumerate(obj):
+            new_parent = "{}[{}]".format(parent, i)
+            if self._skip_this(item, parent=new_parent):
+                continue
+
+            item_id = get_id(item)
+            if parents_ids and item_id in parents_ids:
+                continue
+
+            parents_ids_added = add_to_frozen_set(parents_ids, item_id)
+            hashed, count = self._hash(item, parent=new_parent, parents_ids=parents_ids_added)
+            # counting repetitions
+            result[hashed] += 1
+            counts += count
+
+        if self.ignore_repetition:
+            result = list(result.keys())
+        else:
+            result = [
+                '{}|{}'.format(i, v) for i, v in result.items()
+            ]
+
+        result = map(str, result)  # make sure the items are strings so that join works
+        if self.ignore_iterable_order:
+            result = sorted(result)
+        result = ','.join(result)
+        result = KEY_TO_VAL_STR.format(type(obj).__name__, result)
+
+        return result, counts
+
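+    # Repetition-handling sketch: hashing [1, 1, 2] with ignore_repetition=False
+    # keeps counts as 'hash|count' entries, roughly "list:<hash of 1>|2,<hash of 2>|1",
+    # while the default ignore_repetition=True collapses it to
+    # "list:<hash of 1>,<hash of 2>". With ignore_iterable_order=True (the
+    # default) the entries are sorted before being joined.
+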
+    def _prep_bool(self, obj):
+        return BoolObj.TRUE if obj else BoolObj.FALSE
+
+
+    def _prep_path(self, obj):
+        type_ = obj.__class__.__name__
+        return KEY_TO_VAL_STR.format(type_, obj)
+
+    def _prep_number(self, obj):
+        type_ = "number" if self.ignore_numeric_type_changes else obj.__class__.__name__
+        if self.significant_digits is not None:
+            obj = self.number_to_string(obj, significant_digits=self.significant_digits,
+                                        number_format_notation=self.number_format_notation)
+        return KEY_TO_VAL_STR.format(type_, obj)
+
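+    # Illustrative outputs: _prep_number(4) gives 'int:4', or 'number:4' when
+    # ignore_numeric_type_changes is set; with significant_digits=2 and the
+    # default "f" notation, 4.1234 is first normalized via number_to_string,
+    # yielding roughly 'float:4.12'.
+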
+    def _prep_ipranges(self, obj):
+        type_ = 'iprange'
+        obj = str(obj)
+        return KEY_TO_VAL_STR.format(type_, obj)
+
+    def _prep_datetime(self, obj):
+        type_ = 'datetime'
+        obj = datetime_normalize(self.truncate_datetime, obj, default_timezone=self.default_timezone)
+        return KEY_TO_VAL_STR.format(type_, obj)
+
+    def _prep_date(self, obj):
+        type_ = 'datetime'  # still 'datetime', but dates do not need normalization
+        return KEY_TO_VAL_STR.format(type_, obj)
+
+    def _prep_tuple(self, obj, parent, parents_ids):
+        # Check for an _asdict attribute, which probably means it is a namedtuple.
+        try:
+            obj._asdict
+        # No _asdict: it must be a plain tuple.
+        except AttributeError:
+            result, counts = self._prep_iterable(obj=obj, parent=parent, parents_ids=parents_ids)
+        # We assume it is a namedtuple then
+        else:
+            result, counts = self._prep_obj(obj, parent, parents_ids=parents_ids, is_namedtuple=True)
+        return result, counts
+
+    def _hash(self, obj, parent, parents_ids=EMPTY_FROZENSET):
+        """The main hash method"""
+        counts = 1
+        if self.custom_operators is not None:
+            for operator in self.custom_operators:
+                func = getattr(operator, 'normalize_value_for_hashing', None)
+                if func is None:
+                    raise NotImplementedError(f"{operator.__class__.__name__} needs to define a normalize_value_for_hashing method to be compatible with ignore_order=True or iterable_compare_func.")
+                else:
+                    obj = func(parent, obj)
+
+        if isinstance(obj, booleanTypes):
+            obj = self._prep_bool(obj)
+            result = None
+        elif self.use_enum_value and isinstance(obj, Enum):
+            obj = obj.value
+        else:
+            result = not_hashed
+        try:
+            result, counts = self.hashes[obj]
+        except (TypeError, KeyError):
+            pass
+        else:
+            return result, counts
+
+        if self._skip_this(obj, parent):
+            return None, 0
+
+        elif obj is None:
+            result = 'NONE'
+
+        elif isinstance(obj, strings):
+            result = prepare_string_for_hashing(
+                obj,
+                ignore_string_type_changes=self.ignore_string_type_changes,
+                ignore_string_case=self.ignore_string_case,
+                encodings=self.encodings,
+                ignore_encoding_errors=self.ignore_encoding_errors,
+            )
+
+        elif isinstance(obj, Path):
+            result = self._prep_path(obj)
+
+        elif isinstance(obj, times):
+            result = self._prep_datetime(obj)
+
+        elif isinstance(obj, datetime.date):
+            result = self._prep_date(obj)
+
+        elif isinstance(obj, numbers):  # type: ignore
+            result = self._prep_number(obj)
+
+        elif isinstance(obj, ipranges):
+            result = self._prep_ipranges(obj)
+
+        elif isinstance(obj, MutableMapping):
+            result, counts = self._prep_dict(obj=obj, parent=parent, parents_ids=parents_ids)
+
+        elif isinstance(obj, tuple):
+            result, counts = self._prep_tuple(obj=obj, parent=parent, parents_ids=parents_ids)
+
+        elif (pandas and isinstance(obj, pandas.DataFrame)):  # type: ignore
+            def gen():  # type: ignore
+                yield ('dtype', obj.dtypes)  # type: ignore
+                yield ('index', obj.index)  # type: ignore
+                yield from obj.items()  # type: ignore  # yields (column name, series) tuples
+            result, counts = self._prep_iterable(obj=gen(), parent=parent, parents_ids=parents_ids)
+        elif (polars and isinstance(obj, polars.DataFrame)):  # type: ignore
+            def gen():
+                yield from obj.columns  # type: ignore
+                yield from list(obj.schema.items())  # type: ignore
+                yield from obj.rows()  # type: ignore
+            result, counts = self._prep_iterable(obj=gen(), parent=parent, parents_ids=parents_ids)
+
+        elif isinstance(obj, Iterable):
+            result, counts = self._prep_iterable(obj=obj, parent=parent, parents_ids=parents_ids)
+
+        elif obj == BoolObj.TRUE or obj == BoolObj.FALSE:
+            result = 'bool:true' if obj is BoolObj.TRUE else 'bool:false'
+        elif isinstance(obj, PydanticBaseModel):
+            result, counts = self._prep_obj(obj=obj, parent=parent, parents_ids=parents_ids, is_pydantic_object=True)
+        else:
+            result, counts = self._prep_obj(obj=obj, parent=parent, parents_ids=parents_ids)
+
+        if result is not_hashed:  # pragma: no cover
+            self.hashes[UNPROCESSED_KEY].append(obj)
+
+        elif result is unprocessed:
+            pass
+
+        elif self.apply_hash:
+            if isinstance(obj, strings):
+                result_cleaned = result
+            else:
+                result_cleaned = prepare_string_for_hashing(
+                    result, ignore_string_type_changes=self.ignore_string_type_changes,
+                    ignore_string_case=self.ignore_string_case)
+            result = self.hasher(result_cleaned)
+
+        # It is important to keep the hashes of all objects.
+        # They will be used later for comparing the objects.
+        # Key by the object itself when it is hashable, otherwise by its id.
+        try:
+            self.hashes[obj] = (result, counts)
+        except TypeError:
+            obj_id = get_id(obj)
+            self.hashes[obj_id] = (result, counts)
+
+        return result, counts
+
+
+if __name__ == "__main__":  # pragma: no cover
+    import doctest
+    doctest.testmod()