Diffstat (limited to '.venv/lib/python3.12/site-packages/deepdiff')
17 files changed, 8099 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/deepdiff/__init__.py b/.venv/lib/python3.12/site-packages/deepdiff/__init__.py new file mode 100644 index 00000000..c784c558 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/deepdiff/__init__.py @@ -0,0 +1,14 @@ +"""This module offers the DeepDiff, DeepSearch, grep, Delta and DeepHash classes.""" +# flake8: noqa +__version__ = '8.4.2' +import logging + +if __name__ == '__main__': + logging.basicConfig(format='%(asctime)s %(levelname)8s %(message)s') + + +from .diff import DeepDiff as DeepDiff +from .search import DeepSearch as DeepSearch, grep as grep +from .deephash import DeepHash as DeepHash +from .delta import Delta as Delta +from .path import extract as extract, parse_path as parse_path diff --git a/.venv/lib/python3.12/site-packages/deepdiff/anyset.py b/.venv/lib/python3.12/site-packages/deepdiff/anyset.py new file mode 100644 index 00000000..cd87ac38 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/deepdiff/anyset.py @@ -0,0 +1,65 @@ +from deepdiff.deephash import DeepHash +from deepdiff.helper import dict_, SetOrdered + + +class AnySet: + """ + Any object can be in this set, whether hashable or not. + Note that the current implementation has a memory leak and keeps + traces of objects in itself even after popping. + However, once the AnySet object is deleted, all those traces will be gone too. + """ + def __init__(self, items=None): + self._set = SetOrdered() + self._hashes = dict_() + self._hash_to_objects = dict_() + if items: + for item in items: + self.add(item) + + def add(self, item): + try: + self._set.add(item) + except TypeError: + hashes_obj = DeepHash(item, hashes=self._hashes) + hash_ = hashes_obj[item] + if hash_ not in self._hash_to_objects: + self._hash_to_objects[hash_] = item + + def __contains__(self, item): + try: + result = item in self._set + except TypeError: + hashes_obj = DeepHash(item, hashes=self._hashes) + hash_ = hashes_obj[item] + result = hash_ in self._hash_to_objects + return result + + def pop(self): + if self._set: + return self._set.pop() + else: + return self._hash_to_objects.pop(next(iter(self._hash_to_objects))) + + def __eq__(self, other): + set_part, hashes_to_objs_part = other + return (self._set == set_part and self._hash_to_objects == hashes_to_objs_part) + + __req__ = __eq__ + + def __repr__(self): + return "< AnySet {}, {} >".format(self._set, self._hash_to_objects) + + __str__ = __repr__ + + def __len__(self): + return len(self._set) + len(self._hash_to_objects) + + def __iter__(self): + for item in self._set: + yield item + for item in self._hash_to_objects.values(): + yield item + + def __bool__(self): + return bool(self._set or self._hash_to_objects) diff --git a/.venv/lib/python3.12/site-packages/deepdiff/base.py b/.venv/lib/python3.12/site-packages/deepdiff/base.py new file mode 100644 index 00000000..d3b24fb8 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/deepdiff/base.py @@ -0,0 +1,51 @@ +from typing import Any +from deepdiff.helper import strings, numbers, SetOrdered + + +DEFAULT_SIGNIFICANT_DIGITS_WHEN_IGNORE_NUMERIC_TYPES = 12 +TYPE_STABILIZATION_MSG = 'Unable to stabilize the Numpy array {} due to {}. Please set ignore_order=False.'
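# A minimal usage sketch (editorial addition, not part of the packaged file):
# AnySet keeps hashable items in its internal SetOrdered and falls back to
# DeepHash-keyed storage for unhashable ones, so both kinds coexist in one set.
from deepdiff.anyset import AnySet

items = [1, 2, {1}, 4, 4, {1}]   # {1} is a set and therefore unhashable
result = AnySet(items)
assert 2 in result               # found in the internal SetOrdered
assert {1} in result             # found via its DeepHash
assert len(result) == 4          # duplicates collapse: 1, 2, 4 and one {1}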
+ + +class Base: + numbers = numbers + strings = strings + + def get_significant_digits(self, significant_digits, ignore_numeric_type_changes): + if significant_digits is not None and significant_digits < 0: + raise ValueError( + "significant_digits must be None or a non-negative integer") + if significant_digits is None: + if ignore_numeric_type_changes: + significant_digits = DEFAULT_SIGNIFICANT_DIGITS_WHEN_IGNORE_NUMERIC_TYPES + return significant_digits + + def get_ignore_types_in_groups(self, ignore_type_in_groups, + ignore_string_type_changes, + ignore_numeric_type_changes, + ignore_type_subclasses): + if ignore_type_in_groups: + if isinstance(ignore_type_in_groups[0], type): + ignore_type_in_groups = [ignore_type_in_groups] + else: + ignore_type_in_groups = [] + + result = [] + for item_group in ignore_type_in_groups: + new_item_group = SetOrdered() + for item in item_group: + item = type(item) if item is None or not isinstance(item, type) else item + new_item_group.add(item) + result.append(new_item_group) + ignore_type_in_groups = result + + if ignore_string_type_changes and self.strings not in ignore_type_in_groups: + ignore_type_in_groups.append(SetOrdered(self.strings)) + + if ignore_numeric_type_changes and self.numbers not in ignore_type_in_groups: + ignore_type_in_groups.append(SetOrdered(self.numbers)) + + if not ignore_type_subclasses: + # is_instance method needs tuples. When we look for subclasses, we need them to be tuples + ignore_type_in_groups = list(map(tuple, ignore_type_in_groups)) + + return ignore_type_in_groups diff --git a/.venv/lib/python3.12/site-packages/deepdiff/commands.py b/.venv/lib/python3.12/site-packages/deepdiff/commands.py new file mode 100644 index 00000000..1859e35a --- /dev/null +++ b/.venv/lib/python3.12/site-packages/deepdiff/commands.py @@ -0,0 +1,232 @@ +import click +import sys +from decimal import Decimal +from pprint import pprint +from deepdiff.diff import ( + DeepDiff, + CUTOFF_DISTANCE_FOR_PAIRS_DEFAULT, + CUTOFF_INTERSECTION_FOR_PAIRS_DEFAULT, + logger +) +from deepdiff import Delta, DeepSearch, extract as deep_extract +from deepdiff.serialization import load_path_content, save_content_to_path + +try: + import orjson +except ImportError: + orjson = None + + +@click.group() +def cli(): + """A simple command line tool.""" + pass # pragma: no cover. 
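# Illustrative sketch of the Base helpers above (not part of the packaged
# file). With ignore_string_type_changes=True the strings group, (str, bytes),
# is appended, and since ignore_type_subclasses is False each group comes back
# as a tuple, which is what the downstream isinstance() checks expect.
from deepdiff.base import Base

base = Base()
groups = base.get_ignore_types_in_groups(
    ignore_type_in_groups=[(int, float)],
    ignore_string_type_changes=True,
    ignore_numeric_type_changes=False,
    ignore_type_subclasses=False,
)
# groups is now roughly [(int, float), (str, bytes)]
assert base.get_significant_digits(None, ignore_numeric_type_changes=True) == 12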
+ + +@cli.command() +@click.argument('t1', type=click.Path(exists=True, resolve_path=True)) +@click.argument('t2', type=click.Path(exists=True, resolve_path=True)) +@click.option('--cutoff-distance-for-pairs', required=False, default=CUTOFF_DISTANCE_FOR_PAIRS_DEFAULT, type=float, show_default=True) +@click.option('--cutoff-intersection-for-pairs', required=False, default=CUTOFF_INTERSECTION_FOR_PAIRS_DEFAULT, type=float, show_default=True) +@click.option('--cache-size', required=False, default=0, type=int, show_default=True) +@click.option('--cache-tuning-sample-size', required=False, default=0, type=int, show_default=True) +@click.option('--cache-purge-level', required=False, default=1, type=click.IntRange(0, 2), show_default=True) +@click.option('--create-patch', is_flag=True, show_default=True) +@click.option('--exclude-paths', required=False, type=str, show_default=False, multiple=True) +@click.option('--exclude-regex-paths', required=False, type=str, show_default=False, multiple=True) +@click.option('--math-epsilon', required=False, type=Decimal, show_default=False) +@click.option('--get-deep-distance', is_flag=True, show_default=True) +@click.option('--group-by', required=False, type=str, show_default=False, multiple=False) +@click.option('--ignore-order', is_flag=True, show_default=True) +@click.option('--ignore-string-type-changes', is_flag=True, show_default=True) +@click.option('--ignore-numeric-type-changes', is_flag=True, show_default=True) +@click.option('--ignore-type-subclasses', is_flag=True, show_default=True) +@click.option('--ignore-string-case', is_flag=True, show_default=True) +@click.option('--ignore-nan-inequality', is_flag=True, show_default=True) +@click.option('--include-private-variables', is_flag=True, show_default=True) +@click.option('--log-frequency-in-sec', required=False, default=0, type=int, show_default=True) +@click.option('--max-passes', required=False, default=10000000, type=int, show_default=True) +@click.option('--max_diffs', required=False, default=None, type=int, show_default=True) +@click.option('--threshold-to-diff-deeper', required=False, default=0.33, type=float, show_default=False) +@click.option('--number-format-notation', required=False, type=click.Choice(['f', 'e'], case_sensitive=True), show_default=True, default="f") +@click.option('--progress-logger', required=False, type=click.Choice(['info', 'error'], case_sensitive=True), show_default=True, default="info") +@click.option('--report-repetition', is_flag=True, show_default=True) +@click.option('--significant-digits', required=False, default=None, type=int, show_default=True) +@click.option('--truncate-datetime', required=False, type=click.Choice(['second', 'minute', 'hour', 'day'], case_sensitive=True), show_default=True, default=None) +@click.option('--verbose-level', required=False, default=1, type=click.IntRange(0, 2), show_default=True) +@click.option('--debug', is_flag=True, show_default=False) +def diff( + *args, **kwargs +): + """ + Deep Diff Commandline + + Deep Difference of content in files. + It can read csv, tsv, json, yaml, and toml files. + + T1 and T2 are the path to the files to be compared with each other. 
+ """ + debug = kwargs.pop('debug') + kwargs['ignore_private_variables'] = not kwargs.pop('include_private_variables') + kwargs['progress_logger'] = logger.info if kwargs['progress_logger'] == 'info' else logger.error + create_patch = kwargs.pop('create_patch') + t1_path = kwargs.pop("t1") + t2_path = kwargs.pop("t2") + t1_extension = t1_path.split('.')[-1] + t2_extension = t2_path.split('.')[-1] + + for name, t_path, t_extension in [('t1', t1_path, t1_extension), ('t2', t2_path, t2_extension)]: + try: + kwargs[name] = load_path_content(t_path, file_type=t_extension) + except Exception as e: # pragma: no cover. + if debug: # pragma: no cover. + raise # pragma: no cover. + else: # pragma: no cover. + sys.exit(str(f"Error when loading {name}: {e}")) # pragma: no cover. + + # if (t1_extension != t2_extension): + if t1_extension in {'csv', 'tsv'}: + kwargs['t1'] = [dict(i) for i in kwargs['t1']] + if t2_extension in {'csv', 'tsv'}: + kwargs['t2'] = [dict(i) for i in kwargs['t2']] + + if create_patch: + # Disabling logging progress since it will leak into stdout + kwargs['log_frequency_in_sec'] = 0 + + try: + diff = DeepDiff(**kwargs) + except Exception as e: # pragma: no cover. No need to test this. + sys.exit(str(e)) # pragma: no cover. No need to test this. + + if create_patch: + try: + delta = Delta(diff) + except Exception as e: # pragma: no cover. + if debug: # pragma: no cover. + raise # pragma: no cover. + else: # pragma: no cover. + sys.exit(f"Error when loading the patch (aka delta): {e}") # pragma: no cover. + + # printing into stdout + sys.stdout.buffer.write(delta.dumps()) + else: + try: + print(diff.to_json(indent=2)) + except Exception: + pprint(diff, indent=2) + + +@cli.command() +@click.argument('path', type=click.Path(exists=True, resolve_path=True)) +@click.argument('delta_path', type=click.Path(exists=True, resolve_path=True)) +@click.option('--backup', '-b', is_flag=True, show_default=True) +@click.option('--raise-errors', is_flag=True, show_default=True) +@click.option('--debug', is_flag=True, show_default=False) +def patch( + path, delta_path, backup, raise_errors, debug +): + """ + Deep Patch Commandline + + Patches a file based on the information in a delta file. + The delta file can be created by the deep diff command and + passing the --create-patch argument. + + Deep Patch is similar to Linux's patch command. + The difference is that it is made for patching data. + It can read csv, tsv, json, yaml, and toml files. + + """ + try: + delta = Delta(delta_path=delta_path, raise_errors=raise_errors) + except Exception as e: # pragma: no cover. + if debug: # pragma: no cover. + raise # pragma: no cover. + else: # pragma: no cover. + sys.exit(str(f"Error when loading the patch (aka delta) {delta_path}: {e}")) # pragma: no cover. + + extension = path.split('.')[-1] + + try: + content = load_path_content(path, file_type=extension) + except Exception as e: # pragma: no cover. + sys.exit(str(f"Error when loading {path}: {e}")) # pragma: no cover. + + result = delta + content + + try: + save_content_to_path(result, path, file_type=extension, keep_backup=backup) + except Exception as e: # pragma: no cover. + if debug: # pragma: no cover. + raise # pragma: no cover. + else: # pragma: no cover. + sys.exit(str(f"Error when saving {path}: {e}")) # pragma: no cover. 
+ + +@cli.command() +@click.argument('item', required=True, type=str) +@click.argument('path', type=click.Path(exists=True, resolve_path=True)) +@click.option('--ignore-case', '-i', is_flag=True, show_default=True) +@click.option('--exact-match', is_flag=True, show_default=True) +@click.option('--exclude-paths', required=False, type=str, show_default=False, multiple=True) +@click.option('--exclude-regex-paths', required=False, type=str, show_default=False, multiple=True) +@click.option('--verbose-level', required=False, default=1, type=click.IntRange(0, 2), show_default=True) +@click.option('--debug', is_flag=True, show_default=False) +def grep(item, path, debug, **kwargs): + """ + Deep Grep Commandline + + Grep through the contents of a file and find the path to the item. + It can read csv, tsv, json, yaml, and toml files. + + """ + kwargs['case_sensitive'] = not kwargs.pop('ignore_case') + kwargs['match_string'] = kwargs.pop('exact_match') + + try: + content = load_path_content(path) + except Exception as e: # pragma: no cover. + if debug: # pragma: no cover. + raise # pragma: no cover. + else: # pragma: no cover. + sys.exit(str(f"Error when loading {path}: {e}")) # pragma: no cover. + + try: + result = DeepSearch(content, item, **kwargs) + except Exception as e: # pragma: no cover. + if debug: # pragma: no cover. + raise # pragma: no cover. + else: # pragma: no cover. + sys.exit(str(f"Error when running deep search on {path}: {e}")) # pragma: no cover. + pprint(result, indent=2) + + +@cli.command() +@click.argument('path_inside', required=True, type=str) +@click.argument('path', type=click.Path(exists=True, resolve_path=True)) +@click.option('--debug', is_flag=True, show_default=False) +def extract(path_inside, path, debug): + """ + Deep Extract Commandline + + Extract an item from a file based on the path that is passed. + It can read csv, tsv, json, yaml, and toml files. + + """ + try: + content = load_path_content(path) + except Exception as e: # pragma: no cover. + if debug: # pragma: no cover. + raise # pragma: no cover. + else: # pragma: no cover. + sys.exit(str(f"Error when loading {path}: {e}")) # pragma: no cover. + + try: + result = deep_extract(content, path_inside) + except Exception as e: # pragma: no cover. + if debug: # pragma: no cover. + raise # pragma: no cover. + else: # pragma: no cover. + sys.exit(str(f"Error when running deep search on {path}: {e}")) # pragma: no cover. 
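# Library-level equivalents of the `deep grep` and `deep extract` commands,
# sketched on a small in-memory dict instead of a file (editorial example):
from deepdiff import DeepSearch, extract

content = {'a': {'b': ['hello', 'world']}}
found = DeepSearch(content, 'hello', verbose_level=2)
# found is roughly {'matched_values': {"root['a']['b'][0]": 'hello'}}
assert extract(content, "root['a']['b'][0]") == 'hello'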
+ pprint(result, indent=2) diff --git a/.venv/lib/python3.12/site-packages/deepdiff/deephash.py b/.venv/lib/python3.12/site-packages/deepdiff/deephash.py new file mode 100644 index 00000000..47b900e5 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/deepdiff/deephash.py @@ -0,0 +1,627 @@ +#!/usr/bin/env python +import logging +import datetime +from typing import Union, Optional, Any, List, TYPE_CHECKING +from collections.abc import Iterable, MutableMapping +from collections import defaultdict +from hashlib import sha1, sha256 +from pathlib import Path +from enum import Enum +from deepdiff.helper import (strings, numbers, times, unprocessed, not_hashed, add_to_frozen_set, + convert_item_or_items_into_set_else_none, get_doc, ipranges, + convert_item_or_items_into_compiled_regexes_else_none, + get_id, type_is_subclass_of_type_group, type_in_type_group, + number_to_string, datetime_normalize, KEY_TO_VAL_STR, + get_truncate_datetime, dict_, add_root_to_paths, PydanticBaseModel) + +from deepdiff.base import Base + +if TYPE_CHECKING: + from pytz.tzinfo import BaseTzInfo + + +try: + import pandas +except ImportError: + pandas = False + +try: + import polars +except ImportError: + polars = False +try: + import numpy as np + booleanTypes = (bool, np.bool_) +except ImportError: + booleanTypes = bool + +logger = logging.getLogger(__name__) + +UNPROCESSED_KEY = object() + +EMPTY_FROZENSET = frozenset() + +INDEX_VS_ATTRIBUTE = ('[%s]', '.%s') + + +HASH_LOOKUP_ERR_MSG = '{} is not one of the hashed items.' + + +def sha256hex(obj): + """Use Sha256 as a cryptographic hash.""" + if isinstance(obj, str): + obj = obj.encode('utf-8') + return sha256(obj).hexdigest() + + +def sha1hex(obj): + """Use Sha1 as a cryptographic hash.""" + if isinstance(obj, str): + obj = obj.encode('utf-8') + return sha1(obj).hexdigest() + + +default_hasher = sha256hex + + +def combine_hashes_lists(items, prefix): + """ + Combines lists of hashes into one hash + This can be optimized in future. + It needs to work with both murmur3 hashes (int) and sha256 (str) + Although murmur3 is not used anymore. + """ + if isinstance(prefix, bytes): + prefix = prefix.decode('utf-8') + hashes_bytes = b'' + for item in items: + # In order to make sure the order of hashes in each item does not affect the hash + # we resort them. + hashes_bytes += (''.join(map(str, sorted(item))) + '--').encode('utf-8') + return prefix + str(default_hasher(hashes_bytes)) + + +class BoolObj(Enum): + TRUE = 1 + FALSE = 0 + + +def prepare_string_for_hashing( + obj, + ignore_string_type_changes=False, + ignore_string_case=False, + encodings=None, + ignore_encoding_errors=False, +): + """ + Clean type conversions + """ + original_type = obj.__class__.__name__ + # https://docs.python.org/3/library/codecs.html#codecs.decode + errors_mode = 'ignore' if ignore_encoding_errors else 'strict' + if isinstance(obj, bytes): + err = None + encodings = ['utf-8'] if encodings is None else encodings + encoded = False + for encoding in encodings: + try: + obj = obj.decode(encoding, errors=errors_mode) + encoded = True + break + except UnicodeDecodeError as er: + err = er + if not encoded and err is not None: + obj_decoded = obj.decode('utf-8', errors='ignore') # type: ignore + start = max(err.start - 20, 0) + start_prefix = '' + if start > 0: + start_prefix = '...' + end = err.end + 20 + end_suffix = '...' 
+ if end >= len(obj): + end = len(obj) + end_suffix = '' + raise UnicodeDecodeError( + err.encoding, + err.object, + err.start, + err.end, + f"{err.reason} in '{start_prefix}{obj_decoded[start:end]}{end_suffix}'. Please either pass ignore_encoding_errors=True or pass the encoding via encodings=['utf-8', '...']." + ) from None + if not ignore_string_type_changes: + obj = KEY_TO_VAL_STR.format(original_type, obj) + if ignore_string_case: + obj = obj.lower() + return obj + + +doc = get_doc('deephash_doc.rst') + + +class DeepHash(Base): + __doc__ = doc + + def __init__(self, + obj: Any, + *, + apply_hash=True, + custom_operators: Optional[List[Any]] =None, + default_timezone:Union[datetime.timezone, "BaseTzInfo"]=datetime.timezone.utc, + encodings=None, + exclude_obj_callback=None, + exclude_paths=None, + exclude_regex_paths=None, + exclude_types=None, + hasher=None, + hashes=None, + ignore_encoding_errors=False, + ignore_iterable_order=True, + ignore_numeric_type_changes=False, + ignore_private_variables=True, + ignore_repetition=True, + ignore_string_case=False, + ignore_string_type_changes=False, + ignore_type_in_groups=None, + ignore_type_subclasses=False, + include_paths=None, + number_format_notation="f", + number_to_string_func=None, + parent="root", + significant_digits=None, + truncate_datetime=None, + use_enum_value=False, + **kwargs): + if kwargs: + raise ValueError( + ("The following parameter(s) are not valid: %s\n" + "The valid parameters are obj, hashes, exclude_types, significant_digits, truncate_datetime," + "exclude_paths, include_paths, exclude_regex_paths, hasher, ignore_repetition, " + "number_format_notation, apply_hash, ignore_type_in_groups, ignore_string_type_changes, " + "ignore_numeric_type_changes, ignore_type_subclasses, ignore_string_case " + "number_to_string_func, ignore_private_variables, parent, use_enum_value, default_timezone " + "encodings, ignore_encoding_errors") % ', '.join(kwargs.keys())) + if isinstance(hashes, MutableMapping): + self.hashes = hashes + elif isinstance(hashes, DeepHash): + self.hashes = hashes.hashes + else: + self.hashes = dict_() + exclude_types = set() if exclude_types is None else set(exclude_types) + self.exclude_types_tuple = tuple(exclude_types) # we need tuple for checking isinstance + self.ignore_repetition = ignore_repetition + self.exclude_paths = add_root_to_paths(convert_item_or_items_into_set_else_none(exclude_paths)) + self.include_paths = add_root_to_paths(convert_item_or_items_into_set_else_none(include_paths)) + self.exclude_regex_paths = convert_item_or_items_into_compiled_regexes_else_none(exclude_regex_paths) + self.hasher = default_hasher if hasher is None else hasher + self.hashes[UNPROCESSED_KEY] = [] + self.use_enum_value = use_enum_value + self.default_timezone = default_timezone + self.significant_digits = self.get_significant_digits(significant_digits, ignore_numeric_type_changes) + self.truncate_datetime = get_truncate_datetime(truncate_datetime) + self.number_format_notation = number_format_notation + self.ignore_type_in_groups = self.get_ignore_types_in_groups( + ignore_type_in_groups=ignore_type_in_groups, + ignore_string_type_changes=ignore_string_type_changes, + ignore_numeric_type_changes=ignore_numeric_type_changes, + ignore_type_subclasses=ignore_type_subclasses) + self.ignore_string_type_changes = ignore_string_type_changes + self.ignore_numeric_type_changes = ignore_numeric_type_changes + self.ignore_string_case = ignore_string_case + self.exclude_obj_callback = exclude_obj_callback + # makes the 
hash return constant size result if true + # the only time it should be set to False is when + # testing the individual hash functions for different types of objects. + self.apply_hash = apply_hash + self.type_check_func = type_in_type_group if ignore_type_subclasses else type_is_subclass_of_type_group + # self.type_check_func = type_is_subclass_of_type_group if ignore_type_subclasses else type_in_type_group + self.number_to_string = number_to_string_func or number_to_string + self.ignore_private_variables = ignore_private_variables + self.encodings = encodings + self.ignore_encoding_errors = ignore_encoding_errors + self.ignore_iterable_order = ignore_iterable_order + self.custom_operators = custom_operators + + self._hash(obj, parent=parent, parents_ids=frozenset({get_id(obj)})) + + if self.hashes[UNPROCESSED_KEY]: + logger.warning("Can not hash the following items: {}.".format(self.hashes[UNPROCESSED_KEY])) + else: + del self.hashes[UNPROCESSED_KEY] + + sha256hex = sha256hex + sha1hex = sha1hex + + def __getitem__(self, obj, extract_index=0): + return self._getitem(self.hashes, obj, extract_index=extract_index, use_enum_value=self.use_enum_value) + + @staticmethod + def _getitem(hashes, obj, extract_index=0, use_enum_value=False): + """ + extract_index is zero for hash and 1 for count and None to get them both. + To keep it backward compatible, we only get the hash by default so it is set to zero by default. + """ + + key = obj + if obj is True: + key = BoolObj.TRUE + elif obj is False: + key = BoolObj.FALSE + elif use_enum_value and isinstance(obj, Enum): + key = obj.value + + result_n_count = (None, 0) + + try: + result_n_count = hashes[key] + except (TypeError, KeyError): + key = get_id(obj) + try: + result_n_count = hashes[key] + except KeyError: + raise KeyError(HASH_LOOKUP_ERR_MSG.format(obj)) from None + + if obj is UNPROCESSED_KEY: + extract_index = None + + return result_n_count if extract_index is None else result_n_count[extract_index] + + def __contains__(self, obj): + result = False + try: + result = obj in self.hashes + except (TypeError, KeyError): + result = False + if not result: + result = get_id(obj) in self.hashes + return result + + def get(self, key, default=None, extract_index=0): + """ + Get method for the hashes dictionary. + It can extract the hash for a given key that is already calculated when extract_index=0 + or the count of items that went to building the object whenextract_index=1. + """ + return self.get_key(self.hashes, key, default=default, extract_index=extract_index) + + @staticmethod + def get_key(hashes, key, default=None, extract_index=0, use_enum_value=False): + """ + get_key method for the hashes dictionary. + It can extract the hash for a given key that is already calculated when extract_index=0 + or the count of items that went to building the object whenextract_index=1. + """ + try: + result = DeepHash._getitem(hashes, key, extract_index=extract_index, use_enum_value=use_enum_value) + except KeyError: + result = default + return result + + def _get_objects_to_hashes_dict(self, extract_index=0): + """ + A dictionary containing only the objects to hashes, + or a dictionary of objects to the count of items that went to build them. + extract_index=0 for hashes and extract_index=1 for counts. 
+ """ + result = dict_() + for key, value in self.hashes.items(): + if key is UNPROCESSED_KEY: + result[key] = value + else: + result[key] = value[extract_index] + return result + + def __eq__(self, other): + if isinstance(other, DeepHash): + return self.hashes == other.hashes + else: + # We only care about the hashes + return self._get_objects_to_hashes_dict() == other + + __req__ = __eq__ + + def __repr__(self): + """ + Hide the counts since it will be confusing to see them when they are hidden everywhere else. + """ + from deepdiff.summarize import summarize + return summarize(self._get_objects_to_hashes_dict(extract_index=0), max_length=500) + + def __str__(self): + return str(self._get_objects_to_hashes_dict(extract_index=0)) + + def __bool__(self): + return bool(self.hashes) + + def keys(self): + return self.hashes.keys() + + def values(self): + return (i[0] for i in self.hashes.values()) # Just grab the item and not its count + + def items(self): + return ((i, v[0]) for i, v in self.hashes.items()) + + def _prep_obj(self, obj, parent, parents_ids=EMPTY_FROZENSET, is_namedtuple=False, is_pydantic_object=False): + """prepping objects""" + original_type = type(obj) if not isinstance(obj, type) else obj + + obj_to_dict_strategies = [] + if is_namedtuple: + obj_to_dict_strategies.append(lambda o: o._asdict()) + elif is_pydantic_object: + obj_to_dict_strategies.append(lambda o: {k: v for (k, v) in o.__dict__.items() if v !="model_fields_set"}) + else: + obj_to_dict_strategies.append(lambda o: o.__dict__) + + if hasattr(obj, "__slots__"): + obj_to_dict_strategies.append(lambda o: {i: getattr(o, i) for i in o.__slots__}) + else: + import inspect + obj_to_dict_strategies.append(lambda o: dict(inspect.getmembers(o, lambda m: not inspect.isroutine(m)))) + + for get_dict in obj_to_dict_strategies: + try: + d = get_dict(obj) + break + except AttributeError: + pass + else: + self.hashes[UNPROCESSED_KEY].append(obj) + return (unprocessed, 0) + obj = d + + result, counts = self._prep_dict(obj, parent=parent, parents_ids=parents_ids, + print_as_attribute=True, original_type=original_type) + result = "nt{}".format(result) if is_namedtuple else "obj{}".format(result) + return result, counts + + def _skip_this(self, obj, parent): + skip = False + if self.exclude_paths and parent in self.exclude_paths: + skip = True + if self.include_paths and parent != 'root': + if parent not in self.include_paths: + skip = True + for prefix in self.include_paths: + if parent.startswith(prefix): + skip = False + break + elif self.exclude_regex_paths and any( + [exclude_regex_path.search(parent) for exclude_regex_path in self.exclude_regex_paths]): # type: ignore + skip = True + elif self.exclude_types_tuple and isinstance(obj, self.exclude_types_tuple): + skip = True + elif self.exclude_obj_callback and self.exclude_obj_callback(obj, parent): + skip = True + return skip + + def _prep_dict(self, obj, parent, parents_ids=EMPTY_FROZENSET, print_as_attribute=False, original_type=None): + + result = [] + counts = 1 + + key_text = "%s{}".format(INDEX_VS_ATTRIBUTE[print_as_attribute]) + for key, item in obj.items(): + counts += 1 + # ignore private variables + if self.ignore_private_variables and isinstance(key, str) and key.startswith('__'): + continue + key_formatted = "'%s'" % key if not print_as_attribute and isinstance(key, strings) else key + key_in_report = key_text % (parent, key_formatted) + + key_hash, _ = self._hash(key, parent=key_in_report, parents_ids=parents_ids) + if not key_hash: + continue + item_id = 
get_id(item) + if (parents_ids and item_id in parents_ids) or self._skip_this(item, parent=key_in_report): + continue + parents_ids_added = add_to_frozen_set(parents_ids, item_id) + hashed, count = self._hash(item, parent=key_in_report, parents_ids=parents_ids_added) + hashed = KEY_TO_VAL_STR.format(key_hash, hashed) + result.append(hashed) + counts += count + + result.sort() + result = ';'.join(result) + if print_as_attribute: + type_ = original_type or type(obj) + type_str = type_.__name__ + for type_group in self.ignore_type_in_groups: + if self.type_check_func(type_, type_group): + type_str = ','.join(map(lambda x: x.__name__, type_group)) + break + else: + type_str = 'dict' + return "{}:{{{}}}".format(type_str, result), counts + + def _prep_iterable(self, obj, parent, parents_ids=EMPTY_FROZENSET): + + counts = 1 + result = defaultdict(int) + + for i, item in enumerate(obj): + new_parent = "{}[{}]".format(parent, i) + if self._skip_this(item, parent=new_parent): + continue + + item_id = get_id(item) + if parents_ids and item_id in parents_ids: + continue + + parents_ids_added = add_to_frozen_set(parents_ids, item_id) + hashed, count = self._hash(item, parent=new_parent, parents_ids=parents_ids_added) + # counting repetitions + result[hashed] += 1 + counts += count + + if self.ignore_repetition: + result = list(result.keys()) + else: + result = [ + '{}|{}'.format(i, v) for i, v in result.items() + ] + + result = map(str, result) # making sure the result items are string so join command works. + if self.ignore_iterable_order: + result = sorted(result) + result = ','.join(result) + result = KEY_TO_VAL_STR.format(type(obj).__name__, result) + + return result, counts + + def _prep_bool(self, obj): + return BoolObj.TRUE if obj else BoolObj.FALSE + + + def _prep_path(self, obj): + type_ = obj.__class__.__name__ + return KEY_TO_VAL_STR.format(type_, obj) + + def _prep_number(self, obj): + type_ = "number" if self.ignore_numeric_type_changes else obj.__class__.__name__ + if self.significant_digits is not None: + obj = self.number_to_string(obj, significant_digits=self.significant_digits, + number_format_notation=self.number_format_notation) + return KEY_TO_VAL_STR.format(type_, obj) + + def _prep_ipranges(self, obj): + type_ = 'iprange' + obj = str(obj) + return KEY_TO_VAL_STR.format(type_, obj) + + def _prep_datetime(self, obj): + type_ = 'datetime' + obj = datetime_normalize(self.truncate_datetime, obj, default_timezone=self.default_timezone) + return KEY_TO_VAL_STR.format(type_, obj) + + def _prep_date(self, obj): + type_ = 'datetime' # yes still datetime but it doesn't need normalization + return KEY_TO_VAL_STR.format(type_, obj) + + def _prep_tuple(self, obj, parent, parents_ids): + # Checking to see if it has _fields. Which probably means it is a named + # tuple. 
+ try: + obj._asdict + # It must be a normal tuple + except AttributeError: + result, counts = self._prep_iterable(obj=obj, parent=parent, parents_ids=parents_ids) + # We assume it is a namedtuple then + else: + result, counts = self._prep_obj(obj, parent, parents_ids=parents_ids, is_namedtuple=True) + return result, counts + + def _hash(self, obj, parent, parents_ids=EMPTY_FROZENSET): + """The main hash method""" + counts = 1 + if self.custom_operators is not None: + for operator in self.custom_operators: + func = getattr(operator, 'normalize_value_for_hashing', None) + if func is None: + raise NotImplementedError(f"{operator.__class__.__name__} needs to define a normalize_value_for_hashing method to be compatible with ignore_order=True or iterable_compare_func.".format(operator)) + else: + obj = func(parent, obj) + + if isinstance(obj, booleanTypes): + obj = self._prep_bool(obj) + result = None + elif self.use_enum_value and isinstance(obj, Enum): + obj = obj.value + else: + result = not_hashed + try: + result, counts = self.hashes[obj] + except (TypeError, KeyError): + pass + else: + return result, counts + + if self._skip_this(obj, parent): + return None, 0 + + elif obj is None: + result = 'NONE' + + elif isinstance(obj, strings): + result = prepare_string_for_hashing( + obj, + ignore_string_type_changes=self.ignore_string_type_changes, + ignore_string_case=self.ignore_string_case, + encodings=self.encodings, + ignore_encoding_errors=self.ignore_encoding_errors, + ) + + elif isinstance(obj, Path): + result = self._prep_path(obj) + + elif isinstance(obj, times): + result = self._prep_datetime(obj) + + elif isinstance(obj, datetime.date): + result = self._prep_date(obj) + + elif isinstance(obj, numbers): # type: ignore + result = self._prep_number(obj) + + elif isinstance(obj, ipranges): + result = self._prep_ipranges(obj) + + elif isinstance(obj, MutableMapping): + result, counts = self._prep_dict(obj=obj, parent=parent, parents_ids=parents_ids) + + elif isinstance(obj, tuple): + result, counts = self._prep_tuple(obj=obj, parent=parent, parents_ids=parents_ids) + + elif (pandas and isinstance(obj, pandas.DataFrame)): # type: ignore + def gen(): # type: ignore + yield ('dtype', obj.dtypes) # type: ignore + yield ('index', obj.index) # type: ignore + yield from obj.items() # type: ignore # which contains (column name, series tuples) + result, counts = self._prep_iterable(obj=gen(), parent=parent, parents_ids=parents_ids) + elif (polars and isinstance(obj, polars.DataFrame)): # type: ignore + def gen(): + yield from obj.columns # type: ignore + yield from list(obj.schema.items()) # type: ignore + yield from obj.rows() # type: ignore + result, counts = self._prep_iterable(obj=gen(), parent=parent, parents_ids=parents_ids) + + elif isinstance(obj, Iterable): + result, counts = self._prep_iterable(obj=obj, parent=parent, parents_ids=parents_ids) + + elif obj == BoolObj.TRUE or obj == BoolObj.FALSE: + result = 'bool:true' if obj is BoolObj.TRUE else 'bool:false' + elif isinstance(obj, PydanticBaseModel): + result, counts = self._prep_obj(obj=obj, parent=parent, parents_ids=parents_ids, is_pydantic_object=True) + else: + result, counts = self._prep_obj(obj=obj, parent=parent, parents_ids=parents_ids) + + if result is not_hashed: # pragma: no cover + self.hashes[UNPROCESSED_KEY].append(obj) + + elif result is unprocessed: + pass + + elif self.apply_hash: + if isinstance(obj, strings): + result_cleaned = result + else: + result_cleaned = prepare_string_for_hashing( + result, 
ignore_string_type_changes=self.ignore_string_type_changes, + ignore_string_case=self.ignore_string_case) + result = self.hasher(result_cleaned) + + # It is important to keep the hash of all objects. + # The hashes will be later used for comparing the objects. + # Object to hash when possible otherwise ObjectID to hash + try: + self.hashes[obj] = (result, counts) + except TypeError: + obj_id = get_id(obj) + self.hashes[obj_id] = (result, counts) + + return result, counts + + +if __name__ == "__main__": # pragma: no cover + import doctest + doctest.testmod() diff --git a/.venv/lib/python3.12/site-packages/deepdiff/delta.py b/.venv/lib/python3.12/site-packages/deepdiff/delta.py new file mode 100644 index 00000000..a76593cd --- /dev/null +++ b/.venv/lib/python3.12/site-packages/deepdiff/delta.py @@ -0,0 +1,1217 @@ +import copy +import logging +from typing import List, Dict, IO, Callable, Set, Union, Optional +from functools import partial, cmp_to_key +from collections.abc import Mapping +from copy import deepcopy +from deepdiff import DeepDiff +from deepdiff.serialization import pickle_load, pickle_dump +from deepdiff.helper import ( + strings, numbers, + np_ndarray, np_array_factory, numpy_dtypes, get_doc, + not_found, numpy_dtype_string_to_type, dict_, + Opcode, FlatDeltaRow, UnkownValueCode, FlatDataAction, + OPCODE_TAG_TO_FLAT_DATA_ACTION, + FLAT_DATA_ACTION_TO_OPCODE_TAG, + SetOrdered, +) +from deepdiff.path import ( + _path_to_elements, _get_nested_obj, _get_nested_obj_and_force, + GET, GETATTR, parse_path, stringify_path, +) +from deepdiff.anyset import AnySet +from deepdiff.summarize import summarize + +logger = logging.getLogger(__name__) + + +VERIFICATION_MSG = 'Expected the old value for {} to be {} but it is {}. Error found on: {}. You may want to set force=True, especially if this delta is created by passing flat_rows_list or flat_dict_list' +ELEM_NOT_FOUND_TO_ADD_MSG = 'Key or index of {} is not found for {} for setting operation.' +TYPE_CHANGE_FAIL_MSG = 'Unable to do the type change for {} from to type {} due to {}' +VERIFY_BIDIRECTIONAL_MSG = ('You have applied the delta to an object that has ' + 'different values than the original object the delta was made from.') +FAIL_TO_REMOVE_ITEM_IGNORE_ORDER_MSG = 'Failed to remove index[{}] on {}. It was expected to be {} but got {}' +DELTA_NUMPY_OPERATOR_OVERRIDE_MSG = ( + 'A numpy ndarray is most likely being added to a delta. ' + 'Due to Numpy override the + operator, you can only do: delta + ndarray ' + 'and NOT ndarray + delta') +BINIARY_MODE_NEEDED_MSG = "Please open the file in the binary mode and pass to Delta by passing 'b' in open(..., 'b'): {}" +DELTA_AT_LEAST_ONE_ARG_NEEDED = 'At least one of the diff, delta_path or delta_file arguments need to be passed.' +INVALID_ACTION_WHEN_CALLING_GET_ELEM = 'invalid action of {} when calling _get_elem_and_compare_to_old_value' +INVALID_ACTION_WHEN_CALLING_SIMPLE_SET_ELEM = 'invalid action of {} when calling _simple_set_elem_value' +INVALID_ACTION_WHEN_CALLING_SIMPLE_DELETE_ELEM = 'invalid action of {} when calling _simple_set_elem_value' +UNABLE_TO_GET_ITEM_MSG = 'Unable to get the item at {}: {}' +UNABLE_TO_GET_PATH_MSG = 'Unable to get the item at {}' +INDEXES_NOT_FOUND_WHEN_IGNORE_ORDER = 'Delta added to an incompatible object. Unable to add the following items at the specific indexes. {}' +NUMPY_TO_LIST = 'NUMPY_TO_LIST' +NOT_VALID_NUMPY_TYPE = "{} is not a valid numpy type." 
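# A quick sketch of DeepHash from deephash.py above (editorial example): every
# reachable sub-object gets a content-based hash, keyed by the object itself,
# or by its id when the object is unhashable.
from deepdiff import DeepHash

obj = {'a': [1, 2], 'b': 'hello'}
hashes = DeepHash(obj)
print(hashes[obj])       # hash of the whole dict
print(hashes['hello'])   # sub-objects are hashed too
# Structurally equal content hashes the same; dict key order does not matter:
obj2 = {'b': 'hello', 'a': [1, 2]}
assert DeepHash(obj2)[obj2] == hashes[obj]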
+ +doc = get_doc('delta.rst') + + +class DeltaError(ValueError): + """ + Delta specific errors + """ + pass + + +class DeltaNumpyOperatorOverrideError(ValueError): + """ + Delta Numpy Operator Override Error + """ + pass + + +class Delta: + + __doc__ = doc + + def __init__( + self, + diff: Union[DeepDiff, Mapping, str, bytes, None]=None, + delta_path: Optional[str]=None, + delta_file: Optional[IO]=None, + delta_diff: Optional[dict]=None, + flat_dict_list: Optional[List[Dict]]=None, + flat_rows_list: Optional[List[FlatDeltaRow]]=None, + deserializer: Callable=pickle_load, + log_errors: bool=True, + mutate: bool=False, + raise_errors: bool=False, + safe_to_import: Optional[Set[str]]=None, + serializer: Callable=pickle_dump, + verify_symmetry: Optional[bool]=None, + bidirectional: bool=False, + always_include_values: bool=False, + iterable_compare_func_was_used: Optional[bool]=None, + force: bool=False, + ): + # for pickle deserializer: + if hasattr(deserializer, '__code__') and 'safe_to_import' in set(deserializer.__code__.co_varnames): + _deserializer = deserializer + else: + def _deserializer(obj, safe_to_import=None): + result = deserializer(obj) + if result.get('_iterable_opcodes'): + _iterable_opcodes = {} + for path, op_codes in result['_iterable_opcodes'].items(): + _iterable_opcodes[path] = [] + for op_code in op_codes: + _iterable_opcodes[path].append( + Opcode( + **op_code + ) + ) + result['_iterable_opcodes'] = _iterable_opcodes + return result + + + self._reversed_diff = None + + if verify_symmetry is not None: + logger.warning( + "DeepDiff Deprecation: use bidirectional instead of verify_symmetry parameter." + ) + bidirectional = verify_symmetry + + self.bidirectional = bidirectional + if bidirectional: + self.always_include_values = True # We need to include the values in bidirectional deltas + else: + self.always_include_values = always_include_values + + if diff is not None: + if isinstance(diff, DeepDiff): + self.diff = diff._to_delta_dict(directed=not bidirectional, always_include_values=self.always_include_values) + elif isinstance(diff, Mapping): + self.diff = diff + elif isinstance(diff, strings): + self.diff = _deserializer(diff, safe_to_import=safe_to_import) + elif delta_path: + with open(delta_path, 'rb') as the_file: + content = the_file.read() + self.diff = _deserializer(content, safe_to_import=safe_to_import) + elif delta_diff: + self.diff = delta_diff + elif delta_file: + try: + content = delta_file.read() + except UnicodeDecodeError as e: + raise ValueError(BINIARY_MODE_NEEDED_MSG.format(e)) from None + self.diff = _deserializer(content, safe_to_import=safe_to_import) + elif flat_dict_list: + # Use copy to preserve original value of flat_dict_list in calling module + self.diff = self._from_flat_dicts(copy.deepcopy(flat_dict_list)) + elif flat_rows_list: + self.diff = self._from_flat_rows(copy.deepcopy(flat_rows_list)) + else: + raise ValueError(DELTA_AT_LEAST_ONE_ARG_NEEDED) + + self.mutate = mutate + self.raise_errors = raise_errors + self.log_errors = log_errors + self._numpy_paths = self.diff.get('_numpy_paths', False) + # When we create the delta from a list of flat dictionaries, details such as iterable_compare_func_was_used get lost. + # That's why we allow iterable_compare_func_was_used to be explicitly set. 
+ self._iterable_compare_func_was_used = self.diff.get('_iterable_compare_func_was_used', iterable_compare_func_was_used) + self.serializer = serializer + self.deserializer = deserializer + self.force = force + if force: + self.get_nested_obj = _get_nested_obj_and_force + else: + self.get_nested_obj = _get_nested_obj + self.reset() + + def __repr__(self): + return "<Delta: {}>".format(summarize(self.diff, max_length=100)) + + def reset(self): + self.post_process_paths_to_convert = dict_() + + def __add__(self, other): + if isinstance(other, numbers) and self._numpy_paths: # type: ignore + raise DeltaNumpyOperatorOverrideError(DELTA_NUMPY_OPERATOR_OVERRIDE_MSG) + if self.mutate: + self.root = other + else: + self.root = deepcopy(other) + self._do_pre_process() + self._do_values_changed() + self._do_set_item_added() + self._do_set_item_removed() + self._do_type_changes() + # NOTE: the remove iterable action needs to happen BEFORE + # all the other iterables to match the reverse of order of operations in DeepDiff + self._do_iterable_opcodes() + self._do_iterable_item_removed() + self._do_iterable_item_added() + self._do_ignore_order() + self._do_dictionary_item_added() + self._do_dictionary_item_removed() + self._do_attribute_added() + self._do_attribute_removed() + self._do_post_process() + + other = self.root + # removing the reference to other + del self.root + self.reset() + return other + + __radd__ = __add__ + + def __rsub__(self, other): + if self._reversed_diff is None: + self._reversed_diff = self._get_reverse_diff() + self.diff, self._reversed_diff = self._reversed_diff, self.diff + result = self.__add__(other) + self.diff, self._reversed_diff = self._reversed_diff, self.diff + return result + + def _raise_or_log(self, msg, level='error'): + if self.log_errors: + getattr(logger, level)(msg) + if self.raise_errors: + raise DeltaError(msg) + + def _do_verify_changes(self, path, expected_old_value, current_old_value): + if self.bidirectional and expected_old_value != current_old_value: + if isinstance(path, str): + path_str = path + else: + path_str = stringify_path(path, root_element=('', GETATTR)) + self._raise_or_log(VERIFICATION_MSG.format( + path_str, expected_old_value, current_old_value, VERIFY_BIDIRECTIONAL_MSG)) + + def _get_elem_and_compare_to_old_value( + self, + obj, + path_for_err_reporting, + expected_old_value, + elem=None, + action=None, + forced_old_value=None, + next_element=None, + ): + # if forced_old_value is not None: + try: + if action == GET: + current_old_value = obj[elem] + elif action == GETATTR: + current_old_value = getattr(obj, elem) # type: ignore + else: + raise DeltaError(INVALID_ACTION_WHEN_CALLING_GET_ELEM.format(action)) + except (KeyError, IndexError, AttributeError, TypeError) as e: + if self.force: + if forced_old_value is None: + if next_element is None or isinstance(next_element, str): + _forced_old_value = {} + else: + _forced_old_value = [] + else: + _forced_old_value = forced_old_value + if action == GET: + if isinstance(obj, list): + if isinstance(elem, int) and elem < len(obj): + obj[elem] = _forced_old_value + else: + obj.append(_forced_old_value) + else: + obj[elem] = _forced_old_value + elif action == GETATTR: + setattr(obj, elem, _forced_old_value) # type: ignore + return _forced_old_value + current_old_value = not_found + if isinstance(path_for_err_reporting, (list, tuple)): + path_for_err_reporting = '.'.join([i[0] for i in path_for_err_reporting]) + if self.bidirectional: + self._raise_or_log(VERIFICATION_MSG.format( + 
path_for_err_reporting, + expected_old_value, current_old_value, e)) + else: + self._raise_or_log(UNABLE_TO_GET_PATH_MSG.format( + path_for_err_reporting)) + return current_old_value + + def _simple_set_elem_value(self, obj, path_for_err_reporting, elem=None, value=None, action=None): + """ + Set the element value directly on an object + """ + try: + if action == GET: + try: + obj[elem] = value + except IndexError: + if elem == len(obj): + obj.append(value) + else: + self._raise_or_log(ELEM_NOT_FOUND_TO_ADD_MSG.format(elem, path_for_err_reporting)) + elif action == GETATTR: + setattr(obj, elem, value) # type: ignore + else: + raise DeltaError(INVALID_ACTION_WHEN_CALLING_SIMPLE_SET_ELEM.format(action)) + except (KeyError, IndexError, AttributeError, TypeError) as e: + self._raise_or_log('Failed to set {} due to {}'.format(path_for_err_reporting, e)) + + def _coerce_obj(self, parent, obj, path, parent_to_obj_elem, + parent_to_obj_action, elements, to_type, from_type): + """ + Coerce obj and mark it in post_process_paths_to_convert for later to be converted back. + Also reassign it to its parent to replace the old object. + """ + self.post_process_paths_to_convert[elements[:-1]] = {'old_type': to_type, 'new_type': from_type} + # If this function is going to ever be used to convert numpy arrays, uncomment these lines: + # if from_type is np_ndarray: + # obj = obj.tolist() + # else: + obj = to_type(obj) + + if parent: + # Making sure that the object is re-instated inside the parent especially if it was immutable + # and we had to turn it into a mutable one. In such cases the object has a new id. + self._simple_set_elem_value(obj=parent, path_for_err_reporting=path, elem=parent_to_obj_elem, + value=obj, action=parent_to_obj_action) + return obj + + def _set_new_value(self, parent, parent_to_obj_elem, parent_to_obj_action, + obj, elements, path, elem, action, new_value): + """ + Set the element value on an object and if necessary convert the object to the proper mutable type + """ + if isinstance(obj, tuple): + # convert this object back to a tuple later + obj = self._coerce_obj( + parent, obj, path, parent_to_obj_elem, + parent_to_obj_action, elements, + to_type=list, from_type=tuple) + if elem != 0 and self.force and isinstance(obj, list) and len(obj) == 0: + # it must have been a dictionary + obj = {} + self._simple_set_elem_value(obj=parent, path_for_err_reporting=path, elem=parent_to_obj_elem, + value=obj, action=parent_to_obj_action) + self._simple_set_elem_value(obj=obj, path_for_err_reporting=path, elem=elem, + value=new_value, action=action) + + def _simple_delete_elem(self, obj, path_for_err_reporting, elem=None, action=None): + """ + Delete the element directly on an object + """ + try: + if action == GET: + del obj[elem] + elif action == GETATTR: + del obj.__dict__[elem] + else: + raise DeltaError(INVALID_ACTION_WHEN_CALLING_SIMPLE_DELETE_ELEM.format(action)) + except (KeyError, IndexError, AttributeError) as e: + self._raise_or_log('Failed to set {} due to {}'.format(path_for_err_reporting, e)) + + def _del_elem(self, parent, parent_to_obj_elem, parent_to_obj_action, + obj, elements, path, elem, action): + """ + Delete the element value on an object and if necessary convert the object to the proper mutable type + """ + obj_is_new = False + if isinstance(obj, tuple): + # convert this object back to a tuple later + self.post_process_paths_to_convert[elements[:-1]] = {'old_type': list, 'new_type': tuple} + obj = list(obj) + obj_is_new = True + self._simple_delete_elem(obj=obj, 
path_for_err_reporting=path, elem=elem, action=action) + if obj_is_new and parent: + # Making sure that the object is re-instated inside the parent especially if it was immutable + # and we had to turn it into a mutable one. In such cases the object has a new id. + self._simple_set_elem_value(obj=parent, path_for_err_reporting=path, elem=parent_to_obj_elem, + value=obj, action=parent_to_obj_action) + + def _do_iterable_item_added(self): + iterable_item_added = self.diff.get('iterable_item_added', {}) + iterable_item_moved = self.diff.get('iterable_item_moved') + + # First we need to create a placeholder for moved items. + # This will then get replaced below after we go through added items. + # Without this items can get double added because moved store the new_value and does not need item_added replayed + if iterable_item_moved: + added_dict = {v["new_path"]: None for k, v in iterable_item_moved.items()} + iterable_item_added.update(added_dict) + + if iterable_item_added: + self._do_item_added(iterable_item_added, insert=True) + + if iterable_item_moved: + added_dict = {v["new_path"]: v["value"] for k, v in iterable_item_moved.items()} + self._do_item_added(added_dict, insert=False) + + def _do_dictionary_item_added(self): + dictionary_item_added = self.diff.get('dictionary_item_added') + if dictionary_item_added: + self._do_item_added(dictionary_item_added, sort=False) + + def _do_attribute_added(self): + attribute_added = self.diff.get('attribute_added') + if attribute_added: + self._do_item_added(attribute_added) + + @staticmethod + def _sort_key_for_item_added(path_and_value): + elements = _path_to_elements(path_and_value[0]) + # Example elements: [(4.3, 'GET'), ('b', 'GETATTR'), ('a3', 'GET')] + # We only care about the values in the elements not how to get the values. + return [i[0] for i in elements] + + @staticmethod + def _sort_comparison(left, right): + """ + We use sort comparison instead of _sort_key_for_item_added when we run into comparing element types that can not + be compared with each other, such as None to None. Or integer to string. + """ + # Example elements: [(4.3, 'GET'), ('b', 'GETATTR'), ('a3', 'GET')] + # We only care about the values in the elements not how to get the values. + left_path = [i[0] for i in _path_to_elements(left[0], root_element=None)] + right_path = [i[0] for i in _path_to_elements(right[0], root_element=None)] + try: + if left_path < right_path: + return -1 + elif left_path > right_path: + return 1 + else: + return 0 + except TypeError: + if len(left_path) > len(right_path): + left_path = left_path[:len(right_path)] + elif len(right_path) > len(left_path): + right_path = right_path[:len(left_path)] + for l_elem, r_elem in zip(left_path, right_path): + if type(l_elem) != type(r_elem) or type(l_elem) in None: + l_elem = str(l_elem) + r_elem = str(r_elem) + try: + if l_elem < r_elem: + return -1 + elif l_elem > r_elem: + return 1 + except TypeError: + continue + return 0 + + + def _do_item_added(self, items, sort=True, insert=False): + if sort: + # sorting items by their path so that the items with smaller index + # are applied first (unless `sort` is `False` so that order of + # added items is retained, e.g. for dicts). 
+ try: + items = sorted(items.items(), key=self._sort_key_for_item_added) + except TypeError: + items = sorted(items.items(), key=cmp_to_key(self._sort_comparison)) + else: + items = items.items() + + for path, new_value in items: + elem_and_details = self._get_elements_and_details(path) + if elem_and_details: + elements, parent, parent_to_obj_elem, parent_to_obj_action, obj, elem, action = elem_and_details + else: + continue # pragma: no cover. Due to cPython peephole optimizer, this line doesn't get covered. https://github.com/nedbat/coveragepy/issues/198 + + # Insert is only true for iterables, make sure it is a valid index. + if(insert and elem < len(obj)): # type: ignore + obj.insert(elem, None) # type: ignore + + self._set_new_value(parent, parent_to_obj_elem, parent_to_obj_action, + obj, elements, path, elem, action, new_value) + + def _do_values_changed(self): + values_changed = self.diff.get('values_changed') + if values_changed: + self._do_values_or_type_changed(values_changed) + + def _do_type_changes(self): + type_changes = self.diff.get('type_changes') + if type_changes: + self._do_values_or_type_changed(type_changes, is_type_change=True) + + def _do_post_process(self): + if self.post_process_paths_to_convert: + # Example: We had converted some object to be mutable and now we are converting them back to be immutable. + # We don't need to check the change because it is not really a change that was part of the original diff. + self._do_values_or_type_changed(self.post_process_paths_to_convert, is_type_change=True, verify_changes=False) + + def _do_pre_process(self): + if self._numpy_paths and ('iterable_item_added' in self.diff or 'iterable_item_removed' in self.diff): + preprocess_paths = dict_() + for path, type_ in self._numpy_paths.items(): # type: ignore + preprocess_paths[path] = {'old_type': np_ndarray, 'new_type': list} + try: + type_ = numpy_dtype_string_to_type(type_) + except Exception as e: + self._raise_or_log(NOT_VALID_NUMPY_TYPE.format(e)) + continue # pragma: no cover. Due to cPython peephole optimizer, this line doesn't get covered. 
https://github.com/nedbat/coveragepy/issues/198 + self.post_process_paths_to_convert[path] = {'old_type': list, 'new_type': type_} + if preprocess_paths: + self._do_values_or_type_changed(preprocess_paths, is_type_change=True) + + def _get_elements_and_details(self, path): + try: + elements = _path_to_elements(path) + if len(elements) > 1: + elements_subset = elements[:-2] + if len(elements_subset) != len(elements): + next_element = elements[-2][0] + next2_element = elements[-1][0] + else: + next_element = None + parent = self.get_nested_obj(obj=self, elements=elements_subset, next_element=next_element) + parent_to_obj_elem, parent_to_obj_action = elements[-2] + obj = self._get_elem_and_compare_to_old_value( + obj=parent, path_for_err_reporting=path, expected_old_value=None, + elem=parent_to_obj_elem, action=parent_to_obj_action, next_element=next2_element) # type: ignore + else: + # parent = self + # obj = self.root + # parent_to_obj_elem = 'root' + # parent_to_obj_action = GETATTR + parent = parent_to_obj_elem = parent_to_obj_action = None + obj = self + # obj = self.get_nested_obj(obj=self, elements=elements[:-1]) + elem, action = elements[-1] # type: ignore + except Exception as e: + self._raise_or_log(UNABLE_TO_GET_ITEM_MSG.format(path, e)) + return None + else: + if obj is not_found: + return None + return elements, parent, parent_to_obj_elem, parent_to_obj_action, obj, elem, action + + def _do_values_or_type_changed(self, changes, is_type_change=False, verify_changes=True): + for path, value in changes.items(): + elem_and_details = self._get_elements_and_details(path) + if elem_and_details: + elements, parent, parent_to_obj_elem, parent_to_obj_action, obj, elem, action = elem_and_details + else: + continue # pragma: no cover. Due to cPython peephole optimizer, this line doesn't get covered. https://github.com/nedbat/coveragepy/issues/198 + expected_old_value = value.get('old_value', not_found) + + current_old_value = self._get_elem_and_compare_to_old_value( + obj=obj, path_for_err_reporting=path, expected_old_value=expected_old_value, elem=elem, action=action) + if current_old_value is not_found: + continue # pragma: no cover. I have not been able to write a test for this case. But we should still check for it. + # With type change if we could have originally converted the type from old_value + # to new_value just by applying the class of the new_value, then we might not include the new_value + # in the delta dictionary. That is defined in Model.DeltaResult._from_tree_type_changes + if is_type_change and 'new_value' not in value: + try: + new_type = value['new_type'] + # in case of Numpy we pass the ndarray plus the dtype in a tuple + if new_type in numpy_dtypes: + new_value = np_array_factory(current_old_value, new_type) + else: + new_value = new_type(current_old_value) + except Exception as e: + self._raise_or_log(TYPE_CHANGE_FAIL_MSG.format(obj[elem], value.get('new_type', 'unknown'), e)) # type: ignore + continue + else: + new_value = value['new_value'] + + self._set_new_value(parent, parent_to_obj_elem, parent_to_obj_action, + obj, elements, path, elem, action, new_value) + + if verify_changes: + self._do_verify_changes(path, expected_old_value, current_old_value) + + def _do_item_removed(self, items): + """ + Handle removing items. + """ + # Sorting the iterable_item_removed in reverse order based on the paths. 
+ # So that we delete a bigger index before a smaller index
+ try:
+ sorted_item = sorted(items.items(), key=self._sort_key_for_item_added, reverse=True)
+ except TypeError:
+ sorted_item = sorted(items.items(), key=cmp_to_key(self._sort_comparison), reverse=True)
+ for path, expected_old_value in sorted_item:
+ elem_and_details = self._get_elements_and_details(path)
+ if elem_and_details:
+ elements, parent, parent_to_obj_elem, parent_to_obj_action, obj, elem, action = elem_and_details
+ else:
+ continue # pragma: no cover. Due to cPython peephole optimizer, this line doesn't get covered. https://github.com/nedbat/coveragepy/issues/198
+
+ look_for_expected_old_value = False
+ current_old_value = not_found
+ try:
+ if action == GET:
+ current_old_value = obj[elem] # type: ignore
+ elif action == GETATTR:
+ current_old_value = getattr(obj, elem)
+ look_for_expected_old_value = current_old_value != expected_old_value
+ except (KeyError, IndexError, AttributeError, TypeError):
+ look_for_expected_old_value = True
+
+ if look_for_expected_old_value and isinstance(obj, list) and not self._iterable_compare_func_was_used:
+ # It may return None if it doesn't find the expected old value
+ elem = self._find_closest_iterable_element_for_index(obj, elem, expected_old_value)
+ if elem is not None:
+ current_old_value = expected_old_value
+ if current_old_value is not_found or elem is None:
+ continue
+
+ self._del_elem(parent, parent_to_obj_elem, parent_to_obj_action,
+ obj, elements, path, elem, action)
+ self._do_verify_changes(path, expected_old_value, current_old_value)
+
+ def _find_closest_iterable_element_for_index(self, obj, elem, expected_old_value):
+ closest_elem = None
+ closest_distance = float('inf')
+ for index, value in enumerate(obj):
+ dist = abs(index - elem)
+ if dist > closest_distance:
+ break
+ if value == expected_old_value and dist < closest_distance:
+ closest_elem = index
+ closest_distance = dist
+ return closest_elem
+
+ def _do_iterable_opcodes(self):
+ _iterable_opcodes = self.diff.get('_iterable_opcodes', {})
+ if _iterable_opcodes:
+ for path, opcodes in _iterable_opcodes.items():
+ transformed = []
+ elem_and_details = self._get_elements_and_details(path)
+ if elem_and_details:
+ elements, parent, parent_to_obj_elem, parent_to_obj_action, obj, elem, action = elem_and_details
+ if parent is None:
+ parent = self
+ obj = self.root
+ parent_to_obj_elem = 'root'
+ parent_to_obj_action = GETATTR
+ else:
+ continue # pragma: no cover. Due to cPython peephole optimizer, this line doesn't get covered. https://github.com/nedbat/coveragepy/issues/198
+ obj = self.get_nested_obj(obj=self, elements=elements)
+ is_obj_tuple = isinstance(obj, tuple)
+ for opcode in opcodes:
+ if opcode.tag == 'replace':
+ # Replace items in list a[i1:i2] with b[j1:j2]
+ transformed.extend(opcode.new_values)
+ elif opcode.tag == 'delete':
+ # Delete items from list a[i1:i2], so we do nothing here
+ continue
+ elif opcode.tag == 'insert':
+ # Insert items from list b[j1:j2] into the new list
+ transformed.extend(opcode.new_values)
+ elif opcode.tag == 'equal':
+ # Items are the same in both lists, so we add them to the result
+ transformed.extend(obj[opcode.t1_from_index:opcode.t1_to_index]) # type: ignore
+ if is_obj_tuple:
+ # Tuples are immutable, so rebuild the tuple from the transformed items
+ # instead of the slice assignment we use for lists below.
+ obj = tuple(transformed) # type: ignore
+ # Making sure that the object is re-instated inside the parent especially if it was immutable
+ # and we had to turn it into a mutable one. In such cases the object has a new id.
+ self._simple_set_elem_value(obj=parent, path_for_err_reporting=path, elem=parent_to_obj_elem,
+ value=obj, action=parent_to_obj_action)
+ else:
+ obj[:] = transformed # type: ignore
+
+ def _do_iterable_item_removed(self):
+ iterable_item_removed = self.diff.get('iterable_item_removed', {})
+
+ iterable_item_moved = self.diff.get('iterable_item_moved')
+ if iterable_item_moved:
+ # These will get added back during items_added
+ removed_dict = {k: v["value"] for k, v in iterable_item_moved.items()}
+ iterable_item_removed.update(removed_dict)
+
+ if iterable_item_removed:
+ self._do_item_removed(iterable_item_removed)
+
+ def _do_dictionary_item_removed(self):
+ dictionary_item_removed = self.diff.get('dictionary_item_removed')
+ if dictionary_item_removed:
+ self._do_item_removed(dictionary_item_removed)
+
+ def _do_attribute_removed(self):
+ attribute_removed = self.diff.get('attribute_removed')
+ if attribute_removed:
+ self._do_item_removed(attribute_removed)
+
+ def _do_set_item_added(self):
+ items = self.diff.get('set_item_added')
+ if items:
+ self._do_set_or_frozenset_item(items, func='union')
+
+ def _do_set_item_removed(self):
+ items = self.diff.get('set_item_removed')
+ if items:
+ self._do_set_or_frozenset_item(items, func='difference')
+
+ def _do_set_or_frozenset_item(self, items, func):
+ for path, value in items.items():
+ elements = _path_to_elements(path)
+ parent = self.get_nested_obj(obj=self, elements=elements[:-1])
+ elem, action = elements[-1]
+ obj = self._get_elem_and_compare_to_old_value(
+ parent, path_for_err_reporting=path, expected_old_value=None, elem=elem, action=action, forced_old_value=set())
+ new_value = getattr(obj, func)(value)
+ self._simple_set_elem_value(parent, path_for_err_reporting=path, elem=elem, value=new_value, action=action)
+
+ def _do_ignore_order_get_old(self, obj, remove_indexes_per_path, fixed_indexes_values, path_for_err_reporting):
+ """
+ A generator that gets the old values in an iterable when the order was supposed to be ignored.
+ """
+ old_obj_index = -1
+ max_len = len(obj) - 1
+ while old_obj_index < max_len:
+ old_obj_index += 1
+ current_old_obj = obj[old_obj_index]
+ if current_old_obj in fixed_indexes_values:
+ continue
+ if old_obj_index in remove_indexes_per_path:
+ expected_obj_to_delete = remove_indexes_per_path.pop(old_obj_index)
+ if current_old_obj == expected_obj_to_delete:
+ continue
+ else:
+ self._raise_or_log(FAIL_TO_REMOVE_ITEM_IGNORE_ORDER_MSG.format(
+ old_obj_index, path_for_err_reporting, expected_obj_to_delete, current_old_obj))
+ yield current_old_obj
+
+ def _do_ignore_order(self):
+ """
+ Rebuild an iterable whose diff was calculated with ignore_order=True,
+ using the index-keyed reports. For example:
+
+ 't1': [5, 1, 1, 1, 6],
+ 't2': [7, 1, 1, 1, 8],
+
+ 'iterable_items_added_at_indexes': {
+ 'root': {
+ 0: 7,
+ 4: 8
+ }
+ },
+ 'iterable_items_removed_at_indexes': {
+ 'root': {
+ 4: 6,
+ 0: 5
+ }
+ }
+ """
+ fixed_indexes = self.diff.get('iterable_items_added_at_indexes', dict_())
+ remove_indexes = self.diff.get('iterable_items_removed_at_indexes', dict_())
+ paths = SetOrdered(fixed_indexes.keys()) | SetOrdered(remove_indexes.keys())
+ for path in paths: # type: ignore
+ # In the case of ignore_order reports, we are pointing to the container object.
+ # Thus we add a [0] to the elements so we can get the required objects and discard what we don't need.
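+ # (Illustrative note, not from the original source: for a reported path like
+ # 'root', we look up 'root[0]' so that `obj` below resolves to the container
+ # itself; the synthetic trailing index is discarded via the `_` placeholders.)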
+ elem_and_details = self._get_elements_and_details("{}[0]".format(path))
+ if elem_and_details:
+ _, parent, parent_to_obj_elem, parent_to_obj_action, obj, _, _ = elem_and_details
+ else:
+ continue # pragma: no cover. Due to cPython peephole optimizer, this line doesn't get covered. https://github.com/nedbat/coveragepy/issues/198
+ # copying both these dictionaries since we don't want to mutate them.
+ fixed_indexes_per_path = fixed_indexes.get(path, dict_()).copy()
+ remove_indexes_per_path = remove_indexes.get(path, dict_()).copy()
+ fixed_indexes_values = AnySet(fixed_indexes_per_path.values())
+
+ new_obj = []
+ # Numpy's NdArray does not like the bool function.
+ if isinstance(obj, np_ndarray):
+ there_are_old_items = obj.size > 0
+ else:
+ there_are_old_items = bool(obj)
+ old_item_gen = self._do_ignore_order_get_old(
+ obj, remove_indexes_per_path, fixed_indexes_values, path_for_err_reporting=path)
+ while there_are_old_items or fixed_indexes_per_path:
+ new_obj_index = len(new_obj)
+ if new_obj_index in fixed_indexes_per_path:
+ new_item = fixed_indexes_per_path.pop(new_obj_index)
+ new_obj.append(new_item)
+ elif there_are_old_items:
+ try:
+ new_item = next(old_item_gen)
+ except StopIteration:
+ there_are_old_items = False
+ else:
+ new_obj.append(new_item)
+ else:
+ # pop the first remaining item from the fixed_indexes_per_path dictionary;
+ # its reported index no longer fits the rebuilt iterable
+ self._raise_or_log(INDEXES_NOT_FOUND_WHEN_IGNORE_ORDER.format(fixed_indexes_per_path))
+ new_item = fixed_indexes_per_path.pop(next(iter(fixed_indexes_per_path)))
+ new_obj.append(new_item)
+
+ if isinstance(obj, tuple):
+ new_obj = tuple(new_obj)
+ # Making sure that the object is re-instated inside the parent especially if it was immutable
+ # and we had to turn it into a mutable one. In such cases the object has a new id.
+ self._simple_set_elem_value(obj=parent, path_for_err_reporting=path, elem=parent_to_obj_elem, + value=new_obj, action=parent_to_obj_action) + + def _get_reverse_diff(self): + if not self.bidirectional: + raise ValueError('Please recreate the delta with bidirectional=True') + + SIMPLE_ACTION_TO_REVERSE = { + 'iterable_item_added': 'iterable_item_removed', + 'iterable_items_added_at_indexes': 'iterable_items_removed_at_indexes', + 'attribute_added': 'attribute_removed', + 'set_item_added': 'set_item_removed', + 'dictionary_item_added': 'dictionary_item_removed', + } + # Adding the reverse of the dictionary + for key in list(SIMPLE_ACTION_TO_REVERSE.keys()): + SIMPLE_ACTION_TO_REVERSE[SIMPLE_ACTION_TO_REVERSE[key]] = key + + r_diff = {} + for action, info in self.diff.items(): + reverse_action = SIMPLE_ACTION_TO_REVERSE.get(action) + if reverse_action: + r_diff[reverse_action] = info + elif action == 'values_changed': + r_diff[action] = {} + for path, path_info in info.items(): + reverse_path = path_info['new_path'] if path_info.get('new_path') else path + r_diff[action][reverse_path] = { + 'new_value': path_info['old_value'], 'old_value': path_info['new_value'] + } + elif action == 'type_changes': + r_diff[action] = {} + for path, path_info in info.items(): + reverse_path = path_info['new_path'] if path_info.get('new_path') else path + r_diff[action][reverse_path] = { + 'old_type': path_info['new_type'], 'new_type': path_info['old_type'], + } + if 'new_value' in path_info: + r_diff[action][reverse_path]['old_value'] = path_info['new_value'] + if 'old_value' in path_info: + r_diff[action][reverse_path]['new_value'] = path_info['old_value'] + elif action == 'iterable_item_moved': + r_diff[action] = {} + for path, path_info in info.items(): + old_path = path_info['new_path'] + r_diff[action][old_path] = { + 'new_path': path, 'value': path_info['value'], + } + elif action == '_iterable_opcodes': + r_diff[action] = {} + for path, op_codes in info.items(): + r_diff[action][path] = [] + for op_code in op_codes: + tag = op_code.tag + tag = {'delete': 'insert', 'insert': 'delete'}.get(tag, tag) + new_op_code = Opcode( + tag=tag, + t1_from_index=op_code.t2_from_index, + t1_to_index=op_code.t2_to_index, + t2_from_index=op_code.t1_from_index, + t2_to_index=op_code.t1_to_index, + new_values=op_code.old_values, + old_values=op_code.new_values, + ) + r_diff[action][path].append(new_op_code) + return r_diff + + def dump(self, file): + """ + Dump into file object + """ + # Small optimization: Our internal pickle serializer can just take a file object + # and directly write to it. However if a user defined serializer is passed + # we want to make it compatible with the expectation that self.serializer(self.diff) + # will give the user the serialization and then it can be written to + # a file object when using the dump(file) function. + param_names_of_serializer = set(self.serializer.__code__.co_varnames) + if 'file_obj' in param_names_of_serializer: + self.serializer(self.diff, file_obj=file) + else: + file.write(self.dumps()) + + def dumps(self): + """ + Return the serialized representation of the object as a bytes object, instead of writing it to a file. 
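+
+ A minimal round-trip sketch (t1 and t2 are assumed sample values, using the
+ default serializer):
+
+ >>> from deepdiff import DeepDiff, Delta
+ >>> t1, t2 = [1, 2], [1, 3]
+ >>> dump = Delta(DeepDiff(t1, t2)).dumps()
+ >>> Delta(dump) + t1 == t2
+ True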
+ """ + return self.serializer(self.diff) + + def to_dict(self): + return dict(self.diff) + + def _flatten_iterable_opcodes(self, _parse_path): + """ + Converts op_codes to FlatDeltaRows + """ + result = [] + for path, op_codes in self.diff['_iterable_opcodes'].items(): + for op_code in op_codes: + result.append( + FlatDeltaRow( + path=_parse_path(path), + action=OPCODE_TAG_TO_FLAT_DATA_ACTION[op_code.tag], + value=op_code.new_values, + old_value=op_code.old_values, + type=type(op_code.new_values), + old_type=type(op_code.old_values), + new_path=None, + t1_from_index=op_code.t1_from_index, + t1_to_index=op_code.t1_to_index, + t2_from_index=op_code.t2_from_index, + t2_to_index=op_code.t2_to_index, + + ) + ) + return result + + @staticmethod + def _get_flat_row(action, info, _parse_path, keys_and_funcs, report_type_changes=True): + for path, details in info.items(): + row = {'path': _parse_path(path), 'action': action} + for key, new_key, func in keys_and_funcs: + if key in details: + if func: + row[new_key] = func(details[key]) + else: + row[new_key] = details[key] + if report_type_changes: + if 'value' in row and 'type' not in row: + row['type'] = type(row['value']) + if 'old_value' in row and 'old_type' not in row: + row['old_type'] = type(row['old_value']) + yield FlatDeltaRow(**row) + + @staticmethod + def _from_flat_rows(flat_rows_list: List[FlatDeltaRow]): + flat_dict_list = (i._asdict() for i in flat_rows_list) + return Delta._from_flat_dicts(flat_dict_list) + + @staticmethod + def _from_flat_dicts(flat_dict_list): + """ + Create the delta's diff object from the flat_dict_list + """ + result = {} + FLATTENING_NEW_ACTION_MAP = { + 'unordered_iterable_item_added': 'iterable_items_added_at_indexes', + 'unordered_iterable_item_removed': 'iterable_items_removed_at_indexes', + } + for flat_dict in flat_dict_list: + index = None + action = flat_dict.get("action") + path = flat_dict.get("path") + value = flat_dict.get('value') + new_path = flat_dict.get('new_path') + old_value = flat_dict.get('old_value', UnkownValueCode) + if not action: + raise ValueError("Flat dict need to include the 'action'.") + if path is None: + raise ValueError("Flat dict need to include the 'path'.") + if action in FLATTENING_NEW_ACTION_MAP: + action = FLATTENING_NEW_ACTION_MAP[action] + index = path.pop() + if action in { + FlatDataAction.attribute_added, + FlatDataAction.attribute_removed, + }: + root_element = ('root', GETATTR) + else: + root_element = ('root', GET) + if isinstance(path, str): + path_str = path + else: + path_str = stringify_path(path, root_element=root_element) # We need the string path + if new_path and new_path != path: + new_path = stringify_path(new_path, root_element=root_element) + else: + new_path = None + if action not in result: + result[action] = {} + if action in { + 'iterable_items_added_at_indexes', + 'iterable_items_removed_at_indexes', + }: + if path_str not in result[action]: + result[action][path_str] = {} + result[action][path_str][index] = value + elif action in { + FlatDataAction.set_item_added, + FlatDataAction.set_item_removed + }: + if path_str not in result[action]: + result[action][path_str] = set() + result[action][path_str].add(value) + elif action in { + FlatDataAction.dictionary_item_added, + FlatDataAction.dictionary_item_removed, + FlatDataAction.attribute_removed, + FlatDataAction.attribute_added, + FlatDataAction.iterable_item_added, + FlatDataAction.iterable_item_removed, + }: + result[action][path_str] = value + elif action == 'values_changed': + if old_value 
== UnkownValueCode: + result[action][path_str] = {'new_value': value} + else: + result[action][path_str] = {'new_value': value, 'old_value': old_value} + elif action == 'type_changes': + type_ = flat_dict.get('type', UnkownValueCode) + old_type = flat_dict.get('old_type', UnkownValueCode) + + result[action][path_str] = {'new_value': value} + for elem, elem_value in [ + ('new_type', type_), + ('old_type', old_type), + ('old_value', old_value), + ]: + if elem_value != UnkownValueCode: + result[action][path_str][elem] = elem_value + elif action == FlatDataAction.iterable_item_moved: + result[action][path_str] = {'value': value} + elif action in { + FlatDataAction.iterable_items_inserted, + FlatDataAction.iterable_items_deleted, + FlatDataAction.iterable_items_replaced, + FlatDataAction.iterable_items_equal, + }: + if '_iterable_opcodes' not in result: + result['_iterable_opcodes'] = {} + if path_str not in result['_iterable_opcodes']: + result['_iterable_opcodes'][path_str] = [] + result['_iterable_opcodes'][path_str].append( + Opcode( + tag=FLAT_DATA_ACTION_TO_OPCODE_TAG[action], # type: ignore + t1_from_index=flat_dict.get('t1_from_index'), + t1_to_index=flat_dict.get('t1_to_index'), + t2_from_index=flat_dict.get('t2_from_index'), + t2_to_index=flat_dict.get('t2_to_index'), + new_values=flat_dict.get('value'), + old_values=flat_dict.get('old_value'), + ) + ) + if new_path: + result[action][path_str]['new_path'] = new_path + + return result + + def to_flat_dicts(self, include_action_in_path=False, report_type_changes=True) -> List[FlatDeltaRow]: + """ + Returns a flat list of actions that is easily machine readable. + + For example: + {'iterable_item_added': {'root[3]': 5, 'root[2]': 3}} + + Becomes: + [ + {'path': [3], 'value': 5, 'action': 'iterable_item_added'}, + {'path': [2], 'value': 3, 'action': 'iterable_item_added'}, + ] + + + **Parameters** + + include_action_in_path : Boolean, default=False + When False, we translate DeepDiff's paths like root[3].attribute1 into a [3, 'attribute1']. + When True, we include the action to retrieve the item in the path: [(3, 'GET'), ('attribute1', 'GETATTR')] + Note that the "action" here is the different than the action reported by to_flat_dicts. The action here is just about the "path" output. + + report_type_changes : Boolean, default=True + If False, we don't report the type change. Instead we report the value change. + + Example: + t1 = {"a": None} + t2 = {"a": 1} + + dump = Delta(DeepDiff(t1, t2)).dumps() + delta = Delta(dump) + assert t2 == delta + t1 + + flat_result = delta.to_flat_dicts() + flat_expected = [{'path': ['a'], 'action': 'type_changes', 'value': 1, 'new_type': int, 'old_type': type(None)}] + assert flat_expected == flat_result + + flat_result2 = delta.to_flat_dicts(report_type_changes=False) + flat_expected2 = [{'path': ['a'], 'action': 'values_changed', 'value': 1}] + + **List of actions** + + Here are the list of actions that the flat dictionary can return. 
+ iterable_item_added
+ iterable_item_removed
+ iterable_item_moved
+ values_changed
+ type_changes
+ set_item_added
+ set_item_removed
+ dictionary_item_added
+ dictionary_item_removed
+ attribute_added
+ attribute_removed
+ """
+ return [
+ i._asdict() for i in self.to_flat_rows(
+ include_action_in_path=include_action_in_path,
+ report_type_changes=report_type_changes,
+ )
+ ]
+
+ def to_flat_rows(self, include_action_in_path=False, report_type_changes=True) -> List[FlatDeltaRow]:
+ """
+ Just like to_flat_dicts but returns FlatDeltaRow named tuples
+ """
+ result = []
+ if include_action_in_path:
+ _parse_path = partial(parse_path, include_actions=True)
+ else:
+ _parse_path = parse_path
+ if report_type_changes:
+ keys_and_funcs = [
+ ('value', 'value', None),
+ ('new_value', 'value', None),
+ ('old_value', 'old_value', None),
+ ('new_type', 'type', None),
+ ('old_type', 'old_type', None),
+ ('new_path', 'new_path', _parse_path),
+ ]
+ else:
+ if not self.always_include_values:
+ raise ValueError(
+ "When converting to flat dictionaries, if report_type_changes=False and there are type changes, "
+ "you must set always_include_values=True at delta object creation. Otherwise there is nothing to include."
+ )
+ keys_and_funcs = [
+ ('value', 'value', None),
+ ('new_value', 'value', None),
+ ('old_value', 'old_value', None),
+ ('new_path', 'new_path', _parse_path),
+ ]
+
+ FLATTENING_NEW_ACTION_MAP = {
+ 'iterable_items_added_at_indexes': 'unordered_iterable_item_added',
+ 'iterable_items_removed_at_indexes': 'unordered_iterable_item_removed',
+ }
+ for action, info in self.diff.items():
+ if action == '_iterable_opcodes':
+ result.extend(self._flatten_iterable_opcodes(_parse_path=_parse_path))
+ continue
+ if action.startswith('_'):
+ continue
+ if action in FLATTENING_NEW_ACTION_MAP:
+ new_action = FLATTENING_NEW_ACTION_MAP[action]
+ for path, index_to_value in info.items():
+ path = _parse_path(path)
+ for index, value in index_to_value.items():
+ path2 = path.copy()
+ if include_action_in_path:
+ path2.append((index, 'GET')) # type: ignore
+ else:
+ path2.append(index)
+ if report_type_changes:
+ row = FlatDeltaRow(path=path2, value=value, action=new_action, type=type(value)) # type: ignore
+ else:
+ row = FlatDeltaRow(path=path2, value=value, action=new_action) # type: ignore
+ result.append(row)
+ elif action in {'set_item_added', 'set_item_removed'}:
+ for path, values in info.items():
+ path = _parse_path(path)
+ for value in values:
+ if report_type_changes:
+ row = FlatDeltaRow(path=path, value=value, action=action, type=type(value))
+ else:
+ row = FlatDeltaRow(path=path, value=value, action=action)
+ result.append(row)
+ elif action == 'dictionary_item_added':
+ for path, value in info.items():
+ path = _parse_path(path)
+ # Use a per-row action so that rewriting it for one value does not
+ # leak into the next iteration of this loop.
+ row_action = action
+ if isinstance(value, dict) and len(value) == 1:
+ new_key = next(iter(value))
+ path.append(new_key)
+ value = value[new_key]
+ elif isinstance(value, (list, tuple)) and len(value) == 1:
+ value = value[0]
+ path.append(0) # type: ignore
+ row_action = 'iterable_item_added'
+ elif isinstance(value, set) and len(value) == 1:
+ value = value.pop()
+ row_action = 'set_item_added'
+ if report_type_changes:
+ row = FlatDeltaRow(path=path, value=value, action=row_action, type=type(value)) # type: ignore
+ else:
+ row = FlatDeltaRow(path=path, value=value, action=row_action) # type: ignore
+ result.append(row)
+ elif action in {
+ 'dictionary_item_removed', 'iterable_item_added',
+ 'iterable_item_removed', 'attribute_removed', 'attribute_added'
+ }:
+ for path, value in info.items():
+ path = 
_parse_path(path) + if report_type_changes: + row = FlatDeltaRow(path=path, value=value, action=action, type=type(value)) + else: + row = FlatDeltaRow(path=path, value=value, action=action) + result.append(row) + elif action == 'type_changes': + if not report_type_changes: + action = 'values_changed' + + for row in self._get_flat_row( + action=action, + info=info, + _parse_path=_parse_path, + keys_and_funcs=keys_and_funcs, + report_type_changes=report_type_changes, + ): + result.append(row) + else: + for row in self._get_flat_row( + action=action, + info=info, + _parse_path=_parse_path, + keys_and_funcs=keys_and_funcs, + report_type_changes=report_type_changes, + ): + result.append(row) + return result + + +if __name__ == "__main__": # pragma: no cover + import doctest + doctest.testmod() diff --git a/.venv/lib/python3.12/site-packages/deepdiff/diff.py b/.venv/lib/python3.12/site-packages/deepdiff/diff.py new file mode 100644 index 00000000..d84ecc7e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/deepdiff/diff.py @@ -0,0 +1,1906 @@ +#!/usr/bin/env python + +# In order to run the docstrings: +# python3 -m deepdiff.diff +# You might need to run it many times since dictionaries come in different orders +# every time you run the docstrings. +# However the docstring expects it in a specific order in order to pass! +import difflib +import logging +import types +import datetime +from enum import Enum +from copy import deepcopy +from math import isclose as is_close +from typing import List, Dict, Callable, Union, Any, Pattern, Tuple, Optional, Set, FrozenSet, TYPE_CHECKING, Protocol +from collections.abc import Mapping, Iterable, Sequence +from collections import defaultdict +from inspect import getmembers +from itertools import zip_longest +from functools import lru_cache +from deepdiff.helper import (strings, bytes_type, numbers, uuids, ListItemRemovedOrAdded, notpresent, + IndexedHash, unprocessed, add_to_frozen_set, basic_types, + convert_item_or_items_into_set_else_none, get_type, + convert_item_or_items_into_compiled_regexes_else_none, + type_is_subclass_of_type_group, type_in_type_group, get_doc, + number_to_string, datetime_normalize, KEY_TO_VAL_STR, booleans, + np_ndarray, np_floating, get_numpy_ndarray_rows, RepeatedTimer, + TEXT_VIEW, TREE_VIEW, DELTA_VIEW, detailed__dict__, add_root_to_paths, + np, get_truncate_datetime, dict_, CannotCompare, ENUM_INCLUDE_KEYS, + PydanticBaseModel, Opcode, SetOrdered, ipranges) +from deepdiff.serialization import SerializationMixin +from deepdiff.distance import DistanceMixin, logarithmic_similarity +from deepdiff.model import ( + RemapDict, ResultDict, TextResult, TreeResult, DiffLevel, + DictRelationship, AttributeRelationship, REPORT_KEYS, + SubscriptableIterableRelationship, NonSubscriptableIterableRelationship, + SetRelationship, NumpyArrayRelationship, CUSTOM_FIELD, + FORCE_DEFAULT, +) +from deepdiff.deephash import DeepHash, combine_hashes_lists +from deepdiff.base import Base +from deepdiff.lfucache import LFUCache, DummyLFU + +if TYPE_CHECKING: + from pytz.tzinfo import BaseTzInfo + + +logger = logging.getLogger(__name__) + +MAX_PASSES_REACHED_MSG = ( + 'DeepDiff has reached the max number of passes of {}. ' + 'You can possibly get more accurate results by increasing the max_passes parameter.') + +MAX_DIFFS_REACHED_MSG = ( + 'DeepDiff has reached the max number of diffs of {}. 
' + 'You can possibly get more accurate results by increasing the max_diffs parameter.') + + +notpresent_indexed = IndexedHash(indexes=[0], item=notpresent) + +doc = get_doc('diff_doc.rst') + + +PROGRESS_MSG = "DeepDiff {} seconds in progress. Pass #{}, Diff #{}" + + +def _report_progress(_stats, progress_logger, duration): + """ + Report the progress every few seconds. + """ + progress_logger(PROGRESS_MSG.format(duration, _stats[PASSES_COUNT], _stats[DIFF_COUNT])) + + +DISTANCE_CACHE_HIT_COUNT = 'DISTANCE CACHE HIT COUNT' +DIFF_COUNT = 'DIFF COUNT' +PASSES_COUNT = 'PASSES COUNT' +MAX_PASS_LIMIT_REACHED = 'MAX PASS LIMIT REACHED' +MAX_DIFF_LIMIT_REACHED = 'MAX DIFF LIMIT REACHED' +DISTANCE_CACHE_ENABLED = 'DISTANCE CACHE ENABLED' +PREVIOUS_DIFF_COUNT = 'PREVIOUS DIFF COUNT' +PREVIOUS_DISTANCE_CACHE_HIT_COUNT = 'PREVIOUS DISTANCE CACHE HIT COUNT' +CANT_FIND_NUMPY_MSG = 'Unable to import numpy. This must be a bug in DeepDiff since a numpy array is detected.' +INVALID_VIEW_MSG = 'The only valid values for the view parameter are text and tree. But {} was passed.' +CUTOFF_RANGE_ERROR_MSG = 'cutoff_distance_for_pairs needs to be a positive float max 1.' +VERBOSE_LEVEL_RANGE_MSG = 'verbose_level should be 0, 1, or 2.' +PURGE_LEVEL_RANGE_MSG = 'cache_purge_level should be 0, 1, or 2.' +_ENABLE_CACHE_EVERY_X_DIFF = '_ENABLE_CACHE_EVERY_X_DIFF' + +model_fields_set = frozenset(["model_fields_set"]) + + +# What is the threshold to consider 2 items to be pairs. Only used when ignore_order = True. +CUTOFF_DISTANCE_FOR_PAIRS_DEFAULT = 0.3 + +# What is the threshold to calculate pairs of items between 2 iterables. +# For example 2 iterables that have nothing in common, do not need their pairs to be calculated. +CUTOFF_INTERSECTION_FOR_PAIRS_DEFAULT = 0.7 + +DEEPHASH_PARAM_KEYS = ( + 'exclude_types', + 'exclude_paths', + 'include_paths', + 'exclude_regex_paths', + 'hasher', + 'significant_digits', + 'number_format_notation', + 'ignore_string_type_changes', + 'ignore_numeric_type_changes', + 'use_enum_value', + 'ignore_type_in_groups', + 'ignore_type_subclasses', + 'ignore_string_case', + 'exclude_obj_callback', + 'ignore_private_variables', + 'encodings', + 'ignore_encoding_errors', + 'default_timezone', + 'custom_operators', +) + + +class DeepDiffProtocol(Protocol): + t1: Any + t2: Any + cutoff_distance_for_pairs: float + use_log_scale: bool + log_scale_similarity_threshold: float + view: str + + + +class DeepDiff(ResultDict, SerializationMixin, DistanceMixin, DeepDiffProtocol, Base): + __doc__ = doc + + CACHE_AUTO_ADJUST_THRESHOLD = 0.25 + + def __init__(self, + t1: Any, + t2: Any, + _original_type=None, + cache_purge_level: int=1, + cache_size: int=0, + cache_tuning_sample_size: int=0, + custom_operators: Optional[List[Any]] =None, + cutoff_distance_for_pairs: float=CUTOFF_DISTANCE_FOR_PAIRS_DEFAULT, + cutoff_intersection_for_pairs: float=CUTOFF_INTERSECTION_FOR_PAIRS_DEFAULT, + default_timezone:Union[datetime.timezone, "BaseTzInfo"]=datetime.timezone.utc, + encodings: Optional[List[str]]=None, + exclude_obj_callback: Optional[Callable]=None, + exclude_obj_callback_strict: Optional[Callable]=None, + exclude_paths: Union[str, List[str], Set[str], FrozenSet[str], None]=None, + exclude_regex_paths: Union[str, List[str], Pattern[str], List[Pattern[str]], None]=None, + exclude_types: Optional[List[Any]]=None, + get_deep_distance: bool=False, + group_by: Union[str, Tuple[str, str], None]=None, + group_by_sort_key: Union[str, Callable, None]=None, + hasher: Optional[Callable]=None, + hashes: 
Optional[Dict]=None, + ignore_encoding_errors: bool=False, + ignore_nan_inequality: bool=False, + ignore_numeric_type_changes: bool=False, + ignore_order: bool=False, + ignore_order_func: Optional[Callable]=None, + ignore_private_variables: bool=True, + ignore_string_case: bool=False, + ignore_string_type_changes: bool=False, + ignore_type_in_groups: Optional[List[Tuple]]=None, + ignore_type_subclasses: bool=False, + include_obj_callback: Optional[Callable]=None, + include_obj_callback_strict: Optional[Callable]=None, + include_paths: Union[str, List[str], None]=None, + iterable_compare_func: Optional[Callable]=None, + log_frequency_in_sec: int=0, + log_scale_similarity_threshold: float=0.1, + log_stacktrace: bool=False, + math_epsilon: Optional[float]=None, + max_diffs: Optional[int]=None, + max_passes: int=10000000, + number_format_notation: str="f", + number_to_string_func: Optional[Callable]=None, + progress_logger: Callable=logger.info, + report_repetition: bool=False, + significant_digits: Optional[int]=None, + threshold_to_diff_deeper: float = 0.33, + truncate_datetime: Optional[str]=None, + use_enum_value: bool=False, + use_log_scale: bool=False, + verbose_level: int=1, + view: str=TEXT_VIEW, + zip_ordered_iterables: bool=False, + _parameters=None, + _shared_parameters=None, + **kwargs): + super().__init__() + if kwargs: + raise ValueError(( + "The following parameter(s) are not valid: %s\n" + "The valid parameters are ignore_order, report_repetition, significant_digits, " + "number_format_notation, exclude_paths, include_paths, exclude_types, exclude_regex_paths, ignore_type_in_groups, " + "ignore_string_type_changes, ignore_numeric_type_changes, ignore_type_subclasses, truncate_datetime, " + "ignore_private_variables, ignore_nan_inequality, number_to_string_func, verbose_level, " + "view, hasher, hashes, max_passes, max_diffs, zip_ordered_iterables, " + "cutoff_distance_for_pairs, cutoff_intersection_for_pairs, log_frequency_in_sec, cache_size, " + "cache_tuning_sample_size, get_deep_distance, group_by, group_by_sort_key, cache_purge_level, log_stacktrace," + "math_epsilon, iterable_compare_func, use_enum_value, _original_type, threshold_to_diff_deeper, default_timezone " + "ignore_order_func, custom_operators, encodings, ignore_encoding_errors, use_log_scale, log_scale_similarity_threshold " + "_parameters and _shared_parameters.") % ', '.join(kwargs.keys())) + + if _parameters: + self.__dict__.update(_parameters) + else: + self.custom_operators = custom_operators or [] + self.ignore_order = ignore_order + + self.ignore_order_func = ignore_order_func + + ignore_type_in_groups = ignore_type_in_groups or [] + if numbers == ignore_type_in_groups or numbers in ignore_type_in_groups: + ignore_numeric_type_changes = True + self.ignore_numeric_type_changes = ignore_numeric_type_changes + if strings == ignore_type_in_groups or strings in ignore_type_in_groups: + ignore_string_type_changes = True + self.use_enum_value = use_enum_value + self.log_scale_similarity_threshold = log_scale_similarity_threshold + self.use_log_scale = use_log_scale + self.default_timezone = default_timezone + self.log_stacktrace = log_stacktrace + self.threshold_to_diff_deeper = threshold_to_diff_deeper + self.ignore_string_type_changes = ignore_string_type_changes + self.ignore_type_in_groups = self.get_ignore_types_in_groups( + ignore_type_in_groups=ignore_type_in_groups, + ignore_string_type_changes=ignore_string_type_changes, + ignore_numeric_type_changes=ignore_numeric_type_changes, + 
ignore_type_subclasses=ignore_type_subclasses) + self.report_repetition = report_repetition + self.exclude_paths = add_root_to_paths(convert_item_or_items_into_set_else_none(exclude_paths)) + self.include_paths = add_root_to_paths(convert_item_or_items_into_set_else_none(include_paths)) + self.exclude_regex_paths = convert_item_or_items_into_compiled_regexes_else_none(exclude_regex_paths) + self.exclude_types = set(exclude_types) if exclude_types else None + self.exclude_types_tuple = tuple(exclude_types) if exclude_types else None # we need tuple for checking isinstance + self.ignore_type_subclasses = ignore_type_subclasses + self.type_check_func = type_in_type_group if ignore_type_subclasses else type_is_subclass_of_type_group + self.ignore_string_case = ignore_string_case + self.exclude_obj_callback = exclude_obj_callback + self.exclude_obj_callback_strict = exclude_obj_callback_strict + self.include_obj_callback = include_obj_callback + self.include_obj_callback_strict = include_obj_callback_strict + self.number_to_string = number_to_string_func or number_to_string + self.iterable_compare_func = iterable_compare_func + self.zip_ordered_iterables = zip_ordered_iterables + self.ignore_private_variables = ignore_private_variables + self.ignore_nan_inequality = ignore_nan_inequality + self.hasher = hasher + self.cache_tuning_sample_size = cache_tuning_sample_size + self.group_by = group_by + if callable(group_by_sort_key): + self.group_by_sort_key = group_by_sort_key + elif group_by_sort_key: + def _group_by_sort_key(x): + return x[group_by_sort_key] + self.group_by_sort_key = _group_by_sort_key + else: + self.group_by_sort_key = None + self.encodings = encodings + self.ignore_encoding_errors = ignore_encoding_errors + + self.significant_digits = self.get_significant_digits(significant_digits, ignore_numeric_type_changes) + self.math_epsilon = math_epsilon + if self.math_epsilon is not None and self.ignore_order: + logger.warning("math_epsilon in conjunction with ignore_order=True is only used for flat object comparisons. Custom math_epsilon will not have an effect when comparing nested objects.") + self.truncate_datetime = get_truncate_datetime(truncate_datetime) + self.number_format_notation = number_format_notation + if verbose_level in {0, 1, 2}: + self.verbose_level = verbose_level + else: + raise ValueError(VERBOSE_LEVEL_RANGE_MSG) + if cache_purge_level not in {0, 1, 2}: + raise ValueError(PURGE_LEVEL_RANGE_MSG) + self.view = view + # Setting up the cache for dynamic programming. One dictionary per instance of root of DeepDiff running. + self.max_passes = max_passes + self.max_diffs = max_diffs + self.cutoff_distance_for_pairs = float(cutoff_distance_for_pairs) + self.cutoff_intersection_for_pairs = float(cutoff_intersection_for_pairs) + if self.cutoff_distance_for_pairs < 0 or self.cutoff_distance_for_pairs > 1: + raise ValueError(CUTOFF_RANGE_ERROR_MSG) + # _Parameters are the clean _parameters to initialize DeepDiff with so we avoid all the above + # cleaning functionalities when running DeepDiff recursively. + # However DeepHash has its own set of _parameters that are slightly different than DeepDIff. + # DeepDiff _parameters are transformed to DeepHash _parameters via _get_deephash_params method. + self.progress_logger = progress_logger + self.cache_size = cache_size + _parameters = self.__dict__.copy() + _parameters['group_by'] = None # overwriting since these parameters will be passed on to other passes. 
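+ # (Context sketch, not part of the original flow: group_by='id' turns
+ # [{'id': 'AA', 'name': 'x'}, {'id': 'BB', 'name': 'y'}] into
+ # {'AA': {'name': 'x'}, 'BB': {'name': 'y'}} before diffing, which is why
+ # it must not leak into recursive passes.)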
+ if log_stacktrace: + self.log_err = logger.exception + else: + self.log_err = logger.error + + # Non-Root + if _shared_parameters: + self.is_root = False + self._shared_parameters = _shared_parameters + self.__dict__.update(_shared_parameters) + # We are in some pass other than root + progress_timer = None + # Root + else: + self.is_root = True + # Caching the DeepDiff results for dynamic programming + self._distance_cache = LFUCache(cache_size) if cache_size else DummyLFU() + self._stats = { + PASSES_COUNT: 0, + DIFF_COUNT: 0, + DISTANCE_CACHE_HIT_COUNT: 0, + PREVIOUS_DIFF_COUNT: 0, + PREVIOUS_DISTANCE_CACHE_HIT_COUNT: 0, + MAX_PASS_LIMIT_REACHED: False, + MAX_DIFF_LIMIT_REACHED: False, + DISTANCE_CACHE_ENABLED: bool(cache_size), + } + self.hashes = dict_() if hashes is None else hashes + self._numpy_paths = dict_() # if _numpy_paths is None else _numpy_paths + self._shared_parameters = { + 'hashes': self.hashes, + '_stats': self._stats, + '_distance_cache': self._distance_cache, + '_numpy_paths': self._numpy_paths, + _ENABLE_CACHE_EVERY_X_DIFF: self.cache_tuning_sample_size * 10, + } + if log_frequency_in_sec: + # Creating a progress log reporter that runs in a separate thread every log_frequency_in_sec seconds. + progress_timer = RepeatedTimer(log_frequency_in_sec, _report_progress, self._stats, progress_logger) + else: + progress_timer = None + + self._parameters = _parameters + self.deephash_parameters = self._get_deephash_params() + self.tree = TreeResult() + self._iterable_opcodes = {} + if group_by and self.is_root: + try: + original_t1 = t1 + t1 = self._group_iterable_to_dict(t1, group_by, item_name='t1') + except (KeyError, ValueError): + pass + else: + try: + t2 = self._group_iterable_to_dict(t2, group_by, item_name='t2') + except (KeyError, ValueError): + t1 = original_t1 + + self.t1 = t1 + self.t2 = t2 + + try: + root = DiffLevel(t1, t2, verbose_level=self.verbose_level) + # _original_type is only used to pass the original type of the data. Currently only used for numpy arrays. + # The reason is that we convert the numpy array to python list and then later for distance calculations + # we convert only the the last dimension of it into numpy arrays. + self._diff(root, parents_ids=frozenset({id(t1)}), _original_type=_original_type) + + if get_deep_distance and view in {TEXT_VIEW, TREE_VIEW}: + self.tree['deep_distance'] = self._get_rough_distance() + + self.tree.remove_empty_keys() + view_results = self._get_view_results(self.view) + self.update(view_results) + finally: + if self.is_root: + if cache_purge_level: + del self._distance_cache + del self.hashes + del self._shared_parameters + del self._parameters + for key in (PREVIOUS_DIFF_COUNT, PREVIOUS_DISTANCE_CACHE_HIT_COUNT, + DISTANCE_CACHE_ENABLED): + del self._stats[key] + if progress_timer: + duration = progress_timer.stop() + self._stats['DURATION SEC'] = duration + logger.info('stats {}'.format(self.get_stats())) + if cache_purge_level == 2: + self.__dict__.clear() + + def _get_deephash_params(self): + result = {key: self._parameters[key] for key in DEEPHASH_PARAM_KEYS} + result['ignore_repetition'] = not self.report_repetition + result['number_to_string_func'] = self.number_to_string + return result + + def _report_result(self, report_type, change_level, local_tree=None): + """ + Add a detected change to the reference-style result dictionary. + report_type will be added to level. + (We'll create the text-style report from there later.) + :param report_type: A well defined string key describing the type of change. 
+ Examples: "set_item_added", "values_changed" + :param change_level: A DiffLevel object describing the objects in question in their + before-change and after-change object structure. + + :local_tree: None + """ + + if not self._skip_this(change_level): + change_level.report_type = report_type + tree = self.tree if local_tree is None else local_tree + tree[report_type].add(change_level) + + def custom_report_result(self, report_type, level, extra_info=None): + """ + Add a detected change to the reference-style result dictionary. + report_type will be added to level. + (We'll create the text-style report from there later.) + :param report_type: A well defined string key describing the type of change. + Examples: "set_item_added", "values_changed" + :param parent: A DiffLevel object describing the objects in question in their + before-change and after-change object structure. + :param extra_info: A dict that describe this result + :rtype: None + """ + + if not self._skip_this(level): + level.report_type = report_type + level.additional[CUSTOM_FIELD] = extra_info + self.tree[report_type].add(level) + + @staticmethod + def _dict_from_slots(object): + def unmangle(attribute): + if attribute.startswith('__') and attribute != '__weakref__': + return '_{type}{attribute}'.format( + type=type(object).__name__, + attribute=attribute + ) + return attribute + + all_slots = [] + + if isinstance(object, type): + mro = object.__mro__ # pragma: no cover. I have not been able to write a test for this case. But we still check for it. + else: + mro = object.__class__.__mro__ + + for type_in_mro in mro: + slots = getattr(type_in_mro, '__slots__', None) + if slots: + if isinstance(slots, strings): + all_slots.append(slots) + else: + all_slots.extend(slots) + + return {i: getattr(object, key) for i in all_slots if hasattr(object, key := unmangle(i))} + + def _diff_enum(self, level, parents_ids=frozenset(), local_tree=None): + t1 = detailed__dict__(level.t1, include_keys=ENUM_INCLUDE_KEYS) + t2 = detailed__dict__(level.t2, include_keys=ENUM_INCLUDE_KEYS) + + self._diff_dict( + level, + parents_ids, + print_as_attribute=True, + override=True, + override_t1=t1, + override_t2=t2, + local_tree=local_tree, + ) + + def _diff_obj(self, level, parents_ids=frozenset(), is_namedtuple=False, local_tree=None, is_pydantic_object=False): + """Difference of 2 objects""" + processing_error = False + try: + if is_namedtuple: + t1 = level.t1._asdict() + t2 = level.t2._asdict() + elif is_pydantic_object: + t1 = detailed__dict__(level.t1, ignore_private_variables=self.ignore_private_variables, ignore_keys=model_fields_set) + t2 = detailed__dict__(level.t2, ignore_private_variables=self.ignore_private_variables, ignore_keys=model_fields_set) + elif all('__dict__' in dir(t) for t in level): + t1 = detailed__dict__(level.t1, ignore_private_variables=self.ignore_private_variables) + t2 = detailed__dict__(level.t2, ignore_private_variables=self.ignore_private_variables) + elif all('__slots__' in dir(t) for t in level): + t1 = self._dict_from_slots(level.t1) + t2 = self._dict_from_slots(level.t2) + else: + t1 = {k: v for k, v in getmembers(level.t1) if not callable(v)} + t2 = {k: v for k, v in getmembers(level.t2) if not callable(v)} + except AttributeError: + processing_error = True + if processing_error is True: + self._report_result('unprocessed', level, local_tree=local_tree) + return + + self._diff_dict( + level, + parents_ids, + print_as_attribute=True, + override=True, + override_t1=t1, + override_t2=t2, + local_tree=local_tree, + 
) + + def _skip_this(self, level): + """ + Check whether this comparison should be skipped because one of the objects to compare meets exclusion criteria. + :rtype: bool + """ + level_path = level.path() + skip = False + if self.exclude_paths and level_path in self.exclude_paths: + skip = True + if self.include_paths and level_path != 'root': + if level_path not in self.include_paths: + skip = True + for prefix in self.include_paths: + if prefix in level_path or level_path in prefix: + skip = False + break + elif self.exclude_regex_paths and any( + [exclude_regex_path.search(level_path) for exclude_regex_path in self.exclude_regex_paths]): + skip = True + elif self.exclude_types_tuple and \ + (isinstance(level.t1, self.exclude_types_tuple) or isinstance(level.t2, self.exclude_types_tuple)): + skip = True + elif self.exclude_obj_callback and \ + (self.exclude_obj_callback(level.t1, level_path) or self.exclude_obj_callback(level.t2, level_path)): + skip = True + elif self.exclude_obj_callback_strict and \ + (self.exclude_obj_callback_strict(level.t1, level_path) and + self.exclude_obj_callback_strict(level.t2, level_path)): + skip = True + elif self.include_obj_callback and level_path != 'root': + skip = True + if (self.include_obj_callback(level.t1, level_path) or self.include_obj_callback(level.t2, level_path)): + skip = False + elif self.include_obj_callback_strict and level_path != 'root': + skip = True + if (self.include_obj_callback_strict(level.t1, level_path) and + self.include_obj_callback_strict(level.t2, level_path)): + skip = False + + return skip + + def _skip_this_key(self, level, key): + # if include_paths is not set, than treet every path as included + if self.include_paths is None: + return False + if "{}['{}']".format(level.path(), key) in self.include_paths: + return False + if level.path() in self.include_paths: + # matches e.g. level+key root['foo']['bar']['veg'] include_paths ["root['foo']['bar']"] + return False + for prefix in self.include_paths: + if "{}['{}']".format(level.path(), key) in prefix: + # matches as long the prefix is longer than this object key + # eg.: level+key root['foo']['bar'] matches prefix root['foo']['bar'] from include paths + # level+key root['foo'] matches prefix root['foo']['bar'] from include_paths + # level+key root['foo']['bar'] DOES NOT match root['foo'] from include_paths This needs to be handled afterwards + return False + # check if a higher level is included as a whole (=without any sublevels specified) + # matches e.g. level+key root['foo']['bar']['veg'] include_paths ["root['foo']"] + # but does not match, if it is level+key root['foo']['bar']['veg'] include_paths ["root['foo']['bar']['fruits']"] + up = level.up + while up is not None: + if up.path() in self.include_paths: + return False + up = up.up + return True + + def _get_clean_to_keys_mapping(self, keys, level): + """ + Get a dictionary of cleaned value of keys to the keys themselves. + This is mainly used to transform the keys when the type changes of keys should be ignored. + + TODO: needs also some key conversion for groups of types other than the built-in strings and numbers. 
+ """ + result = dict_() + for key in keys: + if self.ignore_string_type_changes and isinstance(key, bytes): + clean_key = key.decode('utf-8') + elif self.use_enum_value and isinstance(key, Enum): + clean_key = key.value + elif isinstance(key, numbers): + type_ = "number" if self.ignore_numeric_type_changes else key.__class__.__name__ + clean_key = self.number_to_string(key, significant_digits=self.significant_digits, + number_format_notation=self.number_format_notation) + clean_key = KEY_TO_VAL_STR.format(type_, clean_key) + else: + clean_key = key + if self.ignore_string_case and isinstance(clean_key, str): + clean_key = clean_key.lower() + if clean_key in result: + logger.warning(('{} and {} in {} become the same key when ignore_numeric_type_changes' + 'or ignore_numeric_type_changes are set to be true.').format( + key, result[clean_key], level.path())) + else: + result[clean_key] = key + return result + + def _diff_dict( + self, + level, + parents_ids=frozenset([]), + print_as_attribute=False, + override=False, + override_t1=None, + override_t2=None, + local_tree=None, + ): + """Difference of 2 dictionaries""" + if override: + # for special stuff like custom objects and named tuples we receive preprocessed t1 and t2 + # but must not spoil the chain (=level) with it + t1 = override_t1 + t2 = override_t2 + else: + t1 = level.t1 + t2 = level.t2 + + if print_as_attribute: + item_added_key = "attribute_added" + item_removed_key = "attribute_removed" + rel_class = AttributeRelationship + else: + item_added_key = "dictionary_item_added" + item_removed_key = "dictionary_item_removed" + rel_class = DictRelationship + + if self.ignore_private_variables: + t1_keys = SetOrdered([key for key in t1 if not(isinstance(key, str) and key.startswith('__')) and not self._skip_this_key(level, key)]) + t2_keys = SetOrdered([key for key in t2 if not(isinstance(key, str) and key.startswith('__')) and not self._skip_this_key(level, key)]) + else: + t1_keys = SetOrdered([key for key in t1 if not self._skip_this_key(level, key)]) + t2_keys = SetOrdered([key for key in t2 if not self._skip_this_key(level, key)]) + if self.ignore_string_type_changes or self.ignore_numeric_type_changes or self.ignore_string_case: + t1_clean_to_keys = self._get_clean_to_keys_mapping(keys=t1_keys, level=level) + t2_clean_to_keys = self._get_clean_to_keys_mapping(keys=t2_keys, level=level) + t1_keys = SetOrdered(t1_clean_to_keys.keys()) + t2_keys = SetOrdered(t2_clean_to_keys.keys()) + else: + t1_clean_to_keys = t2_clean_to_keys = None + + t_keys_intersect = t2_keys & t1_keys + t_keys_added = t2_keys - t_keys_intersect + t_keys_removed = t1_keys - t_keys_intersect + + if self.threshold_to_diff_deeper: + if self.exclude_paths: + t_keys_union = {f"{level.path()}[{repr(key)}]" for key in (t2_keys | t1_keys)} + t_keys_union -= self.exclude_paths + t_keys_union_len = len(t_keys_union) + else: + t_keys_union_len = len(t2_keys | t1_keys) + if t_keys_union_len > 1 and len(t_keys_intersect) / t_keys_union_len < self.threshold_to_diff_deeper: + self._report_result('values_changed', level, local_tree=local_tree) + return + + for key in t_keys_added: + if self._count_diff() is StopIteration: + return + + key = t2_clean_to_keys[key] if t2_clean_to_keys else key + change_level = level.branch_deeper( + notpresent, + t2[key], + child_relationship_class=rel_class, + child_relationship_param=key, + child_relationship_param2=key, + ) + self._report_result(item_added_key, change_level, local_tree=local_tree) + + for key in t_keys_removed: + if 
self._count_diff() is StopIteration: + return # pragma: no cover. This is already covered for addition. + + key = t1_clean_to_keys[key] if t1_clean_to_keys else key + change_level = level.branch_deeper( + t1[key], + notpresent, + child_relationship_class=rel_class, + child_relationship_param=key, + child_relationship_param2=key, + ) + self._report_result(item_removed_key, change_level, local_tree=local_tree) + + for key in t_keys_intersect: # key present in both dicts - need to compare values + if self._count_diff() is StopIteration: + return # pragma: no cover. This is already covered for addition. + + key1 = t1_clean_to_keys[key] if t1_clean_to_keys else key + key2 = t2_clean_to_keys[key] if t2_clean_to_keys else key + item_id = id(t1[key1]) + if parents_ids and item_id in parents_ids: + continue + parents_ids_added = add_to_frozen_set(parents_ids, item_id) + + # Go one level deeper + next_level = level.branch_deeper( + t1[key1], + t2[key2], + child_relationship_class=rel_class, + child_relationship_param=key, + child_relationship_param2=key, + ) + self._diff(next_level, parents_ids_added, local_tree=local_tree) + + def _diff_set(self, level, local_tree=None): + """Difference of sets""" + t1_hashtable = self._create_hashtable(level, 't1') + t2_hashtable = self._create_hashtable(level, 't2') + + t1_hashes = set(t1_hashtable.keys()) + t2_hashes = set(t2_hashtable.keys()) + + hashes_added = t2_hashes - t1_hashes + hashes_removed = t1_hashes - t2_hashes + + items_added = [t2_hashtable[i].item for i in hashes_added] + items_removed = [t1_hashtable[i].item for i in hashes_removed] + + for item in items_added: + if self._count_diff() is StopIteration: + return # pragma: no cover. This is already covered for addition. + + change_level = level.branch_deeper( + notpresent, item, child_relationship_class=SetRelationship) + self._report_result('set_item_added', change_level, local_tree=local_tree) + + for item in items_removed: + if self._count_diff() is StopIteration: + return # pragma: no cover. This is already covered for addition. + + change_level = level.branch_deeper( + item, notpresent, child_relationship_class=SetRelationship) + self._report_result('set_item_removed', change_level, local_tree=local_tree) + + @staticmethod + def _iterables_subscriptable(t1, t2): + try: + if getattr(t1, '__getitem__') and getattr(t2, '__getitem__'): + return True + else: # pragma: no cover + return False # should never happen + except AttributeError: + return False + + def _diff_iterable(self, level, parents_ids=frozenset(), _original_type=None, local_tree=None): + """Difference of iterables""" + if (self.ignore_order_func and self.ignore_order_func(level)) or self.ignore_order: + self._diff_iterable_with_deephash(level, parents_ids, _original_type=_original_type, local_tree=local_tree) + else: + self._diff_iterable_in_order(level, parents_ids, _original_type=_original_type, local_tree=local_tree) + + def _compare_in_order( + self, level, + t1_from_index=None, t1_to_index=None, + t2_from_index=None, t2_to_index=None + ) -> List[Tuple[Tuple[int, int], Tuple[Any, Any]]]: + """ + Default compare if `iterable_compare_func` is not provided. + This will compare in sequence order. 
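+
+ For example (a sketch with assumed sample values): comparing t1=[1, 2] with
+ t2=[1, 3] in order yields [((0, 0), (1, 1)), ((1, 1), (2, 3))].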
+ """ + if t1_from_index is None: + return [((i, i), (x, y)) for i, (x, y) in enumerate( + zip_longest( + level.t1, level.t2, fillvalue=ListItemRemovedOrAdded))] + else: + t1_chunk = level.t1[t1_from_index:t1_to_index] + t2_chunk = level.t2[t2_from_index:t2_to_index] + return [((i + t1_from_index, i + t2_from_index), (x, y)) for i, (x, y) in enumerate( + zip_longest( + t1_chunk, t2_chunk, fillvalue=ListItemRemovedOrAdded))] + + def _get_matching_pairs( + self, level, + t1_from_index=None, t1_to_index=None, + t2_from_index=None, t2_to_index=None + ) -> List[Tuple[Tuple[int, int], Tuple[Any, Any]]]: + """ + Given a level get matching pairs. This returns list of two tuples in the form: + [ + (t1 index, t2 index), (t1 item, t2 item) + ] + + This will compare using the passed in `iterable_compare_func` if available. + Default it to compare in order + """ + + if self.iterable_compare_func is None: + # Match in order if there is no compare function provided + return self._compare_in_order( + level, + t1_from_index=t1_from_index, t1_to_index=t1_to_index, + t2_from_index=t2_from_index, t2_to_index=t2_to_index, + ) + try: + matches = [] + y_matched = set() + y_index_matched = set() + for i, x in enumerate(level.t1): + x_found = False + for j, y in enumerate(level.t2): + + if(j in y_index_matched): + # This ensures a one-to-one relationship of matches from t1 to t2. + # If y this index in t2 has already been matched to another x + # it cannot have another match, so just continue. + continue + + if(self.iterable_compare_func(x, y, level)): + deep_hash = DeepHash(y, + hashes=self.hashes, + apply_hash=True, + **self.deephash_parameters, + ) + y_index_matched.add(j) + y_matched.add(deep_hash[y]) + matches.append(((i, j), (x, y))) + x_found = True + break + + if(not x_found): + matches.append(((i, -1), (x, ListItemRemovedOrAdded))) + for j, y in enumerate(level.t2): + + deep_hash = DeepHash(y, + hashes=self.hashes, + apply_hash=True, + **self.deephash_parameters, + ) + if(deep_hash[y] not in y_matched): + matches.append(((-1, j), (ListItemRemovedOrAdded, y))) + return matches + except CannotCompare: + return self._compare_in_order( + level, + t1_from_index=t1_from_index, t1_to_index=t1_to_index, + t2_from_index=t2_from_index, t2_to_index=t2_to_index + ) + + def _diff_iterable_in_order(self, level, parents_ids=frozenset(), _original_type=None, local_tree=None): + # We're handling both subscriptable and non-subscriptable iterables. Which one is it? 
+ subscriptable = self._iterables_subscriptable(level.t1, level.t2) + if subscriptable: + child_relationship_class = SubscriptableIterableRelationship + else: + child_relationship_class = NonSubscriptableIterableRelationship + + if ( + not self.zip_ordered_iterables + and isinstance(level.t1, Sequence) + and isinstance(level.t2, Sequence) + and self._all_values_basic_hashable(level.t1) + and self._all_values_basic_hashable(level.t2) + and self.iterable_compare_func is None + ): + local_tree_pass = TreeResult() + opcodes_with_values = self._diff_ordered_iterable_by_difflib( + level, + parents_ids=parents_ids, + _original_type=_original_type, + child_relationship_class=child_relationship_class, + local_tree=local_tree_pass, + ) + # Sometimes DeepDiff's old iterable diff does a better job than DeepDiff + if len(local_tree_pass) > 1: + local_tree_pass2 = TreeResult() + self._diff_by_forming_pairs_and_comparing_one_by_one( + level, + parents_ids=parents_ids, + _original_type=_original_type, + child_relationship_class=child_relationship_class, + local_tree=local_tree_pass2, + ) + if len(local_tree_pass) >= len(local_tree_pass2): + local_tree_pass = local_tree_pass2 + else: + self._iterable_opcodes[level.path(force=FORCE_DEFAULT)] = opcodes_with_values + for report_type, levels in local_tree_pass.items(): + if levels: + self.tree[report_type] |= levels + else: + self._diff_by_forming_pairs_and_comparing_one_by_one( + level, + parents_ids=parents_ids, + _original_type=_original_type, + child_relationship_class=child_relationship_class, + local_tree=local_tree, + ) + + def _all_values_basic_hashable(self, iterable): + """ + Are all items basic hashable types? + Or there are custom types too? + """ + + # We don't want to exhaust a generator + if isinstance(iterable, types.GeneratorType): + return False + for item in iterable: + if not isinstance(item, basic_types): + return False + return True + + def _diff_by_forming_pairs_and_comparing_one_by_one( + self, level, local_tree, parents_ids=frozenset(), + _original_type=None, child_relationship_class=None, + t1_from_index=None, t1_to_index=None, + t2_from_index=None, t2_to_index=None, + ): + for (i, j), (x, y) in self._get_matching_pairs( + level, + t1_from_index=t1_from_index, t1_to_index=t1_to_index, + t2_from_index=t2_from_index, t2_to_index=t2_to_index + ): + if self._count_diff() is StopIteration: + return # pragma: no cover. This is already covered for addition. 
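+ # Note (descriptive, not from the original source): `i` and `j` are the
+ # t1/t2 indexes of this pair; with a custom iterable_compare_func an
+ # unmatched item comes back with -1 on the missing side, paired with the
+ # ListItemRemovedOrAdded sentinel handled below.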
+
+            reference_param1 = i
+            reference_param2 = j
+            if y is ListItemRemovedOrAdded:  # item removed completely
+                change_level = level.branch_deeper(
+                    x,
+                    notpresent,
+                    child_relationship_class=child_relationship_class,
+                    child_relationship_param=reference_param1,
+                    child_relationship_param2=reference_param2,
+                )
+                self._report_result('iterable_item_removed', change_level, local_tree=local_tree)
+
+            elif x is ListItemRemovedOrAdded:  # new item added
+                change_level = level.branch_deeper(
+                    notpresent,
+                    y,
+                    child_relationship_class=child_relationship_class,
+                    child_relationship_param=reference_param1,
+                    child_relationship_param2=reference_param2,
+                )
+                self._report_result('iterable_item_added', change_level, local_tree=local_tree)
+
+            else:  # check if item value has changed
+                if (i != j and ((x == y) or self.iterable_compare_func)):
+                    # Item moved
+                    change_level = level.branch_deeper(
+                        x,
+                        y,
+                        child_relationship_class=child_relationship_class,
+                        child_relationship_param=reference_param1,
+                        child_relationship_param2=reference_param2
+                    )
+                    self._report_result('iterable_item_moved', change_level, local_tree=local_tree)
+
+                    if self.iterable_compare_func:
+                        # Intentionally setting j as the first child relationship param in cases of a moved item.
+                        # If the item was moved using an iterable_compare_func then we want to make sure that the index
+                        # is relative to t2.
+                        reference_param1 = j
+                        reference_param2 = i
+                    else:
+                        continue
+
+                item_id = id(x)
+                if parents_ids and item_id in parents_ids:
+                    continue
+                parents_ids_added = add_to_frozen_set(parents_ids, item_id)
+
+                # Go one level deeper
+                next_level = level.branch_deeper(
+                    x,
+                    y,
+                    child_relationship_class=child_relationship_class,
+                    child_relationship_param=reference_param1,
+                    child_relationship_param2=reference_param2
+                )
+                self._diff(next_level, parents_ids_added, local_tree=local_tree)
+
+    def _diff_ordered_iterable_by_difflib(
+        self, level, local_tree, parents_ids=frozenset(), _original_type=None, child_relationship_class=None,
+    ):
+
+        seq = difflib.SequenceMatcher(isjunk=None, a=level.t1, b=level.t2, autojunk=False)
+
+        opcodes = seq.get_opcodes()
+        opcodes_with_values = []
+
+        # TODO: this logic should be revisited so that we detect reverse operations,
+        # like when a replacement happens at index X and a reverse replacement happens at index Y;
+        # in those cases we have an "iterable_item_moved" operation.
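+        # Editor's note (illustrative, not part of deepdiff): difflib emits
+        # (tag, t1_from, t1_to, t2_from, t2_to) opcodes, e.g.:
+        #     >>> import difflib
+        #     >>> difflib.SequenceMatcher(a=[1, 2, 3], b=[1, 9, 3]).get_opcodes()
+        #     [('equal', 0, 1, 0, 1), ('replace', 1, 2, 1, 2), ('equal', 2, 3, 2, 3)]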
+        for tag, t1_from_index, t1_to_index, t2_from_index, t2_to_index in opcodes:
+            if tag == 'equal':
+                opcodes_with_values.append(Opcode(
+                    tag, t1_from_index, t1_to_index, t2_from_index, t2_to_index,
+                ))
+                continue
+            # print('{:7} t1[{}:{}] --> t2[{}:{}] {!r:>8} --> {!r}'.format(
+            #     tag, t1_from_index, t1_to_index, t2_from_index, t2_to_index, level.t1[t1_from_index:t1_to_index], level.t2[t2_from_index:t2_to_index]))
+
+            opcodes_with_values.append(Opcode(
+                tag, t1_from_index, t1_to_index, t2_from_index, t2_to_index,
+                old_values=level.t1[t1_from_index:t1_to_index],
+                new_values=level.t2[t2_from_index:t2_to_index],
+            ))
+
+            if tag == 'replace':
+                self._diff_by_forming_pairs_and_comparing_one_by_one(
+                    level, local_tree=local_tree, parents_ids=parents_ids,
+                    _original_type=_original_type, child_relationship_class=child_relationship_class,
+                    t1_from_index=t1_from_index, t1_to_index=t1_to_index,
+                    t2_from_index=t2_from_index, t2_to_index=t2_to_index,
+                )
+            elif tag == 'delete':
+                for index, x in enumerate(level.t1[t1_from_index:t1_to_index]):
+                    change_level = level.branch_deeper(
+                        x,
+                        notpresent,
+                        child_relationship_class=child_relationship_class,
+                        child_relationship_param=index + t1_from_index,
+                        child_relationship_param2=index + t1_from_index,
+                    )
+                    self._report_result('iterable_item_removed', change_level, local_tree=local_tree)
+            elif tag == 'insert':
+                for index, y in enumerate(level.t2[t2_from_index:t2_to_index]):
+                    change_level = level.branch_deeper(
+                        notpresent,
+                        y,
+                        child_relationship_class=child_relationship_class,
+                        child_relationship_param=index + t2_from_index,
+                        child_relationship_param2=index + t2_from_index,
+                    )
+                    self._report_result('iterable_item_added', change_level, local_tree=local_tree)
+        return opcodes_with_values
+
+    def _diff_str(self, level, local_tree=None):
+        """Compare strings"""
+        if self.ignore_string_case:
+            level.t1 = level.t1.lower()
+            level.t2 = level.t2.lower()
+
+        if type(level.t1) == type(level.t2) and level.t1 == level.t2:  # NOQA
+            return
+
+        # do we add a diff for convenience?
+        do_diff = True
+        t1_str = level.t1
+        t2_str = level.t2
+
+        if isinstance(level.t1, bytes_type):
+            try:
+                t1_str = level.t1.decode('ascii')
+            except UnicodeDecodeError:
+                do_diff = False
+
+        if isinstance(level.t2, bytes_type):
+            try:
+                t2_str = level.t2.decode('ascii')
+            except UnicodeDecodeError:
+                do_diff = False
+
+        if isinstance(level.t1, Enum):
+            t1_str = level.t1.value
+
+        if isinstance(level.t2, Enum):
+            t2_str = level.t2.value
+
+        if t1_str == t2_str:
+            return
+
+        if do_diff:
+            # Guard both sides: after Enum handling, t1_str/t2_str may not be strings.
+            if isinstance(t1_str, str) and isinstance(t2_str, str) and ('\n' in t1_str or '\n' in t2_str):
+                diff = difflib.unified_diff(
+                    t1_str.splitlines(), t2_str.splitlines(), lineterm='')
+                diff = list(diff)
+                if diff:
+                    level.additional['diff'] = '\n'.join(diff)
+
+        self._report_result('values_changed', level, local_tree=local_tree)
+
+    def _diff_tuple(self, level, parents_ids, local_tree=None):
+        # Checking to see if it has _fields, which probably means it is a
+        # namedtuple.
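+        # Editor's note (illustrative, not part of deepdiff): plain tuples lack
+        # _asdict while namedtuples have it:
+        #     >>> from collections import namedtuple
+        #     >>> Point = namedtuple('Point', ['x', 'y'])
+        #     >>> hasattr((1, 2), '_asdict'), hasattr(Point(1, 2), '_asdict')
+        #     (False, True)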
+ try: + level.t1._asdict + # It must be a normal tuple + except AttributeError: + self._diff_iterable(level, parents_ids, local_tree=local_tree) + # We assume it is a namedtuple then + else: + self._diff_obj(level, parents_ids, is_namedtuple=True, local_tree=local_tree) + + def _add_hash(self, hashes, item_hash, item, i): + if item_hash in hashes: + hashes[item_hash].indexes.append(i) + else: + hashes[item_hash] = IndexedHash(indexes=[i], item=item) + + def _create_hashtable(self, level, t): + """Create hashtable of {item_hash: (indexes, item)}""" + obj = getattr(level, t) + + local_hashes = dict_() + for (i, item) in enumerate(obj): + try: + parent = "{}[{}]".format(level.path(), i) + # Note: in the DeepDiff we only calculate the hash of items when we have to. + # So self.hashes does not include hashes of all objects in t1 and t2. + # It only includes the ones needed when comparing iterables. + # The self.hashes dictionary gets shared between different runs of DeepHash + # So that any object that is already calculated to have a hash is not re-calculated. + deep_hash = DeepHash( + item, + hashes=self.hashes, + parent=parent, + apply_hash=True, + **self.deephash_parameters, + ) + except UnicodeDecodeError as err: + err.reason = f"Can not produce a hash for {level.path()}: {err.reason}" + raise + except NotImplementedError: + raise + # except Exception as e: # pragma: no cover + # logger.error("Can not produce a hash for %s." + # "Not counting this object.\n %s" % + # (level.path(), e)) + else: + try: + item_hash = deep_hash[item] + except KeyError: + pass + else: + if item_hash is unprocessed: # pragma: no cover + self.log_err("Item %s was not processed while hashing " + "thus not counting this object." % + level.path()) + else: + self._add_hash(hashes=local_hashes, item_hash=item_hash, item=item, i=i) + + # Also we hash the iterables themselves too so that we can later create cache keys from those hashes. + DeepHash( + obj, + hashes=self.hashes, + parent=level.path(), + apply_hash=True, + **self.deephash_parameters, + ) + return local_hashes + + @staticmethod + @lru_cache(maxsize=2028) + def _get_distance_cache_key(added_hash, removed_hash): + key1, key2 = (added_hash, removed_hash) if added_hash > removed_hash else (removed_hash, added_hash) + if isinstance(key1, int): + # If the hash function produces integers we convert them to hex values. + # This was used when the default hash function was Murmur3 128bit which produces integers. + key1 = hex(key1).encode('utf-8') + key2 = hex(key2).encode('utf-8') + elif isinstance(key1, str): + key1 = key1.encode('utf-8') + key2 = key2.encode('utf-8') + return key1 + b'--' + key2 + b'dc' + + def _get_rough_distance_of_hashed_objs( + self, added_hash, removed_hash, added_hash_obj, removed_hash_obj, _original_type=None): + # We need the rough distance between the 2 objects to see if they qualify to be pairs or not + _distance = cache_key = None + if self._stats[DISTANCE_CACHE_ENABLED]: + cache_key = self._get_distance_cache_key(added_hash, removed_hash) + if cache_key in self._distance_cache: + self._stats[DISTANCE_CACHE_HIT_COUNT] += 1 + _distance = self._distance_cache.get(cache_key) + if _distance is None: + # We can only cache the rough distance and not the actual diff result for reuse. 
+            # The reason is that we have modified the parameters explicitly, so they are different and can't
+            # be used for diff reporting.
+            diff = DeepDiff(
+                removed_hash_obj.item, added_hash_obj.item,
+                _parameters=self._parameters,
+                _shared_parameters=self._shared_parameters,
+                view=DELTA_VIEW,
+                _original_type=_original_type,
+                iterable_compare_func=self.iterable_compare_func,
+            )
+            _distance = diff._get_rough_distance()
+            if cache_key and self._stats[DISTANCE_CACHE_ENABLED]:
+                self._distance_cache.set(cache_key, value=_distance)
+        return _distance
+
+    def _get_most_in_common_pairs_in_iterables(
+            self, hashes_added, hashes_removed, t1_hashtable, t2_hashtable, parents_ids, _original_type):
+        """
+        Get the closest pairs between items that are removed and items that are added.
+
+        Returns a dictionary of hashes that are closest to each other.
+        The dictionary is symmetrical, so any key will also appear as a value and vice versa.
+
+        Note that due to the current reporting structure in DeepDiff, we don't compare an item that
+        was added to an item that is in both t1 and t2.
+
+        For example
+
+        [{1, 2}, {4, 5, 6}]
+        [{1, 2}, {1, 2, 3}]
+
+        is only compared between {4, 5, 6} and {1, 2, 3} even though technically {1, 2, 3} is
+        just one item different from {1, 2}.
+
+        Perhaps in the future we can have a report key for an item that is duplicated and modified, instead of just added.
+        """
+        cache_key = None
+        if self._stats[DISTANCE_CACHE_ENABLED]:
+            cache_key = combine_hashes_lists(items=[hashes_added, hashes_removed], prefix='pairs_cache')
+            if cache_key in self._distance_cache:
+                return self._distance_cache.get(cache_key).copy()
+
+        # A dictionary of hashes to distances and each distance to an ordered set of hashes.
+        # It tells us about the distance of each object from other objects.
+        # And the objects with the same distances are grouped together in an ordered set.
+        # It also includes a "max" key that is just the value of the biggest current distance in the
+        # most_in_common_pairs dictionary.
+        def defaultdict_orderedset():
+            return defaultdict(SetOrdered)
+        most_in_common_pairs = defaultdict(defaultdict_orderedset)
+        pairs = dict_()
+
+        pre_calced_distances = None
+        if hashes_added and hashes_removed and np and len(hashes_added) > 1 and len(hashes_removed) > 1:
+            # pre-calculates distances ONLY for 1D arrays, whether an _original_type
+            # was explicitly passed or a homogeneous array is detected.
+            # Numpy is needed for this optimization.
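+            # Editor's note (illustrative, not part of deepdiff): pairing is driven by
+            # the rough distance, so close values end up as a pair and are reported as a
+            # change rather than as an unrelated remove plus add (output abbreviated):
+            #     >>> DeepDiff([10.1], [10.2], ignore_order=True)
+            #     {'values_changed': {'root[0]': {'new_value': 10.2, 'old_value': 10.1}}}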
+ pre_calced_distances = self._precalculate_numpy_arrays_distance( + hashes_added, hashes_removed, t1_hashtable, t2_hashtable, _original_type) + + if hashes_added and hashes_removed \ + and self.iterable_compare_func \ + and len(hashes_added) > 0 and len(hashes_removed) > 0: + pre_calced_distances = self._precalculate_distance_by_custom_compare_func( + hashes_added, hashes_removed, t1_hashtable, t2_hashtable, _original_type) + + for added_hash in hashes_added: + for removed_hash in hashes_removed: + added_hash_obj = t2_hashtable[added_hash] + removed_hash_obj = t1_hashtable[removed_hash] + + # Loop is detected + if id(removed_hash_obj.item) in parents_ids: + continue + + _distance = None + if pre_calced_distances: + _distance = pre_calced_distances.get("{}--{}".format(added_hash, removed_hash)) + if _distance is None: + _distance = self._get_rough_distance_of_hashed_objs( + added_hash, removed_hash, added_hash_obj, removed_hash_obj, _original_type) + # Left for future debugging + # print(f'{Fore.RED}distance of {added_hash_obj.item} and {removed_hash_obj.item}: {_distance}{Style.RESET_ALL}') + # Discard potential pairs that are too far. + if _distance >= self.cutoff_distance_for_pairs: + continue + pairs_of_item = most_in_common_pairs[added_hash] + pairs_of_item[_distance].add(removed_hash) + used_to_hashes = set() + + distances_to_from_hashes = defaultdict(SetOrdered) + for from_hash, distances_to_to_hashes in most_in_common_pairs.items(): + # del distances_to_to_hashes['max'] + for dist in distances_to_to_hashes: + distances_to_from_hashes[dist].add(from_hash) + + for dist in sorted(distances_to_from_hashes.keys()): + from_hashes = distances_to_from_hashes[dist] + while from_hashes: + from_hash = from_hashes.pop() + if from_hash not in used_to_hashes: + to_hashes = most_in_common_pairs[from_hash][dist] + while to_hashes: + to_hash = to_hashes.pop() + if to_hash not in used_to_hashes: + used_to_hashes.add(from_hash) + used_to_hashes.add(to_hash) + # Left for future debugging: + # print(f'{bcolors.FAIL}Adding {t2_hashtable[from_hash].item} as a pairs of {t1_hashtable[to_hash].item} with distance of {dist}{bcolors.ENDC}') + pairs[from_hash] = to_hash + + inverse_pairs = {v: k for k, v in pairs.items()} + pairs.update(inverse_pairs) + if cache_key and self._stats[DISTANCE_CACHE_ENABLED]: + self._distance_cache.set(cache_key, value=pairs) + return pairs.copy() + + def _diff_iterable_with_deephash(self, level, parents_ids, _original_type=None, local_tree=None): + """Diff of hashable or unhashable iterables. Only used when ignoring the order.""" + + full_t1_hashtable = self._create_hashtable(level, 't1') + full_t2_hashtable = self._create_hashtable(level, 't2') + t1_hashes = SetOrdered(full_t1_hashtable.keys()) + t2_hashes = SetOrdered(full_t2_hashtable.keys()) + hashes_added = t2_hashes - t1_hashes + hashes_removed = t1_hashes - t2_hashes + + # Deciding whether to calculate pairs or not. 
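+        # Editor's note (illustrative worked example, not part of deepdiff): assuming the
+        # default cutoff_intersection_for_pairs of 0.7, if t1=[1, 2, 3] and t2=[4, 5, 6]
+        # then (3 added + 3 removed) / (3 + 3 + 1) = 6/7, roughly 0.86 > 0.7, so the pair
+        # search below is skipped and everything is reported as plain additions/removals.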
+ if (len(hashes_added) + len(hashes_removed)) / (len(full_t1_hashtable) + len(full_t2_hashtable) + 1) > self.cutoff_intersection_for_pairs: + get_pairs = False + else: + get_pairs = True + + # reduce the size of hashtables + if self.report_repetition: + t1_hashtable = full_t1_hashtable + t2_hashtable = full_t2_hashtable + else: + t1_hashtable = {k: v for k, v in full_t1_hashtable.items() if k in hashes_removed} + t2_hashtable = {k: v for k, v in full_t2_hashtable.items() if k in hashes_added} + if self._stats[PASSES_COUNT] < self.max_passes and get_pairs: + self._stats[PASSES_COUNT] += 1 + pairs = self._get_most_in_common_pairs_in_iterables( + hashes_added, hashes_removed, t1_hashtable, t2_hashtable, parents_ids, _original_type) + elif get_pairs: + if not self._stats[MAX_PASS_LIMIT_REACHED]: + self._stats[MAX_PASS_LIMIT_REACHED] = True + logger.warning(MAX_PASSES_REACHED_MSG.format(self.max_passes)) + pairs = dict_() + else: + pairs = dict_() + + def get_other_pair(hash_value, in_t1=True): + """ + Gets the other paired indexed hash item to the hash_value in the pairs dictionary + in_t1: are we looking for the other pair in t1 or t2? + """ + if in_t1: + hashtable = t1_hashtable + the_other_hashes = hashes_removed + else: + hashtable = t2_hashtable + the_other_hashes = hashes_added + other = pairs.pop(hash_value, notpresent) + if other is notpresent: + other = notpresent_indexed + else: + # The pairs are symmetrical. + # removing the other direction of pair + # so it does not get used. + del pairs[other] + the_other_hashes.remove(other) + other = hashtable[other] + return other + + if self.report_repetition: + for hash_value in hashes_added: + if self._count_diff() is StopIteration: + return # pragma: no cover. This is already covered for addition (when report_repetition=False). + other = get_other_pair(hash_value) + item_id = id(other.item) + indexes = t2_hashtable[hash_value].indexes if other.item is notpresent else other.indexes + # When we report repetitions, we want the child_relationship_param2 only if there is no repetition. + # Because when there is a repetition, we report it in a different way (iterable_items_added_at_indexes for example). + # When there is no repetition, we want child_relationship_param2 so that we report the "new_path" correctly. + if len(t2_hashtable[hash_value].indexes) == 1: + index2 = t2_hashtable[hash_value].indexes[0] + else: + index2 = None + for i in indexes: + change_level = level.branch_deeper( + other.item, + t2_hashtable[hash_value].item, + child_relationship_class=SubscriptableIterableRelationship, + child_relationship_param=i, + child_relationship_param2=index2, + ) + if other.item is notpresent: + self._report_result('iterable_item_added', change_level, local_tree=local_tree) + else: + parents_ids_added = add_to_frozen_set(parents_ids, item_id) + self._diff(change_level, parents_ids_added, local_tree=local_tree) + for hash_value in hashes_removed: + if self._count_diff() is StopIteration: + return # pragma: no cover. This is already covered for addition. + other = get_other_pair(hash_value, in_t1=False) + item_id = id(other.item) + # When we report repetitions, we want the child_relationship_param2 only if there is no repetition. + # Because when there is a repetition, we report it in a different way (iterable_items_added_at_indexes for example). + # When there is no repetition, we want child_relationship_param2 so that we report the "new_path" correctly. 
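+                # Editor's note (illustrative, not part of deepdiff; output abbreviated):
+                #     >>> DeepDiff([1, 1, 2], [1, 2], ignore_order=True, report_repetition=True)
+                #     {'repetition_change': {'root[0]': {'old_repeat': 2, 'new_repeat': 1,
+                #                            'old_indexes': [0, 1], 'new_indexes': [0], 'value': 1}}}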
+                if other.item is notpresent or len(other.indexes) > 1:
+                    index2 = None
+                else:
+                    index2 = other.indexes[0]
+                for i in t1_hashtable[hash_value].indexes:
+                    change_level = level.branch_deeper(
+                        t1_hashtable[hash_value].item,
+                        other.item,
+                        child_relationship_class=SubscriptableIterableRelationship,
+                        child_relationship_param=i,
+                        child_relationship_param2=index2,
+                    )
+                    if other.item is notpresent:
+                        self._report_result('iterable_item_removed', change_level, local_tree=local_tree)
+                    else:
+                        # I was not able to make a test case for the following 2 lines since the cases end up
+                        # getting resolved above in the hashes_added calcs. However I am leaving these 2 lines
+                        # in case things change in the future.
+                        parents_ids_added = add_to_frozen_set(parents_ids, item_id)  # pragma: no cover.
+                        self._diff(change_level, parents_ids_added, local_tree=local_tree)  # pragma: no cover.
+
+            items_intersect = t2_hashes.intersection(t1_hashes)
+
+            for hash_value in items_intersect:
+                t1_indexes = t1_hashtable[hash_value].indexes
+                t2_indexes = t2_hashtable[hash_value].indexes
+                t1_indexes_len = len(t1_indexes)
+                t2_indexes_len = len(t2_indexes)
+                if t1_indexes_len != t2_indexes_len:  # this is a repetition change!
+                    # create "change" entry, keep current level untouched to handle further changes
+                    repetition_change_level = level.branch_deeper(
+                        t1_hashtable[hash_value].item,
+                        t2_hashtable[hash_value].item,  # nb: those are equal!
+                        child_relationship_class=SubscriptableIterableRelationship,
+                        child_relationship_param=t1_hashtable[hash_value]
+                        .indexes[0])
+                    repetition_change_level.additional['repetition'] = RemapDict(
+                        old_repeat=t1_indexes_len,
+                        new_repeat=t2_indexes_len,
+                        old_indexes=t1_indexes,
+                        new_indexes=t2_indexes)
+                    self._report_result('repetition_change',
+                                        repetition_change_level, local_tree=local_tree)
+
+        else:
+            for hash_value in hashes_added:
+                if self._count_diff() is StopIteration:
+                    return
+                other = get_other_pair(hash_value)
+                item_id = id(other.item)
+                index = t2_hashtable[hash_value].indexes[0] if other.item is notpresent else other.indexes[0]
+                index2 = t2_hashtable[hash_value].indexes[0]
+                change_level = level.branch_deeper(
+                    other.item,
+                    t2_hashtable[hash_value].item,
+                    child_relationship_class=SubscriptableIterableRelationship,
+                    child_relationship_param=index,
+                    child_relationship_param2=index2,
+                )
+                if other.item is notpresent:
+                    self._report_result('iterable_item_added', change_level, local_tree=local_tree)
+                else:
+                    parents_ids_added = add_to_frozen_set(parents_ids, item_id)
+                    self._diff(change_level, parents_ids_added, local_tree=local_tree)
+
+            for hash_value in hashes_removed:
+                if self._count_diff() is StopIteration:
+                    return  # pragma: no cover. This is already covered for addition.
+                other = get_other_pair(hash_value, in_t1=False)
+                item_id = id(other.item)
+                index = t1_hashtable[hash_value].indexes[0]
+                index2 = t1_hashtable[hash_value].indexes[0] if other.item is notpresent else other.indexes[0]
+                change_level = level.branch_deeper(
+                    t1_hashtable[hash_value].item,
+                    other.item,
+                    child_relationship_class=SubscriptableIterableRelationship,
+                    child_relationship_param=index,
+                    child_relationship_param2=index2,
+                )
+                if other.item is notpresent:
+                    self._report_result('iterable_item_removed', change_level, local_tree=local_tree)
+                else:
+                    # Just like the case when report_repetition = True, these lines never run currently.
+                    # However they will stay here in case things change in the future.
+                    parents_ids_added = add_to_frozen_set(parents_ids, item_id)  # pragma: no cover.
+ self._diff(change_level, parents_ids_added, local_tree=local_tree) # pragma: no cover. + + def _diff_booleans(self, level, local_tree=None): + if level.t1 != level.t2: + self._report_result('values_changed', level, local_tree=local_tree) + + def _diff_numbers(self, level, local_tree=None, report_type_change=True): + """Diff Numbers""" + if report_type_change: + t1_type = "number" if self.ignore_numeric_type_changes else level.t1.__class__.__name__ + t2_type = "number" if self.ignore_numeric_type_changes else level.t2.__class__.__name__ + else: + t1_type = t2_type = '' + + if self.use_log_scale: + if not logarithmic_similarity(level.t1, level.t2, threshold=self.log_scale_similarity_threshold): + self._report_result('values_changed', level, local_tree=local_tree) + elif self.math_epsilon is not None: + if not is_close(level.t1, level.t2, abs_tol=self.math_epsilon): + self._report_result('values_changed', level, local_tree=local_tree) + elif self.significant_digits is None: + if level.t1 != level.t2: + self._report_result('values_changed', level, local_tree=local_tree) + else: + # Bernhard10: I use string formatting for comparison, to be consistent with usecases where + # data is read from files that were previously written from python and + # to be consistent with on-screen representation of numbers. + # Other options would be abs(t1-t2)<10**-self.significant_digits + # or math.is_close (python3.5+) + # Note that abs(3.25-3.251) = 0.0009999999999998899 < 0.001 + # Note also that "{:.3f}".format(1.1135) = 1.113, but "{:.3f}".format(1.11351) = 1.114 + # For Decimals, format seems to round 2.5 to 2 and 3.5 to 4 (to closest even number) + t1_s = self.number_to_string(level.t1, + significant_digits=self.significant_digits, + number_format_notation=self.number_format_notation) + t2_s = self.number_to_string(level.t2, + significant_digits=self.significant_digits, + number_format_notation=self.number_format_notation) + + t1_s = KEY_TO_VAL_STR.format(t1_type, t1_s) + t2_s = KEY_TO_VAL_STR.format(t2_type, t2_s) + if t1_s != t2_s: + self._report_result('values_changed', level, local_tree=local_tree) + + def _diff_ipranges(self, level, local_tree=None): + """Diff IP ranges""" + if str(level.t1) != str(level.t2): + self._report_result('values_changed', level, local_tree=local_tree) + + def _diff_datetime(self, level, local_tree=None): + """Diff DateTimes""" + level.t1 = datetime_normalize(self.truncate_datetime, level.t1, default_timezone=self.default_timezone) + level.t2 = datetime_normalize(self.truncate_datetime, level.t2, default_timezone=self.default_timezone) + + if level.t1 != level.t2: + self._report_result('values_changed', level, local_tree=local_tree) + + def _diff_time(self, level, local_tree=None): + """Diff DateTimes""" + if self.truncate_datetime: + level.t1 = datetime_normalize(self.truncate_datetime, level.t1, default_timezone=self.default_timezone) + level.t2 = datetime_normalize(self.truncate_datetime, level.t2, default_timezone=self.default_timezone) + + if level.t1 != level.t2: + self._report_result('values_changed', level, local_tree=local_tree) + + def _diff_uuids(self, level, local_tree=None): + """Diff UUIDs""" + if level.t1.int != level.t2.int: + self._report_result('values_changed', level, local_tree=local_tree) + + def _diff_numpy_array(self, level, parents_ids=frozenset(), local_tree=None): + """Diff numpy arrays""" + if level.path() not in self._numpy_paths: + self._numpy_paths[level.path()] = get_type(level.t2).__name__ + if np is None: + # This line should never be run. 
If it is ever called means the type check detected a numpy array + # which means numpy module needs to be available. So np can't be None. + raise ImportError(CANT_FIND_NUMPY_MSG) # pragma: no cover + + if (self.ignore_order_func and not self.ignore_order_func(level)) or not self.ignore_order: + # fast checks + if self.significant_digits is None: + if np.array_equal(level.t1, level.t2, equal_nan=self.ignore_nan_inequality): + return # all good + else: + try: + np.testing.assert_almost_equal(level.t1, level.t2, decimal=self.significant_digits) + except TypeError: + np.array_equal(level.t1, level.t2, equal_nan=self.ignore_nan_inequality) + except AssertionError: + pass # do detailed checking below + else: + return # all good + + # compare array meta-data + _original_type = level.t1.dtype + if level.t1.shape != level.t2.shape: + # arrays are converted to python lists so that certain features of DeepDiff can apply on them easier. + # They will be converted back to Numpy at their final dimension. + level.t1 = level.t1.tolist() + level.t2 = level.t2.tolist() + self._diff_iterable(level, parents_ids, _original_type=_original_type, local_tree=local_tree) + else: + # metadata same -- the difference is in the content + shape = level.t1.shape + dimensions = len(shape) + if dimensions == 1: + self._diff_iterable(level, parents_ids, _original_type=_original_type, local_tree=local_tree) + elif (self.ignore_order_func and self.ignore_order_func(level)) or self.ignore_order: + # arrays are converted to python lists so that certain features of DeepDiff can apply on them easier. + # They will be converted back to Numpy at their final dimension. + level.t1 = level.t1.tolist() + level.t2 = level.t2.tolist() + self._diff_iterable_with_deephash(level, parents_ids, _original_type=_original_type, local_tree=local_tree) + else: + for (t1_path, t1_row), (t2_path, t2_row) in zip( + get_numpy_ndarray_rows(level.t1, shape), + get_numpy_ndarray_rows(level.t2, shape)): + + new_level = level.branch_deeper( + t1_row, + t2_row, + child_relationship_class=NumpyArrayRelationship, + child_relationship_param=t1_path, + child_relationship_param2=t2_path, + ) + + self._diff_iterable_in_order(new_level, parents_ids, _original_type=_original_type, local_tree=local_tree) + + def _diff_types(self, level, local_tree=None): + """Diff types""" + level.report_type = 'type_changes' + self._report_result('type_changes', level, local_tree=local_tree) + + def _count_diff(self): + if (self.max_diffs is not None and self._stats[DIFF_COUNT] > self.max_diffs): + if not self._stats[MAX_DIFF_LIMIT_REACHED]: + self._stats[MAX_DIFF_LIMIT_REACHED] = True + logger.warning(MAX_DIFFS_REACHED_MSG.format(self.max_diffs)) + return StopIteration + self._stats[DIFF_COUNT] += 1 + if self.cache_size and self.cache_tuning_sample_size: + self._auto_tune_cache() + + def _auto_tune_cache(self): + take_sample = (self._stats[DIFF_COUNT] % self.cache_tuning_sample_size == 0) + if self.cache_tuning_sample_size: + if self._stats[DISTANCE_CACHE_ENABLED]: + if take_sample: + self._auto_off_cache() + # Turn on the cache once in a while + elif self._stats[DIFF_COUNT] % self._shared_parameters[_ENABLE_CACHE_EVERY_X_DIFF] == 0: + self.progress_logger('Re-enabling the distance and level caches.') + # decreasing the sampling frequency + self._shared_parameters[_ENABLE_CACHE_EVERY_X_DIFF] *= 10 + self._stats[DISTANCE_CACHE_ENABLED] = True + if take_sample: + for key in (PREVIOUS_DIFF_COUNT, PREVIOUS_DISTANCE_CACHE_HIT_COUNT): + self._stats[key] = self._stats[key[9:]] + + def 
_auto_off_cache(self): + """ + Auto adjust the cache based on the usage + """ + if self._stats[DISTANCE_CACHE_ENABLED]: + angle = (self._stats[DISTANCE_CACHE_HIT_COUNT] - self._stats['PREVIOUS {}'.format(DISTANCE_CACHE_HIT_COUNT)]) / (self._stats[DIFF_COUNT] - self._stats[PREVIOUS_DIFF_COUNT]) + if angle < self.CACHE_AUTO_ADJUST_THRESHOLD: + self._stats[DISTANCE_CACHE_ENABLED] = False + self.progress_logger('Due to minimal cache hits, {} is disabled.'.format('distance cache')) + + def _use_custom_operator(self, level): + """ + For each level we check all custom operators. + If any one of them was a match for the level, we run the diff of the operator. + If the operator returned True, the operator must have decided these objects should not + be compared anymore. It might have already reported their results. + In that case the report will appear in the final results of this diff. + Otherwise basically the 2 objects in the level are being omitted from the results. + """ + + for operator in self.custom_operators: + if operator.match(level): + prevent_default = operator.give_up_diffing(level=level, diff_instance=self) + if prevent_default: + return True + + return False + + def _diff(self, level, parents_ids=frozenset(), _original_type=None, local_tree=None): + """ + The main diff method + + **parameters** + + level: the tree level or tree node + parents_ids: the ids of all the parent objects in the tree from the current node. + _original_type: If the objects had an original type that was different than what currently exists in the level.t1 and t2 + """ + if self._count_diff() is StopIteration: + return + + if self._use_custom_operator(level): + return + + if level.t1 is level.t2: + return + + if self._skip_this(level): + return + + report_type_change = True + if get_type(level.t1) != get_type(level.t2): + for type_group in self.ignore_type_in_groups: + if self.type_check_func(level.t1, type_group) and self.type_check_func(level.t2, type_group): + report_type_change = False + break + if self.use_enum_value and isinstance(level.t1, Enum): + level.t1 = level.t1.value + report_type_change = False + if self.use_enum_value and isinstance(level.t2, Enum): + level.t2 = level.t2.value + report_type_change = False + if report_type_change: + self._diff_types(level, local_tree=local_tree) + return + # This is an edge case where t1=None or t2=None and None is in the ignore type group. 
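+        # Editor's note (illustrative, not part of deepdiff): e.g. with
+        # ignore_type_in_groups=[(int, type(None))], int vs. None is not a type change,
+        # but it still differs in value (output abbreviated):
+        #     >>> DeepDiff(1, None, ignore_type_in_groups=[(int, type(None))])
+        #     {'values_changed': {'root': {'new_value': None, 'old_value': 1}}}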
+ if level.t1 is None or level.t2 is None: + self._report_result('values_changed', level, local_tree=local_tree) + return + + if self.ignore_nan_inequality and isinstance(level.t1, (float, np_floating)) and str(level.t1) == str(level.t2) == 'nan': + return + + if isinstance(level.t1, booleans): + self._diff_booleans(level, local_tree=local_tree) + + elif isinstance(level.t1, strings): + self._diff_str(level, local_tree=local_tree) + + elif isinstance(level.t1, datetime.datetime): + self._diff_datetime(level, local_tree=local_tree) + + elif isinstance(level.t1, ipranges): + self._diff_ipranges(level, local_tree=local_tree) + + elif isinstance(level.t1, (datetime.date, datetime.timedelta, datetime.time)): + self._diff_time(level, local_tree=local_tree) + + elif isinstance(level.t1, uuids): + self._diff_uuids(level, local_tree=local_tree) + + elif isinstance(level.t1, numbers): + self._diff_numbers(level, local_tree=local_tree, report_type_change=report_type_change) + + elif isinstance(level.t1, Mapping): + self._diff_dict(level, parents_ids, local_tree=local_tree) + + elif isinstance(level.t1, tuple): + self._diff_tuple(level, parents_ids, local_tree=local_tree) + + elif isinstance(level.t1, (set, frozenset, SetOrdered)): + self._diff_set(level, local_tree=local_tree) + + elif isinstance(level.t1, np_ndarray): + self._diff_numpy_array(level, parents_ids, local_tree=local_tree) + + elif isinstance(level.t1, PydanticBaseModel): + self._diff_obj(level, parents_ids, local_tree=local_tree, is_pydantic_object=True) + + elif isinstance(level.t1, Iterable): + self._diff_iterable(level, parents_ids, _original_type=_original_type, local_tree=local_tree) + + elif isinstance(level.t1, Enum): + self._diff_enum(level, parents_ids, local_tree=local_tree) + + else: + self._diff_obj(level, parents_ids) + + def _get_view_results(self, view): + """ + Get the results based on the view + """ + result = self.tree + if not self.report_repetition: # and self.is_root: + result.mutual_add_removes_to_become_value_changes() + if view == TREE_VIEW: + pass + elif view == TEXT_VIEW: + result = TextResult(tree_results=self.tree, verbose_level=self.verbose_level) + result.remove_empty_keys() + elif view == DELTA_VIEW: + result = self._to_delta_dict(report_repetition_required=False) + else: + raise ValueError(INVALID_VIEW_MSG.format(view)) + return result + + @staticmethod + def _get_key_for_group_by(row, group_by, item_name): + try: + return row.pop(group_by) + except KeyError: + logger.error("Unable to group {} by {}. The key is missing in {}".format(item_name, group_by, row)) + raise + + def _group_iterable_to_dict(self, item, group_by, item_name): + """ + Convert a list of dictionaries into a dictionary of dictionaries + where the key is the value of the group_by key in each dictionary. 
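+
+        Editor's note (illustrative, not part of deepdiff): e.g. with group_by='id',
+        [{'id': 'a', 'v': 1}, {'id': 'b', 'v': 2}] becomes {'a': {'v': 1}, 'b': {'v': 2}}
+        (the group_by key is popped from each row).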
+ """ + group_by_level2 = None + if isinstance(group_by, (list, tuple)): + group_by_level1 = group_by[0] + if len(group_by) > 1: + group_by_level2 = group_by[1] + else: + group_by_level1 = group_by + if isinstance(item, Iterable) and not isinstance(item, Mapping): + result = {} + item_copy = deepcopy(item) + for row in item_copy: + if isinstance(row, Mapping): + key1 = self._get_key_for_group_by(row, group_by_level1, item_name) + if group_by_level2: + key2 = self._get_key_for_group_by(row, group_by_level2, item_name) + if key1 not in result: + result[key1] = {} + if self.group_by_sort_key: + if key2 not in result[key1]: + result[key1][key2] = [] + result_key1_key2 = result[key1][key2] + if row not in result_key1_key2: + result_key1_key2.append(row) + else: + result[key1][key2] = row + else: + if self.group_by_sort_key: + if key1 not in result: + result[key1] = [] + if row not in result[key1]: + result[key1].append(row) + else: + result[key1] = row + else: + msg = "Unable to group {} by {} since the item {} is not a dictionary.".format(item_name, group_by_level1, row) + logger.error(msg) + raise ValueError(msg) + if self.group_by_sort_key: + if group_by_level2: + for key1, row1 in result.items(): + for key2, row in row1.items(): + row.sort(key=self.group_by_sort_key) + else: + for key, row in result.items(): + row.sort(key=self.group_by_sort_key) + return result + msg = "Unable to group {} by {}".format(item_name, group_by) + logger.error(msg) + raise ValueError(msg) + + def get_stats(self): + """ + Get some stats on internals of the DeepDiff run. + """ + return self._stats + + @property + def affected_paths(self): + """ + Get the list of paths that were affected. + Whether a value was changed or they were added or removed. + + Example + >>> t1 = {1: 1, 2: 2, 3: [3], 4: 4} + >>> t2 = {1: 1, 2: 4, 3: [3, 4], 5: 5, 6: 6} + >>> ddiff = DeepDiff(t1, t2) + >>> ddiff + >>> pprint(ddiff, indent=4) + { 'dictionary_item_added': [root[5], root[6]], + 'dictionary_item_removed': [root[4]], + 'iterable_item_added': {'root[3][1]': 4}, + 'values_changed': {'root[2]': {'new_value': 4, 'old_value': 2}}} + >>> ddiff.affected_paths + SetOrdered(['root[3][1]', 'root[4]', 'root[5]', 'root[6]', 'root[2]']) + >>> ddiff.affected_root_keys + SetOrdered([3, 4, 5, 6, 2]) + + """ + result = SetOrdered() + for key in REPORT_KEYS: + value = self.get(key) + if value: + if isinstance(value, SetOrdered): + result |= value + else: + result |= SetOrdered(value.keys()) + return result + + @property + def affected_root_keys(self): + """ + Get the list of root keys that were affected. + Whether a value was changed or they were added or removed. 
+
+        Example
+        >>> t1 = {1: 1, 2: 2, 3: [3], 4: 4}
+        >>> t2 = {1: 1, 2: 4, 3: [3, 4], 5: 5, 6: 6}
+        >>> ddiff = DeepDiff(t1, t2)
+        >>> ddiff
+        >>> pprint(ddiff, indent=4)
+        {   'dictionary_item_added': [root[5], root[6]],
+            'dictionary_item_removed': [root[4]],
+            'iterable_item_added': {'root[3][1]': 4},
+            'values_changed': {'root[2]': {'new_value': 4, 'old_value': 2}}}
+        >>> ddiff.affected_paths
+        SetOrdered(['root[3][1]', 'root[4]', 'root[5]', 'root[6]', 'root[2]'])
+        >>> ddiff.affected_root_keys
+        SetOrdered([3, 4, 5, 6, 2])
+        """
+        result = SetOrdered()
+        for key in REPORT_KEYS:
+            value = self.tree.get(key)
+            if value:
+                if isinstance(value, SetOrdered):
+                    values_list = value
+                else:
+                    values_list = value.keys()
+                for item in values_list:
+                    root_key = item.get_root_key()
+                    if root_key is not notpresent:
+                        result.add(root_key)
+        return result
+
+
+if __name__ == "__main__":  # pragma: no cover
+    import doctest
+    doctest.testmod()
diff --git a/.venv/lib/python3.12/site-packages/deepdiff/distance.py b/.venv/lib/python3.12/site-packages/deepdiff/distance.py
new file mode 100644
index 00000000..adaf5045
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/deepdiff/distance.py
@@ -0,0 +1,342 @@
+import math
+import datetime
+from typing import TYPE_CHECKING, Callable, Protocol, Any
+from deepdiff.deephash import DeepHash
+from deepdiff.helper import (
+    DELTA_VIEW, numbers, strings, add_to_frozen_set, not_found, only_numbers, np, np_float64, time_to_seconds,
+    cartesian_product_numpy, np_ndarray, np_array_factory, get_homogeneous_numpy_compatible_type_of_seq, dict_,
+    CannotCompare)
+from collections.abc import Mapping, Iterable
+
+if TYPE_CHECKING:
+    from deepdiff.diff import DeepDiffProtocol
+
+    class DistanceProtocol(DeepDiffProtocol, Protocol):
+        hashes: dict
+        deephash_parameters: dict
+        iterable_compare_func: Callable | None
+        math_epsilon: float
+        cutoff_distance_for_pairs: float
+
+        def __get_item_rough_length(self, item, parent: str = "root") -> float:
+            ...
+
+        def _to_delta_dict(
+            self,
+            directed: bool = True,
+            report_repetition_required: bool = True,
+            always_include_values: bool = False,
+        ) -> dict:
+            ...
+
+        def __calculate_item_deephash(self, item: Any) -> None:
+            ...
+
+
+DISTANCE_CALCS_NEEDS_CACHE = "Distance calculation can not happen once the cache is purged. Try with _cache='keep'"
+
+
+class DistanceMixin:
+
+    def _get_rough_distance(self: "DistanceProtocol"):
+        """
+        Gives a numeric value for the distance of t1 and t2 based on how many operations are needed to convert
+        one to the other.
+
+        This is a similar concept to the Levenshtein Edit Distance but for structured data, and it is designed
+        to be between 0 and 1.
+
+        A distance of zero means the objects are equal and a distance of 1 is very far.
+
+        Note: The distance calculation formula is subject to change in the future. Use the distance results only as a
+        way of comparing the distances of pairs of items with other pairs rather than an absolute distance
+        such as the one provided by Levenshtein edit distance.
+
+        Info: The current algorithm is based on the number of operations that are needed to convert t1 to t2 divided
+        by the number of items that make up t1 and t2.
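+
+        Editor's note (illustrative, not part of deepdiff): passing get_deep_distance=True
+        to DeepDiff adds a 'deep_distance' key (a float between 0 and 1, computed by this
+        method) to the result.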
+ """ + + _distance = get_numeric_types_distance( + self.t1, self.t2, max_=self.cutoff_distance_for_pairs, use_log_scale=self.use_log_scale, log_scale_similarity_threshold=self.log_scale_similarity_threshold) + + if _distance is not not_found: + return _distance + + item = self if self.view == DELTA_VIEW else self._to_delta_dict(report_repetition_required=False) + diff_length = _get_item_length(item) + + if diff_length == 0: + return 0 + + t1_len = self.__get_item_rough_length(self.t1) + t2_len = self.__get_item_rough_length(self.t2) + + return diff_length / (t1_len + t2_len) + + def __get_item_rough_length(self: "DistanceProtocol", item, parent='root'): + """ + Get the rough length of an item. + It is used as a part of calculating the rough distance between objects. + + **parameters** + + item: The item to calculate the rough length for + parent: It is only used for DeepHash reporting purposes. Not really useful here. + """ + if not hasattr(self, 'hashes'): + raise RuntimeError(DISTANCE_CALCS_NEEDS_CACHE) + length = DeepHash.get_key(self.hashes, key=item, default=None, extract_index=1) + if length is None: + self.__calculate_item_deephash(item) + length = DeepHash.get_key(self.hashes, key=item, default=None, extract_index=1) + return length + + def __calculate_item_deephash(self: "DistanceProtocol", item: Any) -> None: + DeepHash( + item, + hashes=self.hashes, + parent='root', + apply_hash=True, + **self.deephash_parameters, + ) + + def _precalculate_distance_by_custom_compare_func( + self: "DistanceProtocol", hashes_added, hashes_removed, t1_hashtable, t2_hashtable, _original_type): + pre_calced_distances = dict_() + for added_hash in hashes_added: + for removed_hash in hashes_removed: + try: + is_close_distance = self.iterable_compare_func(t2_hashtable[added_hash].item, t1_hashtable[removed_hash].item) + except CannotCompare: + pass + else: + if is_close_distance: + # an arbitrary small distance if math_epsilon is not defined + distance = self.math_epsilon or 0.000001 + else: + distance = 1 + pre_calced_distances["{}--{}".format(added_hash, removed_hash)] = distance + + return pre_calced_distances + + def _precalculate_numpy_arrays_distance( + self: "DistanceProtocol", hashes_added, hashes_removed, t1_hashtable, t2_hashtable, _original_type): + + # We only want to deal with 1D arrays. 
+        if isinstance(t2_hashtable[next(iter(hashes_added))].item, (np_ndarray, list)):
+            return
+
+        pre_calced_distances = dict_()
+        added = [t2_hashtable[k].item for k in hashes_added]
+        removed = [t1_hashtable[k].item for k in hashes_removed]
+
+        if _original_type is None:
+            added_numpy_compatible_type = get_homogeneous_numpy_compatible_type_of_seq(added)
+            removed_numpy_compatible_type = get_homogeneous_numpy_compatible_type_of_seq(removed)
+            if added_numpy_compatible_type and added_numpy_compatible_type == removed_numpy_compatible_type:
+                _original_type = added_numpy_compatible_type
+        if _original_type is None:
+            return
+
+        added = np_array_factory(added, dtype=_original_type)
+        removed = np_array_factory(removed, dtype=_original_type)
+
+        pairs = cartesian_product_numpy(added, removed)
+
+        pairs_transposed = pairs.T
+
+        distances = _get_numpy_array_distance(
+            pairs_transposed[0], pairs_transposed[1],
+            max_=self.cutoff_distance_for_pairs,
+            use_log_scale=self.use_log_scale,
+            log_scale_similarity_threshold=self.log_scale_similarity_threshold,
+        )
+
+        i = 0
+        for added_hash in hashes_added:
+            for removed_hash in hashes_removed:
+                pre_calced_distances["{}--{}".format(added_hash, removed_hash)] = distances[i]
+                i += 1
+        return pre_calced_distances
+
+
+def _get_item_length(item, parents_ids=frozenset([])):
+    """
+    Get the number of operations in a diff object.
+    It is designed mainly for the delta view output
+    but can be used with other dictionary-style view outputs too.
+    """
+    length = 0
+    if isinstance(item, Mapping):
+        for key, subitem in item.items():
+            # dedupe the repetition report so the number of times items have shown up does not affect the distance.
+            if key in {'iterable_items_added_at_indexes', 'iterable_items_removed_at_indexes'}:
+                new_subitem = dict_()
+                for path_, indexes_to_items in subitem.items():
+                    used_value_ids = set()
+                    new_indexes_to_items = dict_()
+                    for k, v in indexes_to_items.items():
+                        v_id = id(v)
+                        if v_id not in used_value_ids:
+                            used_value_ids.add(v_id)
+                            new_indexes_to_items[k] = v
+                    new_subitem[path_] = new_indexes_to_items
+                subitem = new_subitem
+
+            # internal keys such as _numpy_paths should not count towards the distance
+            if isinstance(key, strings) and (key.startswith('_') or key == 'deep_distance' or key == 'new_path'):
+                continue
+
+            item_id = id(subitem)
+            if parents_ids and item_id in parents_ids:
+                continue
+            parents_ids_added = add_to_frozen_set(parents_ids, item_id)
+            length += _get_item_length(subitem, parents_ids_added)
+    elif isinstance(item, numbers):
+        length = 1
+    elif isinstance(item, strings):
+        length = 1
+    elif isinstance(item, Iterable):
+        for subitem in item:
+            item_id = id(subitem)
+            if parents_ids and item_id in parents_ids:
+                continue
+            parents_ids_added = add_to_frozen_set(parents_ids, item_id)
+            length += _get_item_length(subitem, parents_ids_added)
+    elif isinstance(item, type):  # it is a class
+        length = 1
+    else:
+        if hasattr(item, '__dict__'):
+            for subitem in item.__dict__:
+                item_id = id(subitem)
+                parents_ids_added = add_to_frozen_set(parents_ids, item_id)
+                length += _get_item_length(subitem, parents_ids_added)
+    return length
+
+
+def _get_numbers_distance(num1, num2, max_=1, use_log_scale=False, log_scale_similarity_threshold=0.1):
+    """
+    Get the distance of 2 numbers. The output is a number between 0 and the max.
+    When max is returned it means the 2 numbers are very far apart, and 0 means they are equal.
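+
+    Editor's note (illustrative, not part of deepdiff):
+        >>> _get_numbers_distance(10, 10)
+        0
+        >>> round(_get_numbers_distance(10.0, 10.1), 3)
+        0.005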
+ """ + if num1 == num2: + return 0 + if use_log_scale: + distance = logarithmic_distance(num1, num2) + if distance < 0: + return 0 + return distance + if not isinstance(num1, float): + num1 = float(num1) + if not isinstance(num2, float): + num2 = float(num2) + # Since we have a default cutoff of 0.3 distance when + # getting the pairs of items during the ingore_order=True + # calculations, we need to make the divisor of comparison very big + # so that any 2 numbers can be chosen as pairs. + divisor = (num1 + num2) / max_ + if divisor == 0: + return max_ + try: + return min(max_, abs((num1 - num2) / divisor)) + except Exception: # pragma: no cover. I don't think this line will ever run but doesn't hurt to leave it. + return max_ # pragma: no cover + + +def _numpy_div(a, b, replace_inf_with=1): + max_array = np.full(shape=a.shape, fill_value=replace_inf_with, dtype=np_float64) + result = np.divide(a, b, out=max_array, where=b != 0, dtype=np_float64) + # wherever 2 numbers are the same, make sure the distance is zero. This is mainly for 0 divided by zero. + result[a == b] = 0 + return result + +# To deal with numbers close to zero +MATH_LOG_OFFSET = 1e-10 + +def numpy_apply_log_keep_sign(array, offset=MATH_LOG_OFFSET): + # Calculate the absolute value and add the offset + abs_plus_offset = np.abs(array) + offset + + # Calculate the logarithm + log_values = np.log(abs_plus_offset) + + # Apply the original signs to the log values + signed_log_values = np.copysign(log_values, array) + + return signed_log_values + + +def logarithmic_similarity(a: numbers, b: numbers, threshold: float=0.1) -> float: + """ + A threshold of 0.1 translates to about 10.5% difference. + A threshold of 0.5 translates to about 65% difference. + A threshold of 0.05 translates to about 5.1% difference. + """ + return logarithmic_distance(a, b) < threshold + + +def logarithmic_distance(a: numbers, b: numbers) -> float: + # Apply logarithm to the absolute values and consider the sign + a = float(a) + b = float(b) + log_a = math.copysign(math.log(abs(a) + MATH_LOG_OFFSET), a) + log_b = math.copysign(math.log(abs(b) + MATH_LOG_OFFSET), b) + + return abs(log_a - log_b) + + +def _get_numpy_array_distance(num1, num2, max_=1, use_log_scale=False, log_scale_similarity_threshold=0.1): + """ + Get the distance of 2 numbers. The output is a number between 0 to the max. + The reason is the + When max is returned means the 2 numbers are really far, and 0 means they are equal. + """ + # Since we have a default cutoff of 0.3 distance when + # getting the pairs of items during the ingore_order=True + # calculations, we need to make the divisor of comparison very big + # so that any 2 numbers can be chosen as pairs. 
+ if use_log_scale: + num1 = numpy_apply_log_keep_sign(num1) + num2 = numpy_apply_log_keep_sign(num2) + + divisor = (num1 + num2) / max_ + result = _numpy_div((num1 - num2), divisor, replace_inf_with=max_) + + distance_array = np.clip(np.absolute(result), 0, max_) + if use_log_scale: + distance_array[distance_array < log_scale_similarity_threshold] = 0 + return distance_array + + +def _get_datetime_distance(date1, date2, max_, use_log_scale, log_scale_similarity_threshold): + return _get_numbers_distance(date1.timestamp(), date2.timestamp(), max_) + + +def _get_date_distance(date1, date2, max_, use_log_scale, log_scale_similarity_threshold): + return _get_numbers_distance(date1.toordinal(), date2.toordinal(), max_) + + +def _get_timedelta_distance(timedelta1, timedelta2, max_, use_log_scale, log_scale_similarity_threshold): + return _get_numbers_distance(timedelta1.total_seconds(), timedelta2.total_seconds(), max_) + + +def _get_time_distance(time1, time2, max_, use_log_scale, log_scale_similarity_threshold): + return _get_numbers_distance(time_to_seconds(time1), time_to_seconds(time2), max_) + + +TYPES_TO_DIST_FUNC = [ + (only_numbers, _get_numbers_distance), + (datetime.datetime, _get_datetime_distance), + (datetime.date, _get_date_distance), + (datetime.timedelta, _get_timedelta_distance), + (datetime.time, _get_time_distance), +] + + +def get_numeric_types_distance(num1, num2, max_, use_log_scale=False, log_scale_similarity_threshold=0.1): + for type_, func in TYPES_TO_DIST_FUNC: + if isinstance(num1, type_) and isinstance(num2, type_): + return func(num1, num2, max_, use_log_scale, log_scale_similarity_threshold) + return not_found diff --git a/.venv/lib/python3.12/site-packages/deepdiff/helper.py b/.venv/lib/python3.12/site-packages/deepdiff/helper.py new file mode 100644 index 00000000..63a4e315 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/deepdiff/helper.py @@ -0,0 +1,837 @@ +import sys +import re +import os +import datetime +import uuid +import logging +import warnings +import string +import time +import enum +import ipaddress +from typing import NamedTuple, Any, List, Optional, Dict, Union, TYPE_CHECKING, Tuple +from ast import literal_eval +from decimal import Decimal, localcontext, InvalidOperation as InvalidDecimalOperation +from itertools import repeat +from orderly_set import StableSetEq as SetOrderedBase # median: 1.0867 s for cache test, 5.63s for all tests +from threading import Timer + +if TYPE_CHECKING: + from pytz.tzinfo import BaseTzInfo + + +class np_type: + pass + + +class pydantic_base_model_type: + pass + + +class SetOrdered(SetOrderedBase): + def __repr__(self): + return str(list(self)) + + +try: + import numpy as np +except ImportError: # pragma: no cover. The case without Numpy is tested locally only. + np = None # pragma: no cover. + np_array_factory = 'numpy not available' # pragma: no cover. + np_ndarray = np_type # pragma: no cover. + np_bool_ = np_type # pragma: no cover. + np_int8 = np_type # pragma: no cover. + np_int16 = np_type # pragma: no cover. + np_int32 = np_type # pragma: no cover. + np_int64 = np_type # pragma: no cover. + np_uint8 = np_type # pragma: no cover. + np_uint16 = np_type # pragma: no cover. + np_uint32 = np_type # pragma: no cover. + np_uint64 = np_type # pragma: no cover. + np_intp = np_type # pragma: no cover. + np_uintp = np_type # pragma: no cover. + np_float32 = np_type # pragma: no cover. + np_float64 = np_type # pragma: no cover. + np_double = np_type # pragma: no cover. + np_floating = np_type # pragma: no cover. 
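+    # Editor's note (illustrative, not part of deepdiff): these placeholders keep
+    # isinstance() checks valid when numpy is absent; nothing is ever an instance of
+    # the np_type sentinel class, so e.g. isinstance(x, np_ndarray) is simply False.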
+ np_complex64 = np_type # pragma: no cover. + np_complex128 = np_type # pragma: no cover. + np_cdouble = np_type # pragma: no cover. + np_complexfloating = np_type # pragma: no cover. +else: + np_array_factory = np.array + np_ndarray = np.ndarray + np_bool_ = np.bool_ + np_int8 = np.int8 + np_int16 = np.int16 + np_int32 = np.int32 + np_int64 = np.int64 + np_uint8 = np.uint8 + np_uint16 = np.uint16 + np_uint32 = np.uint32 + np_uint64 = np.uint64 + np_intp = np.intp + np_uintp = np.uintp + np_float32 = np.float32 + np_float64 = np.float64 + np_double = np.double # np.float_ is an alias for np.double and is being removed by NumPy 2.0 + np_floating = np.floating + np_complex64 = np.complex64 + np_complex128 = np.complex128 + np_cdouble = np.cdouble # np.complex_ is an alias for np.cdouble and is being removed by NumPy 2.0 + np_complexfloating = np.complexfloating + +numpy_numbers = ( + np_int8, np_int16, np_int32, np_int64, np_uint8, + np_uint16, np_uint32, np_uint64, np_intp, np_uintp, + np_float32, np_float64, np_double, np_floating, np_complex64, + np_complex128, np_cdouble,) + +numpy_complex_numbers = ( + np_complexfloating, np_complex64, np_complex128, np_cdouble, +) + +numpy_dtypes = set(numpy_numbers) +numpy_dtypes.add(np_bool_) # type: ignore + +numpy_dtype_str_to_type = { + item.__name__: item for item in numpy_dtypes +} + +try: + from pydantic.main import BaseModel as PydanticBaseModel # type: ignore +except ImportError: + PydanticBaseModel = pydantic_base_model_type + + +logger = logging.getLogger(__name__) + +py_major_version = sys.version_info.major +py_minor_version = sys.version_info.minor + +py_current_version = Decimal("{}.{}".format(py_major_version, py_minor_version)) + +py2 = py_major_version == 2 +py3 = py_major_version == 3 +py4 = py_major_version == 4 + + +NUMERICS = frozenset(string.digits) + + +class EnumBase(str, enum.Enum): + def __repr__(self): + """ + We need to add a single quotes so we can easily copy the value when we do ipdb. + """ + return f"'{self.name}'" + + def __str__(self): + return self.name + + +def _int_or_zero(value): + """ + Tries to extract some number from a string. + + 12c becomes 12 + """ + try: + return int(value) + except Exception: + result = [] + for char in value: + if char in NUMERICS: + result.append(char) + if result: + return int(''.join(result)) + return 0 + + +def get_semvar_as_integer(version): + """ + Converts: + + '1.23.5' to 1023005 + """ + version = version.split('.') + if len(version) > 3: + version = version[:3] + elif len(version) < 3: + version.extend(['0'] * (3 - len(version))) + + return sum([10**(i * 3) * _int_or_zero(v) for i, v in enumerate(reversed(version))]) + + +# we used to use OrderedDictPlus when dictionaries in Python were not ordered. +dict_ = dict + +if py4: + logger.warning('Python 4 is not supported yet. Switching logic to Python 3.') # pragma: no cover + py3 = True # pragma: no cover + +if py2: # pragma: no cover + sys.exit('Python 2 is not supported anymore. The last version of DeepDiff that supported Py2 was 3.3.0') + +pypy3 = py3 and hasattr(sys, "pypy_translation_info") + + +if np and get_semvar_as_integer(np.__version__) < 1019000: + sys.exit('The minimum required Numpy version is 1.19.0. 
Please upgrade your Numpy package.') + +strings = (str, bytes) # which are both basestring +unicode_type = str +bytes_type = bytes +only_complex_number = (complex,) + numpy_complex_numbers +only_numbers = (int, float, complex, Decimal) + numpy_numbers +datetimes = (datetime.datetime, datetime.date, datetime.timedelta, datetime.time) +ipranges = (ipaddress.IPv4Interface, ipaddress.IPv6Interface, ipaddress.IPv4Network, ipaddress.IPv6Network) +uuids = (uuid.UUID, ) +times = (datetime.datetime, datetime.time) +numbers: Tuple = only_numbers + datetimes +booleans = (bool, np_bool_) + +basic_types = strings + numbers + uuids + booleans + (type(None), ) + +class IndexedHash(NamedTuple): + indexes: List + item: Any + +current_dir = os.path.dirname(os.path.abspath(__file__)) + +ID_PREFIX = '!>*id' + +KEY_TO_VAL_STR = "{}:{}" + +TREE_VIEW = 'tree' +TEXT_VIEW = 'text' +DELTA_VIEW = '_delta' + +ENUM_INCLUDE_KEYS = ['__objclass__', 'name', 'value'] + + +def short_repr(item, max_length=15): + """Short representation of item if it is too long""" + item = repr(item) + if len(item) > max_length: + item = '{}...{}'.format(item[:max_length - 3], item[-1]) + return item + + +class ListItemRemovedOrAdded: # pragma: no cover + """Class of conditions to be checked""" + pass + + +class OtherTypes: + def __repr__(self): + return "Error: {}".format(self.__class__.__name__) # pragma: no cover + + __str__ = __repr__ + + +class Skipped(OtherTypes): + pass + + +class Unprocessed(OtherTypes): + pass + + +class NotHashed(OtherTypes): + pass + + +class NotPresent: # pragma: no cover + """ + In a change tree, this indicated that a previously existing object has been removed -- or will only be added + in the future. + We previously used None for this but this caused problem when users actually added and removed None. Srsly guys? :D + """ + + def __repr__(self): + return 'not present' # pragma: no cover + + __str__ = __repr__ + + +class CannotCompare(Exception): + """ + Exception when two items cannot be compared in the compare function. + """ + pass + + +unprocessed = Unprocessed() +skipped = Skipped() +not_hashed = NotHashed() +notpresent = NotPresent() + +# Disabling remapping from old to new keys since the mapping is deprecated. +RemapDict = dict_ + + +# class RemapDict(dict_): +# """ +# DISABLED +# Remap Dictionary. + +# For keys that have a new, longer name, remap the old key to the new key. +# Other keys that don't have a new name are handled as before. +# """ + +# def __getitem__(self, old_key): +# new_key = EXPANDED_KEY_MAP.get(old_key, old_key) +# if new_key != old_key: +# logger.warning( +# "DeepDiff Deprecation: %s is renamed to %s. Please start using " +# "the new unified naming convention.", old_key, new_key) +# if new_key in self: +# return self.get(new_key) +# else: # pragma: no cover +# raise KeyError(new_key) + + +class indexed_set(set): + """ + A set class that lets you get an item by index + + >>> a = indexed_set() + >>> a.add(10) + >>> a.add(20) + >>> a[0] + 10 + """ + + +def add_to_frozen_set(parents_ids, item_id): + return parents_ids | {item_id} + + +def convert_item_or_items_into_set_else_none(items): + if items: + if isinstance(items, strings): + items = {items} + else: + items = set(items) + else: + items = None + return items + + +def add_root_to_paths(paths): + """ + Sometimes the users want to just pass + [key] instead of root[key] for example. + Here we automatically add all sorts of variations that might match + the path they were supposed to pass. 
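+
+    Editor's note (illustrative, not part of deepdiff):
+        >>> add_root_to_paths(['a', '2'])
+        ['root.a', "root['a']", "root['2']", 'root[2]']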
+ """ + if paths is None: + return + result = SetOrdered() + for path in paths: + if path.startswith('root'): + result.add(path) + else: + if path.isdigit(): + result.add(f"root['{path}']") + result.add(f"root[{path}]") + elif path[0].isdigit(): + result.add(f"root['{path}']") + else: + result.add(f"root.{path}") + result.add(f"root['{path}']") + return result + + +RE_COMPILED_TYPE = type(re.compile('')) + + +def convert_item_or_items_into_compiled_regexes_else_none(items): + if items: + if isinstance(items, (strings, RE_COMPILED_TYPE)): + items = [items] + items = [i if isinstance(i, RE_COMPILED_TYPE) else re.compile(i) for i in items] + else: + items = None + return items + + +def get_id(obj): + """ + Adding some characters to id so they are not just integers to reduce the risk of collision. + """ + return "{}{}".format(ID_PREFIX, id(obj)) + + +def get_type(obj): + """ + Get the type of object or if it is a class, return the class itself. + """ + if isinstance(obj, np_ndarray): + return obj.dtype.type # type: ignore + return obj if type(obj) is type else type(obj) + + +def numpy_dtype_string_to_type(dtype_str): + return numpy_dtype_str_to_type[dtype_str] + + +def type_in_type_group(item, type_group): + return get_type(item) in type_group + + +def type_is_subclass_of_type_group(item, type_group): + return isinstance(item, type_group) \ + or (isinstance(item, type) and issubclass(item, type_group)) \ + or type_in_type_group(item, type_group) + + +def get_doc(doc_filename): + try: + with open(os.path.join(current_dir, '../docs/', doc_filename), 'r') as doc_file: + doc = doc_file.read() + except Exception: # pragma: no cover + doc = 'Failed to load the docstrings. Please visit: https://zepworks.com/deepdiff/current/' # pragma: no cover + return doc + + +number_formatting = { + "f": r'{:.%sf}', + "e": r'{:.%se}', +} + + +def number_to_string(number, significant_digits, number_format_notation="f"): + """ + Convert numbers to string considering significant digits. + """ + try: + using = number_formatting[number_format_notation] + except KeyError: + raise ValueError("number_format_notation got invalid value of {}. The valid values are 'f' and 'e'".format(number_format_notation)) from None + + if not isinstance(number, numbers): # type: ignore + return number + elif isinstance(number, Decimal): + with localcontext() as ctx: + # Precision = number of integer digits + significant_digits + # Using number//1 to get the integer part of the number + ctx.prec = len(str(abs(number // 1))) + significant_digits + try: + number = number.quantize(Decimal('0.' + '0' * significant_digits)) + except InvalidDecimalOperation: + # Sometimes rounding up causes a higher precision to be needed for the quantize operation + # For example '999.99999999' will become '1000.000000' after quantize + ctx.prec += 1 + number = number.quantize(Decimal('0.' + '0' * significant_digits)) + elif isinstance(number, only_complex_number): # type: ignore + # Case for complex numbers. 
+ number = number.__class__( + "{real}+{imag}j".format( # type: ignore + real=number_to_string( + number=number.real, # type: ignore + significant_digits=significant_digits, + number_format_notation=number_format_notation + ), + imag=number_to_string( + number=number.imag, # type: ignore + significant_digits=significant_digits, + number_format_notation=number_format_notation + ) + ) # type: ignore + ) + else: + number = round(number=number, ndigits=significant_digits) # type: ignore + + if significant_digits == 0: + number = int(number) + + if number == 0.0: + # Special case for 0: "-0.xx" should compare equal to "0.xx" + number = abs(number) # type: ignore + + # Cast number to string + result = (using % significant_digits).format(number) + # https://bugs.python.org/issue36622 + if number_format_notation == 'e': + # Removing leading 0 for exponential part. + result = re.sub( + pattern=r'(?<=e(\+|\-))0(?=\d)+', + repl=r'', + string=result + ) + return result + + +class DeepDiffDeprecationWarning(DeprecationWarning): + """ + Use this warning instead of DeprecationWarning + """ + pass + + +def cartesian_product(a, b): + """ + Get the Cartesian product of two iterables + + **parameters** + + a: list of lists + b: iterable to do the Cartesian product + """ + + for i in a: + for j in b: + yield i + (j,) + + +def cartesian_product_of_shape(dimentions, result=None): + """ + Cartesian product of a dimentions iterable. + This is mainly used to traverse Numpy ndarrays. + + Each array has dimentions that are defines in ndarray.shape + """ + if result is None: + result = ((),) # a tuple with an empty tuple + for dimension in dimentions: + result = cartesian_product(result, range(dimension)) + return result + + +def get_numpy_ndarray_rows(obj, shape=None): + """ + Convert a multi dimensional numpy array to list of rows + """ + if shape is None: + shape = obj.shape + + dimentions = shape[:-1] + for path_tuple in cartesian_product_of_shape(dimentions): + result = obj + for index in path_tuple: + result = result[index] + yield path_tuple, result + + +class _NotFound: + + def __eq__(self, other): + return False + + __req__ = __eq__ + + def __repr__(self): + return 'not found' + + __str__ = __repr__ + + +not_found = _NotFound() + +warnings.simplefilter('once', DeepDiffDeprecationWarning) + + +class RepeatedTimer: + """ + Threaded Repeated Timer by MestreLion + https://stackoverflow.com/a/38317060/1497443 + """ + + def __init__(self, interval, function, *args, **kwargs): + self._timer = None + self.interval = interval + self.function = function + self.args = args + self.start_time = time.time() + self.kwargs = kwargs + self.is_running = False + self.start() + + def _get_duration_sec(self): + return int(time.time() - self.start_time) + + def _run(self): + self.is_running = False + self.start() + self.function(*self.args, **self.kwargs) + + def start(self): + self.kwargs.update(duration=self._get_duration_sec()) + if not self.is_running: + self._timer = Timer(self.interval, self._run) + self._timer.start() + self.is_running = True + + def stop(self): + duration = self._get_duration_sec() + if self._timer is not None: + self._timer.cancel() + self.is_running = False + return duration + + +def _eval_decimal(params): + return Decimal(params) + + +def _eval_datetime(params): + params = f'({params})' + params = literal_eval(params) + return datetime.datetime(*params) + + +def _eval_date(params): + params = f'({params})' + params = literal_eval(params) + return datetime.date(*params) + + +LITERAL_EVAL_PRE_PROCESS = [ 
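+    # (prefix, suffix, evaluator) triples that literal_eval_extended tries
+    # when ast.literal_eval cannot parse the repr on its own.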
+ ('Decimal(', ')', _eval_decimal), + ('datetime.datetime(', ')', _eval_datetime), + ('datetime.date(', ')', _eval_date), +] + + +def literal_eval_extended(item): + """ + An extended version of literal_eval + """ + try: + return literal_eval(item) + except (SyntaxError, ValueError): + for begin, end, func in LITERAL_EVAL_PRE_PROCESS: + if item.startswith(begin) and item.endswith(end): + # Extracting and removing extra quotes so for example "Decimal('10.1')" becomes "'10.1'" and then '10.1' + params = item[len(begin): -len(end)].strip('\'\"') + return func(params) + raise + + +def time_to_seconds(t:datetime.time) -> int: + return (t.hour * 60 + t.minute) * 60 + t.second + + +def datetime_normalize( + truncate_datetime:Union[str, None], + obj:Union[datetime.datetime, datetime.time], + default_timezone: Union[ + datetime.timezone, "BaseTzInfo" + ] = datetime.timezone.utc, +) -> Any: + if truncate_datetime: + if truncate_datetime == 'second': + obj = obj.replace(microsecond=0) + elif truncate_datetime == 'minute': + obj = obj.replace(second=0, microsecond=0) + elif truncate_datetime == 'hour': + obj = obj.replace(minute=0, second=0, microsecond=0) + elif truncate_datetime == 'day': + obj = obj.replace(hour=0, minute=0, second=0, microsecond=0) + if isinstance(obj, datetime.datetime): + if has_timezone(obj): + obj = obj.astimezone(default_timezone) + else: + obj = obj.replace(tzinfo=default_timezone) + elif isinstance(obj, datetime.time): + return time_to_seconds(obj) + return obj + + +def has_timezone(dt): + """ + Function to check if a datetime object has a timezone + + Checking dt.tzinfo.utcoffset(dt) ensures that the datetime object is truly timezone-aware + because some datetime objects may have a tzinfo attribute that is not None but still + doesn't provide a valid offset. + + Certain tzinfo objects, such as pytz.timezone(None), can exist but do not provide meaningful UTC offset information. + If tzinfo is present but calling .utcoffset(dt) returns None, the datetime is not truly timezone-aware. + """ + return dt.tzinfo is not None and dt.tzinfo.utcoffset(dt) is not None + + +def get_truncate_datetime(truncate_datetime) -> Union[str, None]: + """ + Validates truncate_datetime value + """ + if truncate_datetime not in {None, 'second', 'minute', 'hour', 'day'}: + raise ValueError("truncate_datetime must be second, minute, hour or day") + return truncate_datetime + + +def cartesian_product_numpy(*arrays): + """ + Cartesian product of Numpy arrays by Paul Panzer + https://stackoverflow.com/a/49445693/1497443 + """ + la = len(arrays) + dtype = np.result_type(*arrays) # type: ignore + arr = np.empty((la, *map(len, arrays)), dtype=dtype) # type: ignore + idx = slice(None), *repeat(None, la) + for i, a in enumerate(arrays): + arr[i, ...] = a[idx[:la - i]] + return arr.reshape(la, -1).T + + +def diff_numpy_array(A, B): + """ + Numpy Array A - B + return items in A that are not in B + By Divakar + https://stackoverflow.com/a/52417967/1497443 + """ + return A[~np.isin(A, B)] # type: ignore + + +PYTHON_TYPE_TO_NUMPY_TYPE = { + int: np_int64, + float: np_float64, + Decimal: np_float64 +} + + +def get_homogeneous_numpy_compatible_type_of_seq(seq): + """ + Return with the numpy dtype if the array can be converted to a non-object numpy array. + Originally written by mgilson https://stackoverflow.com/a/13252348/1497443 + This is the modified version. 
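+
+    For example, [1, 2, 3] maps to np_int64 and [1.0, 2.5] to np_float64,
+    while a mixed sequence such as [1, 2.5] returns False.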
+ """ + iseq = iter(seq) + first_type = type(next(iseq)) + if first_type in {int, float, Decimal}: + type_ = first_type if all((type(x) is first_type) for x in iseq) else False + return PYTHON_TYPE_TO_NUMPY_TYPE.get(type_, False) + else: + return False + + +def detailed__dict__(obj, ignore_private_variables=True, ignore_keys=frozenset(), include_keys=None): + """ + Get the detailed dictionary of an object. + + This is used so we retrieve object properties too. + """ + if include_keys: + result = {} + for key in include_keys: + try: + value = getattr(obj, key) + except Exception: + pass + else: + if not callable(value) or key == '__objclass__': # We don't want to compare functions, however for backward compatibility, __objclass__ needs to be reported. + result[key] = value + else: + result = obj.__dict__.copy() # A shallow copy + private_var_prefix = f"_{obj.__class__.__name__}__" # The semi private variables in Python get this prefix + for key in ignore_keys: + if key in result or ( + ignore_private_variables and key.startswith('__') and not key.startswith(private_var_prefix) + ): + del result[key] + for key in dir(obj): + if key not in result and key not in ignore_keys and ( + not ignore_private_variables or ( + ignore_private_variables and not key.startswith('__') and not key.startswith(private_var_prefix) + ) + ): + value = getattr(obj, key) + if not callable(value): + result[key] = value + return result + + +def named_tuple_repr(self): + fields = [] + for field, value in self._asdict().items(): + # Only include fields that do not have their default value + if field in self._field_defaults: + if value != self._field_defaults[field]: + fields.append(f"{field}={value!r}") + else: + fields.append(f"{field}={value!r}") + + return f"{self.__class__.__name__}({', '.join(fields)})" + + +class OpcodeTag(EnumBase): + insert = 'insert' + delete = 'delete' + equal = 'equal' + replace = 'replace' # type: ignore + # swapped = 'swapped' # in the future we should support reporting of items swapped with each other + + +class Opcode(NamedTuple): + tag: str + t1_from_index: int + t1_to_index: int + t2_from_index: int + t2_to_index: int + old_values: Optional[List[Any]] = None + new_values: Optional[List[Any]] = None + + __repr__ = __str__ = named_tuple_repr + + +class FlatDataAction(EnumBase): + values_changed = 'values_changed' + type_changes = 'type_changes' + set_item_added = 'set_item_added' + set_item_removed = 'set_item_removed' + dictionary_item_added = 'dictionary_item_added' + dictionary_item_removed = 'dictionary_item_removed' + iterable_item_added = 'iterable_item_added' + iterable_item_removed = 'iterable_item_removed' + iterable_item_moved = 'iterable_item_moved' + iterable_items_inserted = 'iterable_items_inserted' # opcode + iterable_items_deleted = 'iterable_items_deleted' # opcode + iterable_items_replaced = 'iterable_items_replaced' # opcode + iterable_items_equal = 'iterable_items_equal' # opcode + attribute_removed = 'attribute_removed' + attribute_added = 'attribute_added' + unordered_iterable_item_added = 'unordered_iterable_item_added' + unordered_iterable_item_removed = 'unordered_iterable_item_removed' + initiated = "initiated" + + +OPCODE_TAG_TO_FLAT_DATA_ACTION = { + OpcodeTag.insert: FlatDataAction.iterable_items_inserted, + OpcodeTag.delete: FlatDataAction.iterable_items_deleted, + OpcodeTag.replace: FlatDataAction.iterable_items_replaced, + OpcodeTag.equal: FlatDataAction.iterable_items_equal, +} + +FLAT_DATA_ACTION_TO_OPCODE_TAG = {v: i for i, v in 
OPCODE_TAG_TO_FLAT_DATA_ACTION.items()} + + +UnkownValueCode: str = 'unknown___' + + +class FlatDeltaRow(NamedTuple): + path: List + action: FlatDataAction + value: Optional[Any] = UnkownValueCode + old_value: Optional[Any] = UnkownValueCode + type: Optional[Any] = UnkownValueCode + old_type: Optional[Any] = UnkownValueCode + new_path: Optional[List] = None + t1_from_index: Optional[int] = None + t1_to_index: Optional[int] = None + t2_from_index: Optional[int] = None + t2_to_index: Optional[int] = None + + __repr__ = __str__ = named_tuple_repr + + +JSON = Union[Dict[str, str], List[str], List[int], Dict[str, "JSON"], List["JSON"], str, int, float, bool, None] + + +class SummaryNodeType(EnumBase): + dict = 'dict' + list = 'list' + leaf = 'leaf' diff --git a/.venv/lib/python3.12/site-packages/deepdiff/lfucache.py b/.venv/lib/python3.12/site-packages/deepdiff/lfucache.py new file mode 100644 index 00000000..75d1708e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/deepdiff/lfucache.py @@ -0,0 +1,217 @@ +""" +LFU cache Written by Shane Wang +https://medium.com/@epicshane/a-python-implementation-of-lfu-least-frequently-used-cache-with-o-1-time-complexity-e16b34a3c49b +https://github.com/luxigner/lfu_cache +Modified by Sep Dehpour +""" +from collections import defaultdict +from threading import Lock +from statistics import mean +from deepdiff.helper import not_found, dict_, SetOrdered + + +class CacheNode: + def __init__(self, key, report_type, value, freq_node, pre, nxt): + self.key = key + if report_type: + self.content = defaultdict(SetOrdered) + self.content[report_type].add(value) + else: + self.content = value + self.freq_node = freq_node + self.pre = pre # previous CacheNode + self.nxt = nxt # next CacheNode + + def free_myself(self): + if self.freq_node.cache_head == self.freq_node.cache_tail: # type: ignore + self.freq_node.cache_head = self.freq_node.cache_tail = None # type: ignore + elif self.freq_node.cache_head == self: # type: ignore + self.nxt.pre = None # type: ignore + self.freq_node.cache_head = self.nxt # type: ignore + elif self.freq_node.cache_tail == self: # type: ignore + self.pre.nxt = None # type: ignore + self.freq_node.cache_tail = self.pre # type: ignore + else: + self.pre.nxt = self.nxt # type: ignore + self.nxt.pre = self.pre # type: ignore + + self.pre = None + self.nxt = None + self.freq_node = None + + +class FreqNode: + def __init__(self, freq, pre, nxt): + self.freq = freq + self.pre = pre # previous FreqNode + self.nxt = nxt # next FreqNode + self.cache_head = None # CacheNode head under this FreqNode + self.cache_tail = None # CacheNode tail under this FreqNode + + def count_caches(self): + if self.cache_head is None and self.cache_tail is None: + return 0 + elif self.cache_head == self.cache_tail: + return 1 + else: + return '2+' + + def remove(self): + if self.pre is not None: + self.pre.nxt = self.nxt + if self.nxt is not None: + self.nxt.pre = self.pre + + pre = self.pre + nxt = self.nxt + self.pre = self.nxt = self.cache_head = self.cache_tail = None + + return (pre, nxt) + + def pop_head_cache(self): + if self.cache_head is None and self.cache_tail is None: + return None + elif self.cache_head == self.cache_tail: + cache_head = self.cache_head + self.cache_head = self.cache_tail = None + return cache_head + else: + cache_head = self.cache_head + self.cache_head.nxt.pre = None # type: ignore + self.cache_head = self.cache_head.nxt # type: ignore + return cache_head + + def append_cache_to_tail(self, cache_node): + cache_node.freq_node = self + + 
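+        # Attach the node at the tail of this frequency bucket's
+        # doubly-linked list of cache nodes.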
if self.cache_head is None and self.cache_tail is None: + self.cache_head = self.cache_tail = cache_node + else: + cache_node.pre = self.cache_tail + cache_node.nxt = None + self.cache_tail.nxt = cache_node # type: ignore + self.cache_tail = cache_node + + def insert_after_me(self, freq_node): + freq_node.pre = self + freq_node.nxt = self.nxt + + if self.nxt is not None: + self.nxt.pre = freq_node + + self.nxt = freq_node + + def insert_before_me(self, freq_node): + if self.pre is not None: + self.pre.nxt = freq_node + + freq_node.pre = self.pre + freq_node.nxt = self + self.pre = freq_node + + +class LFUCache: + + def __init__(self, capacity): + self.cache = dict_() # {key: cache_node} + if capacity <= 0: + raise ValueError('Capacity of LFUCache needs to be positive.') # pragma: no cover. + self.capacity = capacity + self.freq_link_head = None + self.lock = Lock() + + def get(self, key): + with self.lock: + if key in self.cache: + cache_node = self.cache[key] + freq_node = cache_node.freq_node + content = cache_node.content + + self.move_forward(cache_node, freq_node) + + return content + else: + return not_found + + def set(self, key, report_type=None, value=None): + with self.lock: + if key in self.cache: + cache_node = self.cache[key] + if report_type: + cache_node.content[report_type].add(value) + else: + cache_node.content = value + else: + if len(self.cache) >= self.capacity: + self.dump_cache() + + self.create_cache_node(key, report_type, value) + + def __contains__(self, key): + return key in self.cache + + def move_forward(self, cache_node, freq_node): + if freq_node.nxt is None or freq_node.nxt.freq != freq_node.freq + 1: + target_freq_node = FreqNode(freq_node.freq + 1, None, None) + target_empty = True + else: + target_freq_node = freq_node.nxt + target_empty = False + + cache_node.free_myself() + target_freq_node.append_cache_to_tail(cache_node) + + if target_empty: + freq_node.insert_after_me(target_freq_node) + + if freq_node.count_caches() == 0: + if self.freq_link_head == freq_node: + self.freq_link_head = target_freq_node + + freq_node.remove() + + def dump_cache(self): + head_freq_node = self.freq_link_head + self.cache.pop(head_freq_node.cache_head.key) # type: ignore + head_freq_node.pop_head_cache() # type: ignore + + if head_freq_node.count_caches() == 0: # type: ignore + self.freq_link_head = head_freq_node.nxt # type: ignore + head_freq_node.remove() # type: ignore + + def create_cache_node(self, key, report_type, value): + cache_node = CacheNode( + key=key, report_type=report_type, + value=value, freq_node=None, pre=None, nxt=None) + self.cache[key] = cache_node + + if self.freq_link_head is None or self.freq_link_head.freq != 0: + new_freq_node = FreqNode(0, None, None) + new_freq_node.append_cache_to_tail(cache_node) + + if self.freq_link_head is not None: + self.freq_link_head.insert_before_me(new_freq_node) + + self.freq_link_head = new_freq_node + else: + self.freq_link_head.append_cache_to_tail(cache_node) + + def get_sorted_cache_keys(self): + result = [(i, freq.freq_node.freq) for i, freq in self.cache.items()] + result.sort(key=lambda x: -x[1]) + return result + + def get_average_frequency(self): + return mean(freq.freq_node.freq for freq in self.cache.values()) + + +class DummyLFU: + + def __init__(self, *args, **kwargs): + pass + + set = __init__ + get = __init__ + + def __contains__(self, key): + return False diff --git a/.venv/lib/python3.12/site-packages/deepdiff/model.py b/.venv/lib/python3.12/site-packages/deepdiff/model.py new file mode 100644 
index 00000000..41dd7517 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/deepdiff/model.py @@ -0,0 +1,974 @@ +import logging +from collections.abc import Mapping +from copy import copy +from deepdiff.helper import ( + RemapDict, strings, notpresent, get_type, numpy_numbers, np, literal_eval_extended, + dict_, SetOrdered) +from deepdiff.path import stringify_element + +logger = logging.getLogger(__name__) + +FORCE_DEFAULT = 'fake' +UP_DOWN = {'up': 'down', 'down': 'up'} + +REPORT_KEYS = { + "type_changes", + "dictionary_item_added", + "dictionary_item_removed", + "values_changed", + "unprocessed", + "iterable_item_added", + "iterable_item_removed", + "iterable_item_moved", + "attribute_added", + "attribute_removed", + "set_item_removed", + "set_item_added", + "repetition_change", +} + +CUSTOM_FIELD = "__internal:custom:extra_info" + + +class DoesNotExist(Exception): + pass + + +class ResultDict(RemapDict): + + def remove_empty_keys(self): + """ + Remove empty keys from this object. Should always be called after the result is final. + :return: + """ + empty_keys = [k for k, v in self.items() if not isinstance(v, (int)) and not v] + + for k in empty_keys: + del self[k] + + +class TreeResult(ResultDict): + def __init__(self): + for key in REPORT_KEYS: + self[key] = SetOrdered() + + def mutual_add_removes_to_become_value_changes(self): + """ + There might be the same paths reported in the results as removed and added. + In such cases they should be reported as value_changes. + + Note that this function mutates the tree in ways that causes issues when report_repetition=True + and should be avoided in that case. + + This function should only be run on the Tree Result. + """ + iterable_item_added = self.get('iterable_item_added') + iterable_item_removed = self.get('iterable_item_removed') + if iterable_item_added is not None and iterable_item_removed is not None: + added_paths = {i.path(): i for i in iterable_item_added} + removed_paths = {i.path(): i for i in iterable_item_removed} + mutual_paths = set(added_paths) & set(removed_paths) + + if mutual_paths and 'values_changed' not in self or self['values_changed'] is None: + self['values_changed'] = SetOrdered() + for path in mutual_paths: + level_before = removed_paths[path] + iterable_item_removed.remove(level_before) + level_after = added_paths[path] + iterable_item_added.remove(level_after) + level_before.t2 = level_after.t2 + self['values_changed'].add(level_before) # type: ignore + level_before.report_type = 'values_changed' + if 'iterable_item_removed' in self and not iterable_item_removed: + del self['iterable_item_removed'] + if 'iterable_item_added' in self and not iterable_item_added: + del self['iterable_item_added'] + + def __getitem__(self, item): + if item not in self: + self[item] = SetOrdered() + return self.get(item) + + def __len__(self): + length = 0 + for value in self.values(): + if isinstance(value, SetOrdered): + length += len(value) + elif isinstance(value, int): + length += 1 + return length + + +class TextResult(ResultDict): + ADD_QUOTES_TO_STRINGS = True + + def __init__(self, tree_results=None, verbose_level=1): + self.verbose_level = verbose_level + # TODO: centralize keys + self.update({ + "type_changes": dict_(), + "dictionary_item_added": self.__set_or_dict(), + "dictionary_item_removed": self.__set_or_dict(), + "values_changed": dict_(), + "unprocessed": [], + "iterable_item_added": dict_(), + "iterable_item_removed": dict_(), + "iterable_item_moved": dict_(), + "attribute_added": self.__set_or_dict(), + 
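+            # __set_or_dict() is a path -> item dict at verbose_level >= 2,
+            # otherwise an ordered set of paths only.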
"attribute_removed": self.__set_or_dict(), + "set_item_removed": SetOrdered(), + "set_item_added": SetOrdered(), + "repetition_change": dict_() + }) + + if tree_results: + self._from_tree_results(tree_results) + + def __set_or_dict(self): + return {} if self.verbose_level >= 2 else SetOrdered() + + def _from_tree_results(self, tree): + """ + Populate this object by parsing an existing reference-style result dictionary. + :param tree: A TreeResult + :return: + """ + self._from_tree_type_changes(tree) + self._from_tree_default(tree, 'dictionary_item_added') + self._from_tree_default(tree, 'dictionary_item_removed') + self._from_tree_value_changed(tree) + self._from_tree_unprocessed(tree) + self._from_tree_default(tree, 'iterable_item_added') + self._from_tree_default(tree, 'iterable_item_removed') + self._from_tree_iterable_item_moved(tree) + self._from_tree_default(tree, 'attribute_added') + self._from_tree_default(tree, 'attribute_removed') + self._from_tree_set_item_removed(tree) + self._from_tree_set_item_added(tree) + self._from_tree_repetition_change(tree) + self._from_tree_deep_distance(tree) + self._from_tree_custom_results(tree) + + def _from_tree_default(self, tree, report_type, ignore_if_in_iterable_opcodes=False): + if report_type in tree: + + for change in tree[report_type]: # report each change + # When we convert from diff to delta result, we care more about opcodes than iterable_item_added or removed + if ( + ignore_if_in_iterable_opcodes + and report_type in {"iterable_item_added", "iterable_item_removed"} + and change.up.path(force=FORCE_DEFAULT) in self["_iterable_opcodes"] + ): + continue + # determine change direction (added or removed) + # Report t2 (the new one) whenever possible. + # In cases where t2 doesn't exist (i.e. stuff removed), report t1. + if change.t2 is not notpresent: + item = change.t2 + else: + item = change.t1 + + # do the reporting + report = self[report_type] + if isinstance(report, SetOrdered): + report.add(change.path(force=FORCE_DEFAULT)) + elif isinstance(report, dict): + report[change.path(force=FORCE_DEFAULT)] = item + elif isinstance(report, list): # pragma: no cover + # we don't actually have any of those right now, but just in case + report.append(change.path(force=FORCE_DEFAULT)) + else: # pragma: no cover + # should never happen + raise TypeError("Cannot handle {} report container type.". 
+ format(report)) + + def _from_tree_type_changes(self, tree): + if 'type_changes' in tree: + for change in tree['type_changes']: + path = change.path(force=FORCE_DEFAULT) + if type(change.t1) is type: + include_values = False + old_type = change.t1 + new_type = change.t2 + else: + include_values = True + old_type = get_type(change.t1) + new_type = get_type(change.t2) + remap_dict = RemapDict({ + 'old_type': old_type, + 'new_type': new_type, + }) + if self.verbose_level > 1: + new_path = change.path(use_t2=True, force=FORCE_DEFAULT) + if path != new_path: + remap_dict['new_path'] = new_path + self['type_changes'][path] = remap_dict + if self.verbose_level and include_values: + remap_dict.update(old_value=change.t1, new_value=change.t2) + + def _from_tree_value_changed(self, tree): + if 'values_changed' in tree and self.verbose_level > 0: + for change in tree['values_changed']: + path = change.path(force=FORCE_DEFAULT) + the_changed = {'new_value': change.t2, 'old_value': change.t1} + if self.verbose_level > 1: + new_path = change.path(use_t2=True, force=FORCE_DEFAULT) + if path != new_path: + the_changed['new_path'] = new_path + self['values_changed'][path] = the_changed + if 'diff' in change.additional: + the_changed.update({'diff': change.additional['diff']}) + + def _from_tree_iterable_item_moved(self, tree): + if 'iterable_item_moved' in tree and self.verbose_level > 1: + for change in tree['iterable_item_moved']: + the_changed = {'new_path': change.path(use_t2=True), 'value': change.t2} + self['iterable_item_moved'][change.path( + force=FORCE_DEFAULT)] = the_changed + + def _from_tree_unprocessed(self, tree): + if 'unprocessed' in tree: + for change in tree['unprocessed']: + self['unprocessed'].append("{}: {} and {}".format(change.path( + force=FORCE_DEFAULT), change.t1, change.t2)) + + def _from_tree_set_item_added_or_removed(self, tree, key): + if key in tree: + set_item_info = self[key] + is_dict = isinstance(set_item_info, Mapping) + for change in tree[key]: + path = change.up.path( + ) # we want't the set's path, the added item is not directly accessible + item = change.t2 if key == 'set_item_added' else change.t1 + if self.ADD_QUOTES_TO_STRINGS and isinstance(item, strings): + item = "'%s'" % item + if is_dict: + if path not in set_item_info: + set_item_info[path] = set() # type: ignore + set_item_info[path].add(item) + else: + set_item_info.add("{}[{}]".format(path, str(item))) + # this syntax is rather peculiar, but it's DeepDiff 2.x compatible) + + def _from_tree_set_item_added(self, tree): + self._from_tree_set_item_added_or_removed(tree, key='set_item_added') + + def _from_tree_set_item_removed(self, tree): + self._from_tree_set_item_added_or_removed(tree, key='set_item_removed') + + def _from_tree_repetition_change(self, tree): + if 'repetition_change' in tree: + for change in tree['repetition_change']: + path = change.path(force=FORCE_DEFAULT) + self['repetition_change'][path] = RemapDict( + change.additional['repetition'] + ) + self['repetition_change'][path]['value'] = change.t1 + + def _from_tree_deep_distance(self, tree): + if 'deep_distance' in tree: + self['deep_distance'] = tree['deep_distance'] + + def _from_tree_custom_results(self, tree): + for k, _level_list in tree.items(): + if k not in REPORT_KEYS: + if not isinstance(_level_list, SetOrdered): + continue + + # if len(_level_list) == 0: + # continue + # + # if not isinstance(_level_list[0], DiffLevel): + # continue + + # _level_list is a list of DiffLevel + _custom_dict = {} + for _level in _level_list: + 
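+                    # Key by the level's path; the value is whatever was
+                    # stored on the level's additional info under CUSTOM_FIELD.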
_custom_dict[_level.path( + force=FORCE_DEFAULT)] = _level.additional.get(CUSTOM_FIELD, {}) + self[k] = _custom_dict + + +class DeltaResult(TextResult): + ADD_QUOTES_TO_STRINGS = False + + def __init__(self, tree_results=None, ignore_order=None, always_include_values=False, _iterable_opcodes=None): + self.ignore_order = ignore_order + self.always_include_values = always_include_values + + self.update({ + "type_changes": dict_(), + "dictionary_item_added": dict_(), + "dictionary_item_removed": dict_(), + "values_changed": dict_(), + "iterable_item_added": dict_(), + "iterable_item_removed": dict_(), + "iterable_item_moved": dict_(), + "attribute_added": dict_(), + "attribute_removed": dict_(), + "set_item_removed": dict_(), + "set_item_added": dict_(), + "iterable_items_added_at_indexes": dict_(), + "iterable_items_removed_at_indexes": dict_(), + "_iterable_opcodes": _iterable_opcodes or {}, + }) + + if tree_results: + self._from_tree_results(tree_results) + + def _from_tree_results(self, tree): + """ + Populate this object by parsing an existing reference-style result dictionary. + :param tree: A TreeResult + :return: + """ + self._from_tree_type_changes(tree) + self._from_tree_default(tree, 'dictionary_item_added') + self._from_tree_default(tree, 'dictionary_item_removed') + self._from_tree_value_changed(tree) + if self.ignore_order: + self._from_tree_iterable_item_added_or_removed( + tree, 'iterable_item_added', delta_report_key='iterable_items_added_at_indexes') + self._from_tree_iterable_item_added_or_removed( + tree, 'iterable_item_removed', delta_report_key='iterable_items_removed_at_indexes') + else: + self._from_tree_default(tree, 'iterable_item_added', ignore_if_in_iterable_opcodes=True) + self._from_tree_default(tree, 'iterable_item_removed', ignore_if_in_iterable_opcodes=True) + self._from_tree_iterable_item_moved(tree) + self._from_tree_default(tree, 'attribute_added') + self._from_tree_default(tree, 'attribute_removed') + self._from_tree_set_item_removed(tree) + self._from_tree_set_item_added(tree) + self._from_tree_repetition_change(tree) + + def _from_tree_iterable_item_added_or_removed(self, tree, report_type, delta_report_key): + if report_type in tree: + for change in tree[report_type]: # report each change + # determine change direction (added or removed) + # Report t2 (the new one) whenever possible. + # In cases where t2 doesn't exist (i.e. stuff removed), report t1. + if change.t2 is not notpresent: + item = change.t2 + else: + item = change.t1 + + # do the reporting + path, param, _ = change.path(force=FORCE_DEFAULT, get_parent_too=True) + try: + iterable_items_added_at_indexes = self[delta_report_key][path] + except KeyError: + iterable_items_added_at_indexes = self[delta_report_key][path] = dict_() + iterable_items_added_at_indexes[param] = item + + def _from_tree_type_changes(self, tree): + if 'type_changes' in tree: + for change in tree['type_changes']: + include_values = None + if type(change.t1) is type: + include_values = False + old_type = change.t1 + new_type = change.t2 + else: + old_type = get_type(change.t1) + new_type = get_type(change.t2) + include_values = True + try: + if new_type in numpy_numbers: + new_t1 = change.t1.astype(new_type) + include_values = not np.array_equal(new_t1, change.t2) + else: + new_t1 = new_type(change.t1) + # If simply applying the type from one value converts it to the other value, + # there is no need to include the actual values in the delta. 
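+                        # e.g. for 5 vs '5': str(5) == '5', so recording the
+                        # type change alone is enough to reconstruct t2.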
+ include_values = new_t1 != change.t2 + except Exception: + pass + + path = change.path(force=FORCE_DEFAULT) + new_path = change.path(use_t2=True, force=FORCE_DEFAULT) + remap_dict = RemapDict({ + 'old_type': old_type, + 'new_type': new_type, + }) + if path != new_path: + remap_dict['new_path'] = new_path + self['type_changes'][path] = remap_dict + if include_values or self.always_include_values: + remap_dict.update(old_value=change.t1, new_value=change.t2) + + def _from_tree_value_changed(self, tree): + if 'values_changed' in tree: + for change in tree['values_changed']: + path = change.path(force=FORCE_DEFAULT) + new_path = change.path(use_t2=True, force=FORCE_DEFAULT) + the_changed = {'new_value': change.t2, 'old_value': change.t1} + if path != new_path: + the_changed['new_path'] = new_path + self['values_changed'][path] = the_changed + # If we ever want to store the difflib results instead of the new_value + # these lines need to be uncommented and the Delta object needs to be able + # to use them. + # if 'diff' in change.additional: + # the_changed.update({'diff': change.additional['diff']}) + + def _from_tree_repetition_change(self, tree): + if 'repetition_change' in tree: + for change in tree['repetition_change']: + path, _, _ = change.path(get_parent_too=True) + repetition = RemapDict(change.additional['repetition']) + value = change.t1 + try: + iterable_items_added_at_indexes = self['iterable_items_added_at_indexes'][path] + except KeyError: + iterable_items_added_at_indexes = self['iterable_items_added_at_indexes'][path] = dict_() + for index in repetition['new_indexes']: + iterable_items_added_at_indexes[index] = value + + def _from_tree_iterable_item_moved(self, tree): + if 'iterable_item_moved' in tree: + for change in tree['iterable_item_moved']: + if ( + change.up.path(force=FORCE_DEFAULT) not in self["_iterable_opcodes"] + ): + the_changed = {'new_path': change.path(use_t2=True), 'value': change.t2} + self['iterable_item_moved'][change.path( + force=FORCE_DEFAULT)] = the_changed + + +class DiffLevel: + """ + An object of this class represents a single object-tree-level in a reported change. + A double-linked list of these object describes a single change on all of its levels. + Looking at the tree of all changes, a list of those objects represents a single path through the tree + (which is just fancy for "a change"). + This is the result object class for object reference style reports. + + Example: + + >>> t1 = {2: 2, 4: 44} + >>> t2 = {2: "b", 5: 55} + >>> ddiff = DeepDiff(t1, t2, view='tree') + >>> ddiff + {'dictionary_item_added': {<DiffLevel id:4560126096, t1:None, t2:55>}, + 'dictionary_item_removed': {<DiffLevel id:4560126416, t1:44, t2:None>}, + 'type_changes': {<DiffLevel id:4560126608, t1:2, t2:b>}} + + Graph: + + <DiffLevel id:123, original t1,t2> <DiffLevel id:200, original t1,t2> + ↑up ↑up + | | + | ChildRelationship | ChildRelationship + | | + ↓down ↓down + <DiffLevel id:13, t1:None, t2:55> <DiffLevel id:421, t1:44, t2:None> + .path() = 'root[5]' .path() = 'root[4]' + + Note that the 2 top level DiffLevel objects are 2 different objects even though + they are essentially talking about the same diff operation. + + + A ChildRelationship object describing the relationship between t1 and it's child object, + where t1's child object equals down.t1. 
+ + Think about it like a graph: + + +---------------------------------------------------------------+ + | | + | parent difflevel parent | + | + ^ + | + +------|--------------------------|---------------------|-------+ + | | | up | + | Child | | | ChildRelationship + | Relationship | | | + | down | | | + +------|----------------------|-------------------------|-------+ + | v v v | + | child difflevel child | + | | + +---------------------------------------------------------------+ + + + The child_rel example: + + # dictionary_item_removed is a set so in order to get an item from it: + >>> (difflevel,) = ddiff['dictionary_item_removed']) + >>> difflevel.up.t1_child_rel + <DictRelationship id:456, parent:{2: 2, 4: 44}, child:44, param:4> + + >>> (difflevel,) = ddiff['dictionary_item_added']) + >>> difflevel + <DiffLevel id:4560126096, t1:None, t2:55> + + >>> difflevel.up + >>> <DiffLevel id:4560154512, t1:{2: 2, 4: 44}, t2:{2: 'b', 5: 55}> + + >>> difflevel.up + <DiffLevel id:4560154512, t1:{2: 2, 4: 44}, t2:{2: 'b', 5: 55}> + + # t1 didn't exist + >>> difflevel.up.t1_child_rel + + # t2 is added + >>> difflevel.up.t2_child_rel + <DictRelationship id:4560154384, parent:{2: 'b', 5: 55}, child:55, param:5> + + """ + + def __init__(self, + t1, + t2, + down=None, + up=None, + report_type=None, + child_rel1=None, + child_rel2=None, + additional=None, + verbose_level=1): + """ + :param child_rel1: Either: + - An existing ChildRelationship object describing the "down" relationship for t1; or + - A ChildRelationship subclass. In this case, we will create the ChildRelationship objects + for both t1 and t2. + Alternatives for child_rel1 and child_rel2 must be used consistently. + :param child_rel2: Either: + - An existing ChildRelationship object describing the "down" relationship for t2; or + - The param argument for a ChildRelationship class we shall create. + Alternatives for child_rel1 and child_rel2 must be used consistently. + """ + + # The current-level object in the left hand tree + self.t1 = t1 + + # The current-level object in the right hand tree + self.t2 = t2 + + # Another DiffLevel object describing this change one level deeper down the object tree + self.down = down + + # Another DiffLevel object describing this change one level further up the object tree + self.up = up + + self.report_type = report_type + + # If this object is this change's deepest level, this contains a string describing the type of change. + # Examples: "set_item_added", "values_changed" + + # Note: don't use {} as additional's default value - this would turn out to be always the same dict object + self.additional = dict_() if additional is None else additional + + # For some types of changes we store some additional information. + # This is a dict containing this information. + # Currently, this is used for: + # - values_changed: In case the changes data is a multi-line string, + # we include a textual diff as additional['diff']. + # - repetition_change: additional['repetition']: + # e.g. {'old_repeat': 2, 'new_repeat': 1, 'old_indexes': [0, 2], 'new_indexes': [2]} + # the user supplied ChildRelationship objects for t1 and t2 + + # A ChildRelationship object describing the relationship between t1 and it's child object, + # where t1's child object equals down.t1. + # If this relationship is representable as a string, str(self.t1_child_rel) returns a formatted param parsable python string, + # e.g. 
"[2]", ".my_attribute" + self.t1_child_rel = child_rel1 + + # Another ChildRelationship object describing the relationship between t2 and it's child object. + self.t2_child_rel = child_rel2 + + # Will cache result of .path() per 'force' as key for performance + self._path = dict_() + + self.verbose_level = verbose_level + + def __repr__(self): + if self.verbose_level: + from deepdiff.summarize import summarize + + if self.additional: + additional_repr = summarize(self.additional, max_length=35) + result = "<{} {}>".format(self.path(), additional_repr) + else: + t1_repr = summarize(self.t1, max_length=35) + t2_repr = summarize(self.t2, max_length=35) + result = "<{} t1:{}, t2:{}>".format(self.path(), t1_repr, t2_repr) + else: + result = "<{}>".format(self.path()) + return result + + def __setattr__(self, key, value): + # Setting up or down, will set the opposite link in this linked list. + if key in UP_DOWN and value is not None: + self.__dict__[key] = value + opposite_key = UP_DOWN[key] + value.__dict__[opposite_key] = self + else: + self.__dict__[key] = value + + def __iter__(self): + yield self.t1 + yield self.t2 + + @property + def repetition(self): + return self.additional['repetition'] + + def auto_generate_child_rel(self, klass, param, param2=None): + """ + Auto-populate self.child_rel1 and self.child_rel2. + This requires self.down to be another valid DiffLevel object. + :param klass: A ChildRelationship subclass describing the kind of parent-child relationship, + e.g. DictRelationship. + :param param: A ChildRelationship subclass-dependent parameter describing how to get from parent to child, + e.g. the key in a dict + """ + if self.down.t1 is not notpresent: # type: ignore + self.t1_child_rel = ChildRelationship.create( + klass=klass, parent=self.t1, child=self.down.t1, param=param) # type: ignore + if self.down.t2 is not notpresent: # type: ignore + self.t2_child_rel = ChildRelationship.create( + klass=klass, parent=self.t2, child=self.down.t2, param=param if param2 is None else param2) # type: ignore + + @property + def all_up(self): + """ + Get the root object of this comparison. + (This is a convenient wrapper for following the up attribute as often as you can.) + :rtype: DiffLevel + """ + level = self + while level.up: + level = level.up + return level + + @property + def all_down(self): + """ + Get the leaf object of this comparison. + (This is a convenient wrapper for following the down attribute as often as you can.) + :rtype: DiffLevel + """ + level = self + while level.down: + level = level.down + return level + + @staticmethod + def _format_result(root, result): + return None if result is None else "{}{}".format(root, result) + + def get_root_key(self, use_t2=False): + """ + Get the path's root key value for this change + + For example if the path to the element that is reported to have a change in value is root['X'][0] + then get_root_key should return 'X' + """ + root_level = self.all_up + if(use_t2): + next_rel = root_level.t2_child_rel + else: + next_rel = root_level.t1_child_rel or root_level.t2_child_rel # next relationship object to get a formatted param from + + if next_rel: + return next_rel.param + return notpresent + + def path(self, root="root", force=None, get_parent_too=False, use_t2=False, output_format='str'): + """ + A python syntax string describing how to descend to this level, assuming the top level object is called root. + Returns None if the path is not representable as a string. 
+ This might be the case for example if there are sets involved (because then there's not path at all) or because + custom objects used as dictionary keys (then there is a path but it's not representable). + Example: root['ingredients'][0] + Note: We will follow the left side of the comparison branch, i.e. using the t1's to build the path. + Using t1 or t2 should make no difference at all, except for the last step of a child-added/removed relationship. + If it does in any other case, your comparison path is corrupt. + + **Parameters** + + :param root: The result string shall start with this var name + :param force: Bends the meaning of "no string representation". + If None: + Will strictly return Python-parsable expressions. The result those yield will compare + equal to the objects in question. + If 'yes': + Will return a path including '(unrepresentable)' in place of non string-representable parts. + If 'fake': + Will try to produce an output optimized for readability. + This will pretend all iterables are subscriptable, for example. + :param output_format: The format of the output. The options are 'str' which is the default and produces a + string representation of the path or 'list' to produce a list of keys and attributes + that produce the path. + """ + # TODO: We could optimize this by building on top of self.up's path if it is cached there + cache_key = "{}{}{}{}".format(force, get_parent_too, use_t2, output_format) + if cache_key in self._path: + cached = self._path[cache_key] + if get_parent_too: + parent, param, result = cached + return (self._format_result(root, parent), param, self._format_result(root, result)) + else: + return self._format_result(root, cached) + + if output_format == 'str': + result = parent = param = "" + else: + result = [] + + level = self.all_up # start at the root + + # traverse all levels of this relationship + while level and level is not self: + # get this level's relationship object + if use_t2: + next_rel = level.t2_child_rel or level.t1_child_rel + else: + next_rel = level.t1_child_rel or level.t2_child_rel # next relationship object to get a formatted param from + + # t1 and t2 both are empty + if next_rel is None: + break + + # Build path for this level + if output_format == 'str': + item = next_rel.get_param_repr(force) + if item: + parent = result + param = next_rel.param + result += item + else: + # it seems this path is not representable as a string + result = None + break + elif output_format == 'list': + result.append(next_rel.param) # type: ignore + + # Prepare processing next level + level = level.down + + if output_format == 'str': + if get_parent_too: + self._path[cache_key] = (parent, param, result) # type: ignore + output = (self._format_result(root, parent), param, self._format_result(root, result)) # type: ignore + else: + self._path[cache_key] = result + output = self._format_result(root, result) + else: + output = result + return output + + def create_deeper(self, + new_t1, + new_t2, + child_relationship_class, + child_relationship_param=None, + child_relationship_param2=None, + report_type=None): + """ + Start a new comparison level and correctly link it to this one. 
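+        The new level is attached below the current leaf (all_down) of this
+        comparison line and its child relationships are auto-generated.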
+ :rtype: DiffLevel + :return: New level + """ + level = self.all_down + result = DiffLevel( + new_t1, new_t2, down=None, up=level, report_type=report_type, verbose_level=self.verbose_level) + level.down = result + level.auto_generate_child_rel( + klass=child_relationship_class, param=child_relationship_param, param2=child_relationship_param2) + return result + + def branch_deeper(self, + new_t1, + new_t2, + child_relationship_class, + child_relationship_param=None, + child_relationship_param2=None, + report_type=None): + """ + Branch this comparison: Do not touch this comparison line, but create a new one with exactly the same content, + just one level deeper. + :rtype: DiffLevel + :return: New level in new comparison line + """ + branch = self.copy() + return branch.create_deeper(new_t1, new_t2, child_relationship_class, + child_relationship_param, child_relationship_param2, report_type) + + def copy(self): + """ + Get a deep copy of this comparision line. + :return: The leaf ("downmost") object of the copy. + """ + orig = self.all_up + result = copy(orig) # copy top level + + while orig is not None: + result.additional = copy(orig.additional) + + if orig.down is not None: # copy and create references to the following level + # copy following level + result.down = copy(orig.down) + + if orig.t1_child_rel is not None: + result.t1_child_rel = ChildRelationship.create( + klass=orig.t1_child_rel.__class__, + parent=result.t1, + child=result.down.t1, + param=orig.t1_child_rel.param) + if orig.t2_child_rel is not None: + result.t2_child_rel = ChildRelationship.create( + klass=orig.t2_child_rel.__class__, + parent=result.t2, + child=result.down.t2, + param=orig.t2_child_rel.param) + + # descend to next level + orig = orig.down + if result.down is not None: + result = result.down + return result + + +class ChildRelationship: + """ + Describes the relationship between a container object (the "parent") and the contained + "child" object. + """ + + # Format to a be used for representing param. + # E.g. for a dict, this turns a formatted param param "42" into "[42]". + param_repr_format = None + + # This is a hook allowing subclasses to manipulate param strings. + # :param string: Input string + # :return: Manipulated string, as appropriate in this context. + quote_str = None + + @staticmethod + def create(klass, parent, child, param=None): + if not issubclass(klass, ChildRelationship): + raise TypeError + return klass(parent, child, param) + + def __init__(self, parent, child, param=None): + # The parent object of this relationship, e.g. a dict + self.parent = parent + + # The child object of this relationship, e.g. a value in a dict + self.child = child + + # A subclass-dependent parameter describing how to get from parent to child, e.g. the key in a dict + self.param = param + + def __repr__(self): + from deepdiff.summarize import summarize + + name = "<{} parent:{}, child:{}, param:{}>" + parent = summarize(self.parent, max_length=35) + child = summarize(self.child, max_length=35) + param = summarize(self.param, max_length=15) + return name.format(self.__class__.__name__, parent, child, param) + + def get_param_repr(self, force=None): + """ + Returns a formatted param python parsable string describing this relationship, + or None if the relationship is not representable as a string. + This string can be appended to the parent Name. + Subclasses representing a relationship that cannot be expressed as a string override this method to return None. 
+ Examples: "[2]", ".attribute", "['mykey']" + :param force: Bends the meaning of "no string representation". + If None: + Will strictly return partials of Python-parsable expressions. The result those yield will compare + equal to the objects in question. + If 'yes': + Will return a formatted param including '(unrepresentable)' instead of the non string-representable part. + + """ + return self.stringify_param(force) + + def stringify_param(self, force=None): + """ + Convert param to a string. Return None if there is no string representation. + This is called by get_param_repr() + :param force: Bends the meaning of "no string representation". + If None: + Will strictly return Python-parsable expressions. The result those yield will compare + equal to the objects in question. + If 'yes': + Will return '(unrepresentable)' instead of None if there is no string representation + + TODO: stringify_param has issues with params that when converted to string via repr, + it is not straight forward to turn them back into the original object. + Although repr is meant to be able to reconstruct the original object but for complex objects, repr + often does not recreate the original object. + Perhaps we should log that the repr reconstruction failed so the user is aware. + """ + param = self.param + if isinstance(param, strings): + result = stringify_element(param, quote_str=self.quote_str) + elif isinstance(param, tuple): # Currently only for numpy ndarrays + result = ']['.join(map(repr, param)) + elif hasattr(param, '__dataclass_fields__'): + attrs_to_values = [f"{key}={value}" for key, value in [(i, getattr(param, i)) for i in param.__dataclass_fields__]] # type: ignore + result = f"{param.__class__.__name__}({','.join(attrs_to_values)})" + else: + candidate = repr(param) + try: + resurrected = literal_eval_extended(candidate) + # Note: This will miss string-representable custom objects. + # However, the only alternative I can currently think of is using eval() which is inherently dangerous. + except (SyntaxError, ValueError) as err: + logger.error( + f'stringify_param was not able to get a proper repr for "{param}". ' + "This object will be reported as None. 
Add instructions for this object to DeepDiff's " + f"helper.literal_eval_extended to make it work properly: {err}") + result = None + else: + result = candidate if resurrected == param else None + + if result: + result = ':' if self.param_repr_format is None else self.param_repr_format.format(result) + + return result + + +class DictRelationship(ChildRelationship): + param_repr_format = "[{}]" + quote_str = "'{}'" + + +class NumpyArrayRelationship(ChildRelationship): + param_repr_format = "[{}]" + quote_str = None + + +class SubscriptableIterableRelationship(DictRelationship): + pass + + +class InaccessibleRelationship(ChildRelationship): + pass + + +# there is no random access to set elements +class SetRelationship(InaccessibleRelationship): + pass + + +class NonSubscriptableIterableRelationship(InaccessibleRelationship): + + param_repr_format = "[{}]" + + def get_param_repr(self, force=None): + if force == 'yes': + result = "(unrepresentable)" + elif force == 'fake' and self.param: + result = self.stringify_param() + else: + result = None + + return result + + +class AttributeRelationship(ChildRelationship): + param_repr_format = ".{}" diff --git a/.venv/lib/python3.12/site-packages/deepdiff/operator.py b/.venv/lib/python3.12/site-packages/deepdiff/operator.py new file mode 100644 index 00000000..018fa3c6 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/deepdiff/operator.py @@ -0,0 +1,69 @@ +import re +from typing import Any, Optional, List +from abc import ABCMeta, abstractmethod +from deepdiff.helper import convert_item_or_items_into_compiled_regexes_else_none + + + +class BaseOperatorPlus(metaclass=ABCMeta): + + @abstractmethod + def match(self, level) -> bool: + """ + Given a level which includes t1 and t2 in the tree view, is this operator a good match to compare t1 and t2? + If yes, we will run the give_up_diffing to compare t1 and t2 for this level. + """ + pass + + @abstractmethod + def give_up_diffing(self, level, diff_instance: float) -> bool: + """ + Given a level which includes t1 and t2 in the tree view, and the "distance" between l1 and l2. + do we consider t1 and t2 to be equal or not. The distance is a number between zero to one and is calculated by DeepDiff to measure how similar objects are. + """ + + @abstractmethod + def normalize_value_for_hashing(self, parent: Any, obj: Any) -> Any: + """ + You can use this function to normalize values for ignore_order=True + + For example, you may want to turn all the words to be lowercase. 
Then you return obj.lower() + """ + pass + + + +class BaseOperator: + + def __init__(self, regex_paths:Optional[List[str]]=None, types:Optional[List[type]]=None): + if regex_paths: + self.regex_paths = convert_item_or_items_into_compiled_regexes_else_none(regex_paths) + else: + self.regex_paths = None + self.types = types + + def match(self, level) -> bool: + if self.regex_paths: + for pattern in self.regex_paths: + matched = re.search(pattern, level.path()) is not None + if matched: + return True + if self.types: + for type_ in self.types: + if isinstance(level.t1, type_) and isinstance(level.t2, type_): + return True + return False + + def give_up_diffing(self, level, diff_instance) -> bool: + raise NotImplementedError('Please implement the diff function.') + + +class PrefixOrSuffixOperator: + + def match(self, level) -> bool: + return level.t1 and level.t2 and isinstance(level.t1, str) and isinstance(level.t2, str) + + def give_up_diffing(self, level, diff_instance) -> bool: + t1 = level.t1 + t2 = level.t2 + return t1.startswith(t2) or t2.startswith(t1) diff --git a/.venv/lib/python3.12/site-packages/deepdiff/path.py b/.venv/lib/python3.12/site-packages/deepdiff/path.py new file mode 100644 index 00000000..ee63b5b9 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/deepdiff/path.py @@ -0,0 +1,316 @@ +import logging +from ast import literal_eval +from functools import lru_cache + +logger = logging.getLogger(__name__) + +GETATTR = 'GETATTR' +GET = 'GET' + + +class PathExtractionError(ValueError): + pass + + +class RootCanNotBeModified(ValueError): + pass + + +def _add_to_elements(elements, elem, inside): + # Ignore private items + if not elem: + return + if not elem.startswith('__'): + remove_quotes = False + if '𝆺𝅥𝅯' in elem or '\\' in elem: + remove_quotes = True + else: + try: + elem = literal_eval(elem) + remove_quotes = False + except (ValueError, SyntaxError): + remove_quotes = True + if remove_quotes and elem[0] == elem[-1] and elem[0] in {'"', "'"}: + elem = elem[1: -1] + action = GETATTR if inside == '.' else GET + elements.append((elem, action)) + + +DEFAULT_FIRST_ELEMENT = ('root', GETATTR) + + +@lru_cache(maxsize=1024 * 128) +def _path_to_elements(path, root_element=DEFAULT_FIRST_ELEMENT): + """ + Given a path, it extracts the elements that form the path and their relevant most likely retrieval action. + + >>> from deepdiff import _path_to_elements + >>> path = "root[4.3].b['a3']" + >>> _path_to_elements(path, root_element=None) + [(4.3, 'GET'), ('b', 'GETATTR'), ('a3', 'GET')] + """ + if isinstance(path, (tuple, list)): + return path + elements = [] + if root_element: + elements.append(root_element) + elem = '' + inside = False + prev_char = None + path = path[4:] # removing "root from the beginning" + brackets = [] + inside_quotes = False + quote_used = '' + for char in path: + if prev_char == '𝆺𝅥𝅯': + elem += char + elif char in {'"', "'"}: + elem += char + # If we are inside and the quote is not what we expected, the quote is not closing + if not(inside_quotes and quote_used != char): + inside_quotes = not inside_quotes + if inside_quotes: + quote_used = char + else: + _add_to_elements(elements, elem, inside) + elem = '' + quote_used = '' + elif inside_quotes: + elem += char + elif char == '[': + if inside == '.': + _add_to_elements(elements, elem, inside) + inside = '[' + elem = '' + # we are already inside. The bracket is a part of the word. 
+            elif inside == '[':
+                elem += char
+            else:
+                inside = '['
+                brackets.append('[')
+                elem = ''
+        elif char == '.':
+            if inside == '[':
+                elem += char
+            elif inside == '.':
+                _add_to_elements(elements, elem, inside)
+                elem = ''
+            else:
+                inside = '.'
+                elem = ''
+        elif char == ']':
+            if brackets and brackets[-1] == '[':
+                brackets.pop()
+            if brackets:
+                elem += char
+            else:
+                _add_to_elements(elements, elem, inside)
+                elem = ''
+                inside = False
+        else:
+            elem += char
+        prev_char = char
+    if elem:
+        _add_to_elements(elements, elem, inside)
+    return tuple(elements)
+
+
+def _get_nested_obj(obj, elements, next_element=None):
+    for (elem, action) in elements:
+        if action == GET:
+            obj = obj[elem]
+        elif action == GETATTR:
+            obj = getattr(obj, elem)
+    return obj
+
+
+def _guess_type(elements, elem, index, next_element):
+    # If we are not at the last element
+    if index < len(elements) - 1:
+        # We assume it is a nested dictionary, not a nested list
+        return {}
+    if isinstance(next_element, int):
+        return []
+    return {}
+
+
+def _get_nested_obj_and_force(obj, elements, next_element=None):
+    prev_elem = None
+    prev_action = None
+    prev_obj = obj
+    for index, (elem, action) in enumerate(elements):
+        _prev_obj = obj
+        if action == GET:
+            try:
+                obj = obj[elem]
+                prev_obj = _prev_obj
+            except KeyError:
+                obj[elem] = _guess_type(elements, elem, index, next_element)
+                obj = obj[elem]
+                prev_obj = _prev_obj
+            except IndexError:
+                if isinstance(obj, list) and isinstance(elem, int) and elem >= len(obj):
+                    obj.extend([None] * (elem - len(obj)))
+                    obj.append(_guess_type(elements, elem, index, next_element))
+                    obj = obj[-1]
+                    prev_obj = _prev_obj
+                elif isinstance(obj, list) and len(obj) == 0 and prev_elem:
+                    # We ran into an empty list that should have been a dictionary
+                    # We need to change it from an empty list to a dictionary
+                    obj = {elem: _guess_type(elements, elem, index, next_element)}
+                    if prev_action == GET:
+                        prev_obj[prev_elem] = obj
+                    else:
+                        setattr(prev_obj, prev_elem, obj)
+                    obj = obj[elem]
+        elif action == GETATTR:
+            obj = getattr(obj, elem)
+            prev_obj = _prev_obj
+        prev_elem = elem
+        prev_action = action
+    return obj
+
+
+def extract(obj, path):
+    """
+    Get the item from obj based on path.
+
+    Example:
+
+    >>> from deepdiff import extract
+    >>> obj = {1: [{'2': 'b'}, 3], 2: [4, 5]}
+    >>> path = "root[1][0]['2']"
+    >>> extract(obj, path)
+    'b'
+
+    Note that you can use extract in conjunction with DeepDiff results
+    or even with the search and :ref:`deepsearch_label` modules. For example:
+
+    >>> from deepdiff import grep
+    >>> obj = {1: [{'2': 'b'}, 3], 2: [4, 5]}
+    >>> result = obj | grep(5)
+    >>> result
+    {'matched_values': ['root[2][1]']}
+    >>> result['matched_values'][0]
+    'root[2][1]'
+    >>> path = result['matched_values'][0]
+    >>> extract(obj, path)
+    5
+
+
+    .. note::
+        Note that even though DeepDiff gives you a path to an item in a set,
+        there is no such thing in Python and hence you will get an error trying
+        to extract that item from a set.
+        If you want to be able to get items from sets, use the SetOrdered module
+        to generate the sets.
+        In fact DeepDiff uses SetOrdered as a dependency.
+
+        >>> from deepdiff import grep, extract
+        >>> obj = {"a", "b"}
+        >>> obj | grep("b")
+        Set item detected in the path.'set' objects do NOT support indexing. But DeepSearch will still report a path.
+ {'matched_values': SetOrdered(['root[0]'])} + >>> extract(obj, 'root[0]') + Traceback (most recent call last): + File "<stdin>", line 1, in <module> + File "deepdiff/deepdiff/path.py", line 126, in extract + return _get_nested_obj(obj, elements) + File "deepdiff/deepdiff/path.py", line 84, in _get_nested_obj + obj = obj[elem] + TypeError: 'set' object is not subscriptable + >>> from orderly_set import SetOrdered + >>> obj = SetOrdered(["a", "b"]) + >>> extract(obj, 'root[0]') + 'a' + + """ + elements = _path_to_elements(path, root_element=None) + return _get_nested_obj(obj, elements) + + +def parse_path(path, root_element=DEFAULT_FIRST_ELEMENT, include_actions=False): + """ + Parse a path to a format that is machine readable + + **Parameters** + + path : A string + The path string such as "root[1][2]['age']" + + root_element: string, default='root' + What the root is called in the path. + + include_actions: boolean, default=False + If True, we return the action required to retrieve the item at each element of the path. + + **Examples** + + >>> from deepdiff import parse_path + >>> parse_path("root[1][2]['age']") + [1, 2, 'age'] + >>> parse_path("root[1][2]['age']", include_actions=True) + [{'element': 1, 'action': 'GET'}, {'element': 2, 'action': 'GET'}, {'element': 'age', 'action': 'GET'}] + >>> + >>> parse_path("root['joe'].age") + ['joe', 'age'] + >>> parse_path("root['joe'].age", include_actions=True) + [{'element': 'joe', 'action': 'GET'}, {'element': 'age', 'action': 'GETATTR'}] + + """ + + result = _path_to_elements(path, root_element=root_element) + result = iter(result) + if root_element: + next(result) # We don't want the root item + if include_actions is False: + return [i[0] for i in result] + return [{'element': i[0], 'action': i[1]} for i in result] + + +def stringify_element(param, quote_str=None): + has_quote = "'" in param + has_double_quote = '"' in param + if has_quote and has_double_quote and not quote_str: + new_param = [] + for char in param: + if char in {'"', "'"}: + new_param.append('𝆺𝅥𝅯') + new_param.append(char) + result = '"' + ''.join(new_param) + '"' + elif has_quote: + result = f'"{param}"' + elif has_double_quote: + result = f"'{param}'" + else: + result = param if quote_str is None else quote_str.format(param) + return result + + +def stringify_path(path, root_element=DEFAULT_FIRST_ELEMENT, quote_str="'{}'"): + """ + Gets the path as an string. + + For example [1, 2, 'age'] should become + root[1][2]['age'] + """ + if not path: + return root_element[0] + result = [root_element[0]] + has_actions = False + try: + if path[0][1] in {GET, GETATTR}: + has_actions = True + except (KeyError, IndexError, TypeError): + pass + if not has_actions: + path = [(i, GET) for i in path] + path[0] = (path[0][0], root_element[1]) # The action for the first element might be a GET or GETATTR. We update the action based on the root_element. 
+ for element, action in path: + if isinstance(element, str) and action == GET: + element = stringify_element(element, quote_str) + if action == GET: + result.append(f"[{element}]") + else: + result.append(f".{element}") + return ''.join(result) diff --git a/.venv/lib/python3.12/site-packages/deepdiff/py.typed b/.venv/lib/python3.12/site-packages/deepdiff/py.typed new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/deepdiff/py.typed diff --git a/.venv/lib/python3.12/site-packages/deepdiff/search.py b/.venv/lib/python3.12/site-packages/deepdiff/search.py new file mode 100644 index 00000000..007c566c --- /dev/null +++ b/.venv/lib/python3.12/site-packages/deepdiff/search.py @@ -0,0 +1,358 @@ +#!/usr/bin/env python +import re +from collections.abc import MutableMapping, Iterable +from deepdiff.helper import SetOrdered +import logging + +from deepdiff.helper import ( + strings, numbers, add_to_frozen_set, get_doc, dict_, RE_COMPILED_TYPE, ipranges +) + +logger = logging.getLogger(__name__) + + +doc = get_doc('search_doc.rst') + + +class DeepSearch(dict): + r""" + **DeepSearch** + + Deep Search inside objects to find the item matching your criteria. + + **Parameters** + + obj : The object to search within + + item : The item to search for + + verbose_level : int >= 0, default = 1. + Verbose level one shows the paths of found items. + Verbose level 2 shows the path and value of the found items. + + exclude_paths: list, default = None. + List of paths to exclude from the report. + + exclude_types: list, default = None. + List of object types to exclude from the report. + + case_sensitive: Boolean, default = False + + match_string: Boolean, default = False + If True, the value of the object or its children have to exactly match the item. + If False, the value of the item can be a part of the value of the object or its children + + use_regexp: Boolean, default = False + + strict_checking: Boolean, default = True + If True, it will check the type of the object to match, so when searching for '1234', + it will NOT match the int 1234. Currently this only affects the numeric values searching. + + **Returns** + + A DeepSearch object that has the matched paths and matched values. + + **Supported data types** + + int, string, unicode, dictionary, list, tuple, set, frozenset, OrderedDict, NamedTuple and custom objects! 
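+
+    As a quick editorial illustration of the strict_checking parameter described
+    above (this example is not part of the original documentation):
+
+    >>> ds = DeepSearch([1234, "1234"], "1234", strict_checking=False)
+    >>> sorted(ds['matched_values'])
+    ['root[0]', 'root[1]']
+
+    With the default strict_checking=True, only the string element root[1] would match.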
+ + **Examples** + + Importing + >>> from deepdiff import DeepSearch + >>> from pprint import pprint + + Search in list for string + >>> obj = ["long somewhere", "string", 0, "somewhere great!"] + >>> item = "somewhere" + >>> ds = DeepSearch(obj, item, verbose_level=2) + >>> print(ds) + {'matched_values': {'root[3]': 'somewhere great!', 'root[0]': 'long somewhere'}} + + Search in nested data for string + >>> obj = ["something somewhere", {"long": "somewhere", "string": 2, 0: 0, "somewhere": "around"}] + >>> item = "somewhere" + >>> ds = DeepSearch(obj, item, verbose_level=2) + >>> pprint(ds, indent=2) + { 'matched_paths': {"root[1]['somewhere']": 'around'}, + 'matched_values': { 'root[0]': 'something somewhere', + "root[1]['long']": 'somewhere'}} + + """ + + warning_num = 0 + + def __init__(self, + obj, + item, + exclude_paths=SetOrdered(), + exclude_regex_paths=SetOrdered(), + exclude_types=SetOrdered(), + verbose_level=1, + case_sensitive=False, + match_string=False, + use_regexp=False, + strict_checking=True, + **kwargs): + if kwargs: + raise ValueError(( + "The following parameter(s) are not valid: %s\n" + "The valid parameters are obj, item, exclude_paths, exclude_types,\n" + "case_sensitive, match_string and verbose_level." + ) % ', '.join(kwargs.keys())) + + self.obj = obj + self.case_sensitive = case_sensitive if isinstance(item, strings) else True + item = item if self.case_sensitive else item.lower() + self.exclude_paths = SetOrdered(exclude_paths) + self.exclude_regex_paths = [re.compile(exclude_regex_path) for exclude_regex_path in exclude_regex_paths] + self.exclude_types = SetOrdered(exclude_types) + self.exclude_types_tuple = tuple( + exclude_types) # we need tuple for checking isinstance + self.verbose_level = verbose_level + self.update( + matched_paths=self.__set_or_dict(), + matched_values=self.__set_or_dict(), + unprocessed=[]) + self.use_regexp = use_regexp + if not strict_checking and (isinstance(item, numbers) or isinstance(item, ipranges)): + item = str(item) + if self.use_regexp: + try: + item = re.compile(item) + except TypeError as e: + raise TypeError(f"The passed item of {item} is not usable for regex: {e}") from None + self.strict_checking = strict_checking + + # Cases where user wants to match exact string item + self.match_string = match_string + + self.__search(obj, item, parents_ids=frozenset({id(obj)})) + + empty_keys = [k for k, v in self.items() if not v] + + for k in empty_keys: + del self[k] + + def __set_or_dict(self): + return dict_() if self.verbose_level >= 2 else SetOrdered() + + def __report(self, report_key, key, value): + if self.verbose_level >= 2: + self[report_key][key] = value + else: + self[report_key].add(key) + + def __search_obj(self, + obj, + item, + parent, + parents_ids=frozenset(), + is_namedtuple=False): + """Search objects""" + found = False + if obj == item: + found = True + # We report the match but also continue inside the match to see if there are + # further matches inside the `looped` object. + self.__report(report_key='matched_values', key=parent, value=obj) + + try: + if is_namedtuple: + obj = obj._asdict() + else: + # Skip magic methods. Slightly hacky, but unless people are defining + # new magic methods they want to search, it should work fine. 
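+                # Building a plain dict of the object's non-magic attributes
+                # lets the dictionary search logic below be reused for
+                # arbitrary objects.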
+ obj = {i: getattr(obj, i) for i in dir(obj) + if not (i.startswith('__') and i.endswith('__'))} + except AttributeError: + try: + obj = {i: getattr(obj, i) for i in obj.__slots__} + except AttributeError: + if not found: + self['unprocessed'].append("%s" % parent) + + return + + self.__search_dict( + obj, item, parent, parents_ids, print_as_attribute=True) + + def __skip_this(self, item, parent): + skip = False + if parent in self.exclude_paths: + skip = True + elif self.exclude_regex_paths and any( + [exclude_regex_path.search(parent) for exclude_regex_path in self.exclude_regex_paths]): + skip = True + else: + if isinstance(item, self.exclude_types_tuple): + skip = True + + return skip + + def __search_dict(self, + obj, + item, + parent, + parents_ids=frozenset(), + print_as_attribute=False): + """Search dictionaries""" + if print_as_attribute: + parent_text = "%s.%s" + else: + parent_text = "%s[%s]" + + obj_keys = SetOrdered(obj.keys()) + + for item_key in obj_keys: + if not print_as_attribute and isinstance(item_key, strings): + item_key_str = "'%s'" % item_key + else: + item_key_str = item_key + + obj_child = obj[item_key] + + item_id = id(obj_child) + + if parents_ids and item_id in parents_ids: + continue + + parents_ids_added = add_to_frozen_set(parents_ids, item_id) + + new_parent = parent_text % (parent, item_key_str) + new_parent_cased = new_parent if self.case_sensitive else new_parent.lower() + + str_item = str(item) + if (self.match_string and str_item == new_parent_cased) or\ + (not self.match_string and str_item in new_parent_cased) or\ + (self.use_regexp and item.search(new_parent_cased)): + self.__report( + report_key='matched_paths', + key=new_parent, + value=obj_child) + + self.__search( + obj_child, + item, + parent=new_parent, + parents_ids=parents_ids_added) + + def __search_iterable(self, + obj, + item, + parent="root", + parents_ids=frozenset()): + """Search iterables except dictionaries, sets and strings.""" + for i, thing in enumerate(obj): + new_parent = "{}[{}]".format(parent, i) + if self.__skip_this(thing, parent=new_parent): + continue + + if self.case_sensitive or not isinstance(thing, strings): + thing_cased = thing + else: + thing_cased = thing.lower() + + if not self.use_regexp and thing_cased == item: + self.__report( + report_key='matched_values', key=new_parent, value=thing) + else: + item_id = id(thing) + if parents_ids and item_id in parents_ids: + continue + parents_ids_added = add_to_frozen_set(parents_ids, item_id) + self.__search(thing, item, "%s[%s]" % + (parent, i), parents_ids_added) + + def __search_str(self, obj, item, parent): + """Compare strings""" + obj_text = obj if self.case_sensitive else obj.lower() + + is_matched = False + if self.use_regexp: + is_matched = item.search(obj_text) + elif (self.match_string and item == obj_text) or (not self.match_string and item in obj_text): + is_matched = True + if is_matched: + self.__report(report_key='matched_values', key=parent, value=obj) + + def __search_numbers(self, obj, item, parent): + if ( + item == obj or ( + not self.strict_checking and ( + item == str(obj) or ( + self.use_regexp and item.search(str(obj)) + ) + ) + ) + ): + self.__report(report_key='matched_values', key=parent, value=obj) + + def __search_tuple(self, obj, item, parent, parents_ids): + # Checking to see if it has _fields. Which probably means it is a named + # tuple. 
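+        # (In practice the probe below checks for _asdict, which namedtuples
+        #  provide alongside _fields; plain tuples have neither attribute.)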
+ try: + obj._asdict + # It must be a normal tuple + except AttributeError: + self.__search_iterable(obj, item, parent, parents_ids) + # We assume it is a namedtuple then + else: + self.__search_obj( + obj, item, parent, parents_ids, is_namedtuple=True) + + def __search(self, obj, item, parent="root", parents_ids=frozenset()): + """The main search method""" + if self.__skip_this(item, parent): + return + + elif isinstance(obj, strings) and isinstance(item, (strings, RE_COMPILED_TYPE)): + self.__search_str(obj, item, parent) + + elif isinstance(obj, strings) and isinstance(item, numbers): + return + + elif isinstance(obj, ipranges): + self.__search_str(str(obj), item, parent) + + elif isinstance(obj, numbers): + self.__search_numbers(obj, item, parent) + + elif isinstance(obj, MutableMapping): + self.__search_dict(obj, item, parent, parents_ids) + + elif isinstance(obj, tuple): + self.__search_tuple(obj, item, parent, parents_ids) + + elif isinstance(obj, (set, frozenset)): + if self.warning_num < 10: + logger.warning( + "Set item detected in the path." + "'set' objects do NOT support indexing. But DeepSearch will still report a path." + ) + self.warning_num += 1 + self.__search_iterable(obj, item, parent, parents_ids) + + elif isinstance(obj, Iterable) and not isinstance(obj, strings): + self.__search_iterable(obj, item, parent, parents_ids) + + else: + self.__search_obj(obj, item, parent, parents_ids) + + +class grep: + __doc__ = doc + + def __init__(self, + item, + **kwargs): + self.item = item + self.kwargs = kwargs + + def __ror__(self, other): + return DeepSearch(obj=other, item=self.item, **self.kwargs) + + +if __name__ == "__main__": # pragma: no cover + import doctest + doctest.testmod() diff --git a/.venv/lib/python3.12/site-packages/deepdiff/serialization.py b/.venv/lib/python3.12/site-packages/deepdiff/serialization.py new file mode 100644 index 00000000..c148aadf --- /dev/null +++ b/.venv/lib/python3.12/site-packages/deepdiff/serialization.py @@ -0,0 +1,730 @@ +import pickle +import sys +import io +import os +import json +import uuid +import logging +import re # NOQA +import builtins # NOQA +import datetime # NOQA +import decimal # NOQA +import orderly_set # NOQA +import collections # NOQA +from copy import deepcopy, copy +from functools import partial +from collections.abc import Mapping +from typing import ( + Callable, Optional, Union, + overload, Literal, Any, +) +from deepdiff.helper import ( + strings, + get_type, + TEXT_VIEW, + np_float32, + np_float64, + np_int32, + np_int64, + np_ndarray, + Opcode, + SetOrdered, + pydantic_base_model_type, + PydanticBaseModel, + NotPresent, + ipranges, +) +from deepdiff.model import DeltaResult + +try: + import orjson +except ImportError: # pragma: no cover. + orjson = None + +logger = logging.getLogger(__name__) + +class UnsupportedFormatErr(TypeError): + pass + + +NONE_TYPE = type(None) + +CSV_HEADER_MAX_CHUNK_SIZE = 2048 # The chunk needs to be big enough that covers a couple of rows of data. + + +MODULE_NOT_FOUND_MSG = 'DeepDiff Delta did not find {} in your modules. Please make sure it is already imported.' +FORBIDDEN_MODULE_MSG = "Module '{}' is forbidden. You need to explicitly pass it by passing a safe_to_import parameter" +DELTA_IGNORE_ORDER_NEEDS_REPETITION_REPORT = 'report_repetition must be set to True when ignore_order is True to create the delta object.' +DELTA_ERROR_WHEN_GROUP_BY = 'Delta can not be made when group_by is used since the structure of data is modified from the original form.' 
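The allow-list that follows drives DeepDiff's restricted pickle machinery: the _RestrictedUnpickler defined later in this file only resolves classes whose "module.ClassName" appears in SAFE_TO_IMPORT or in a caller-supplied safe_to_import set. A minimal sketch of the intended round-trip (illustrative, not part of the package):

    from decimal import Decimal
    from deepdiff.serialization import pickle_dump, pickle_load

    # decimal.Decimal is on the SAFE_TO_IMPORT allow-list, so this round-trips.
    content = pickle_dump({'price': Decimal('1.25')})
    assert pickle_load(content) == {'price': Decimal('1.25')}

    # A custom class would need to be allowed explicitly, e.g.:
    # pickle_load(content, safe_to_import={'mymodule.MyClass'})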
+ +SAFE_TO_IMPORT = { + 'builtins.range', + 'builtins.complex', + 'builtins.set', + 'builtins.frozenset', + 'builtins.slice', + 'builtins.str', + 'builtins.bytes', + 'builtins.list', + 'builtins.tuple', + 'builtins.int', + 'builtins.float', + 'builtins.dict', + 'builtins.bool', + 'builtins.bin', + 'builtins.None', + 'datetime.datetime', + 'datetime.time', + 'datetime.timedelta', + 'decimal.Decimal', + 'uuid.UUID', + 'orderly_set.sets.OrderedSet', + 'orderly_set.sets.OrderlySet', + 'orderly_set.sets.StableSetEq', + 'deepdiff.helper.SetOrdered', + 'collections.namedtuple', + 'collections.OrderedDict', + 're.Pattern', + 'deepdiff.helper.Opcode', +} + + +TYPE_STR_TO_TYPE = { + 'range': range, + 'complex': complex, + 'set': set, + 'frozenset': frozenset, + 'slice': slice, + 'str': str, + 'bytes': bytes, + 'list': list, + 'tuple': tuple, + 'int': int, + 'float': float, + 'dict': dict, + 'bool': bool, + 'bin': bin, + 'None': None, + 'NoneType': None, + 'datetime': datetime.datetime, + 'time': datetime.time, + 'timedelta': datetime.timedelta, + 'Decimal': decimal.Decimal, + 'SetOrdered': SetOrdered, + 'namedtuple': collections.namedtuple, + 'OrderedDict': collections.OrderedDict, + 'Pattern': re.Pattern, + 'iprange': str, +} + + +class ModuleNotFoundError(ImportError): + """ + Raised when the module is not found in sys.modules + """ + pass + + +class ForbiddenModule(ImportError): + """ + Raised when a module is not explicitly allowed to be imported + """ + pass + + +class SerializationMixin: + + def to_json_pickle(self): + """ + :ref:`to_json_pickle_label` + Get the json pickle of the diff object. Unless you need all the attributes and functionality of DeepDiff, running to_json() is the safer option that json pickle. + """ + try: + import jsonpickle + copied = self.copy() # type: ignore + return jsonpickle.encode(copied) + except ImportError: # pragma: no cover. Json pickle is getting deprecated. + logger.error('jsonpickle library needs to be installed in order to run to_json_pickle') # pragma: no cover. Json pickle is getting deprecated. + + @classmethod + def from_json_pickle(cls, value): + """ + :ref:`from_json_pickle_label` + Load DeepDiff object with all the bells and whistles from the json pickle dump. + Note that json pickle dump comes from to_json_pickle + """ + try: + import jsonpickle + return jsonpickle.decode(value) + except ImportError: # pragma: no cover. Json pickle is getting deprecated. + logger.error('jsonpickle library needs to be installed in order to run from_json_pickle') # pragma: no cover. Json pickle is getting deprecated. + + def to_json(self, default_mapping: Optional[dict]=None, force_use_builtin_json=False, **kwargs): + """ + Dump json of the text view. + **Parameters** + + default_mapping : dictionary(optional), a dictionary of mapping of different types to json types. + + by default DeepDiff converts certain data types. For example Decimals into floats so they can be exported into json. + If you have a certain object type that the json serializer can not serialize it, please pass the appropriate type + conversion through this dictionary. + + force_use_builtin_json: Boolean, default = False + When True, we use Python's builtin Json library for serialization, + even if Orjson is installed. + + + kwargs: Any other kwargs you pass will be passed on to Python's json.dumps() + + **Example** + + Serialize custom objects + >>> class A: + ... pass + ... + >>> class B: + ... pass + ... 
+        >>> t1 = A()
+        >>> t2 = B()
+        >>> ddiff = DeepDiff(t1, t2)
+        >>> ddiff.to_json()
+        TypeError: We do not know how to convert <__main__.A object at 0x10648> of type <class '__main__.A'> for json serialization. Please pass the default_mapping parameter with proper mapping of the object to a basic python type.
+
+        >>> default_mapping = {A: lambda x: 'obj A', B: lambda x: 'obj B'}
+        >>> ddiff.to_json(default_mapping=default_mapping)
+        '{"type_changes": {"root": {"old_type": "A", "new_type": "B", "old_value": "obj A", "new_value": "obj B"}}}'
+        """
+        dic = self.to_dict(view_override=TEXT_VIEW)
+        return json_dumps(
+            dic,
+            default_mapping=default_mapping,
+            force_use_builtin_json=force_use_builtin_json,
+            **kwargs,
+        )
+
+    def to_dict(self, view_override: Optional[str]=None) -> dict:
+        """
+        Convert the result to a python dictionary. You can override the view type by passing view_override.
+
+        **Parameters**
+
+        view_override: view type, default=None,
+            override the view that was used to generate the diff when converting to the dictionary.
+            The options are text or tree.
+        """
+
+        view = view_override if view_override else self.view  # type: ignore
+        return dict(self._get_view_results(view))  # type: ignore
+
+    def _to_delta_dict(
+        self,
+        directed: bool = True,
+        report_repetition_required: bool = True,
+        always_include_values: bool = False,
+    ) -> dict:
+        """
+        Dump to a dictionary suitable for delta usage.
+        Unlike to_dict, this is not dependent on the original view that the user chose to create the diff.
+
+        **Parameters**
+
+        directed : Boolean, default=True, whether to create a directional delta dictionary or a symmetrical one.
+
+        Note that in the current implementation the symmetrical delta (non-directional) is ONLY used for verifying that
+        the delta is being applied to the exact same values as what was used to generate the delta, and it has
+        no other usages.
+
+        If this option is set to True, then the dictionary will not have the "old_value" in the output.
+        Otherwise it will have the "old_value". "old_value" is the value of the item in t1.
+
+        If delta = Delta(DeepDiff(t1, t2)) then
+        t1 + delta == t2
+
+        Note that the items in t1 + delta might be in a slightly different order than in t2 if ignore_order
+        was set to True in the diff object.
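+
+        A minimal round-trip sketch (an illustrative example, not from the
+        original docstring):
+
+        >>> from deepdiff import DeepDiff, Delta
+        >>> t1 = [1, 2, 3]
+        >>> t2 = [1, 2, 4]
+        >>> delta = Delta(DeepDiff(t1, t2))
+        >>> t1 + delta == t2
+        True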
+ + """ + if self.group_by is not None: # type: ignore + raise ValueError(DELTA_ERROR_WHEN_GROUP_BY) + + if directed and not always_include_values: + _iterable_opcodes = {} # type: ignore + for path, op_codes in self._iterable_opcodes.items(): # type: ignore + _iterable_opcodes[path] = [] + for op_code in op_codes: + new_op_code = Opcode( + tag=op_code.tag, + t1_from_index=op_code.t1_from_index, + t1_to_index=op_code.t1_to_index, + t2_from_index=op_code.t2_from_index, + t2_to_index=op_code.t2_to_index, + new_values=op_code.new_values, + ) + _iterable_opcodes[path].append(new_op_code) + else: + _iterable_opcodes = self._iterable_opcodes # type: ignore + + result = DeltaResult( + tree_results=self.tree, # type: ignore + ignore_order=self.ignore_order, # type: ignore + always_include_values=always_include_values, + _iterable_opcodes=_iterable_opcodes, + ) + result.remove_empty_keys() + if report_repetition_required and self.ignore_order and not self.report_repetition: # type: ignore + raise ValueError(DELTA_IGNORE_ORDER_NEEDS_REPETITION_REPORT) + if directed: + for report_key, report_value in result.items(): + if isinstance(report_value, Mapping): + for path, value in report_value.items(): + if isinstance(value, Mapping) and 'old_value' in value: + del value['old_value'] # type: ignore + if self._numpy_paths: # type: ignore + # Note that keys that start with '_' are considered internal to DeepDiff + # and will be omitted when counting distance. (Look inside the distance module.) + result['_numpy_paths'] = self._numpy_paths # type: ignore + + if self.iterable_compare_func: # type: ignore + result['_iterable_compare_func_was_used'] = True + + return deepcopy(dict(result)) + + def pretty(self, prefix: Optional[Union[str, Callable]]=None): + """ + The pretty human readable string output for the diff object + regardless of what view was used to generate the diff. + + prefix can be a callable or a string or None. + + Example: + >>> t1={1,2,4} + >>> t2={2,3} + >>> print(DeepDiff(t1, t2).pretty()) + Item root[3] added to set. + Item root[4] removed from set. + Item root[1] removed from set. + """ + result = [] + if prefix is None: + prefix = '' + keys = sorted(self.tree.keys()) # type: ignore # sorting keys to guarantee constant order across python versions. + for key in keys: + for item_key in self.tree[key]: # type: ignore + result += [pretty_print_diff(item_key)] + + if callable(prefix): + return "\n".join(f"{prefix(diff=self)}{r}" for r in result) + return "\n".join(f"{prefix}{r}" for r in result) + + +class _RestrictedUnpickler(pickle.Unpickler): + + def __init__(self, *args, **kwargs): + self.safe_to_import = kwargs.pop('safe_to_import', None) + if self.safe_to_import: + if isinstance(self.safe_to_import, strings): + self.safe_to_import = set([self.safe_to_import]) + elif isinstance(self.safe_to_import, (set, frozenset)): + pass + else: + self.safe_to_import = set(self.safe_to_import) + self.safe_to_import = self.safe_to_import | SAFE_TO_IMPORT + else: + self.safe_to_import = SAFE_TO_IMPORT + super().__init__(*args, **kwargs) + + def find_class(self, module, name): + # Only allow safe classes from self.safe_to_import. + module_dot_class = '{}.{}'.format(module, name) + if module_dot_class in self.safe_to_import: + try: + module_obj = sys.modules[module] + except KeyError: + raise ModuleNotFoundError(MODULE_NOT_FOUND_MSG.format(module_dot_class)) from None + return getattr(module_obj, name) + # Forbid everything else. 
+ raise ForbiddenModule(FORBIDDEN_MODULE_MSG.format(module_dot_class)) from None + + def persistent_load(self, pid): + if pid == "<<NoneType>>": + return type(None) + + +class _RestrictedPickler(pickle.Pickler): + def persistent_id(self, obj): + if obj is NONE_TYPE: # NOQA + return "<<NoneType>>" + return None + + +def pickle_dump(obj, file_obj=None, protocol=4): + """ + **pickle_dump** + Dumps the obj into pickled content. + + **Parameters** + + obj : Any python object + + file_obj : (Optional) A file object to dump the contents into + + **Returns** + + If file_obj is passed the return value will be None. It will write the object's pickle contents into the file. + However if no file_obj is passed, then it will return the pickle serialization of the obj in the form of bytes. + """ + file_obj_passed = bool(file_obj) + file_obj = file_obj or io.BytesIO() + _RestrictedPickler(file_obj, protocol=protocol, fix_imports=False).dump(obj) + if not file_obj_passed: + return file_obj.getvalue() + + +def pickle_load(content=None, file_obj=None, safe_to_import=None): + """ + **pickle_load** + Load the pickled content. content should be a bytes object. + + **Parameters** + + content : Bytes of pickled object. + + file_obj : A file object to load the content from + + safe_to_import : A set of modules that needs to be explicitly allowed to be loaded. + Example: {'mymodule.MyClass', 'decimal.Decimal'} + Note that this set will be added to the basic set of modules that are already allowed. + The set of what is already allowed can be found in deepdiff.serialization.SAFE_TO_IMPORT + + **Returns** + + A delta object that can be added to t1 to recreate t2. + + **Examples** + + Importing + >>> from deepdiff import DeepDiff, Delta + >>> from pprint import pprint + + + """ + if not content and not file_obj: + raise ValueError('Please either pass the content or the file_obj to pickle_load.') + if isinstance(content, str): + content = content.encode('utf-8') + if content: + file_obj = io.BytesIO(content) + return _RestrictedUnpickler(file_obj, safe_to_import=safe_to_import).load() + + +def _get_pretty_form_text(verbose_level): + pretty_form_texts = { + "type_changes": "Type of {diff_path} changed from {type_t1} to {type_t2} and value changed from {val_t1} to {val_t2}.", + "values_changed": "Value of {diff_path} changed from {val_t1} to {val_t2}.", + "dictionary_item_added": "Item {diff_path} added to dictionary.", + "dictionary_item_removed": "Item {diff_path} removed from dictionary.", + "iterable_item_added": "Item {diff_path} added to iterable.", + "iterable_item_removed": "Item {diff_path} removed from iterable.", + "attribute_added": "Attribute {diff_path} added.", + "attribute_removed": "Attribute {diff_path} removed.", + "set_item_added": "Item root[{val_t2}] added to set.", + "set_item_removed": "Item root[{val_t1}] removed from set.", + "repetition_change": "Repetition change for item {diff_path}.", + } + if verbose_level == 2: + pretty_form_texts.update( + { + "dictionary_item_added": "Item {diff_path} ({val_t2}) added to dictionary.", + "dictionary_item_removed": "Item {diff_path} ({val_t1}) removed from dictionary.", + "iterable_item_added": "Item {diff_path} ({val_t2}) added to iterable.", + "iterable_item_removed": "Item {diff_path} ({val_t1}) removed from iterable.", + "attribute_added": "Attribute {diff_path} ({val_t2}) added.", + "attribute_removed": "Attribute {diff_path} ({val_t1}) removed.", + } + ) + return pretty_form_texts + + +def pretty_print_diff(diff): + type_t1 = 
get_type(diff.t1).__name__
+    type_t2 = get_type(diff.t2).__name__
+
+    val_t1 = '"{}"'.format(str(diff.t1)) if type_t1 == "str" else str(diff.t1)
+    val_t2 = '"{}"'.format(str(diff.t2)) if type_t2 == "str" else str(diff.t2)
+
+    diff_path = diff.path(root='root')
+    return _get_pretty_form_text(diff.verbose_level).get(diff.report_type, "").format(
+        diff_path=diff_path,
+        type_t1=type_t1,
+        type_t2=type_t2,
+        val_t1=val_t1,
+        val_t2=val_t2)
+
+
+def load_path_content(path, file_type=None):
+    """
+    Loads and deserializes the content of the path.
+    """
+
+    if file_type is None:
+        file_type = path.split('.')[-1]
+    if file_type == 'json':
+        with open(path, 'r') as the_file:
+            content = json_loads(the_file.read())
+    elif file_type in {'yaml', 'yml'}:
+        try:
+            import yaml
+        except ImportError:  # pragma: no cover.
+            raise ImportError('Pyyaml needs to be installed.') from None  # pragma: no cover.
+        with open(path, 'r') as the_file:
+            content = yaml.safe_load(the_file)
+    elif file_type == 'toml':
+        try:
+            if sys.version_info >= (3, 11):
+                import tomllib as tomli
+            else:
+                import tomli
+        except ImportError:  # pragma: no cover.
+            raise ImportError('On python<=3.10 tomli needs to be installed.') from None  # pragma: no cover.
+        with open(path, 'rb') as the_file:
+            content = tomli.load(the_file)
+    elif file_type == 'pickle':
+        with open(path, 'rb') as the_file:
+            content = the_file.read()
+            content = pickle_load(content)
+    elif file_type in {'csv', 'tsv'}:
+        try:
+            import clevercsv  # type: ignore
+            content = clevercsv.read_dicts(path)
+        except ImportError:  # pragma: no cover.
+            import csv
+            with open(path, 'r') as the_file:
+                content = list(csv.DictReader(the_file))
+
+        if not content:
+            logger.info(f"NOTE: CSV content was empty in {path}")
+
+        # Everything in csv is string but we try to automatically convert any numbers we find
+        for row in content:
+            for key, value in row.items():
+                value = value.strip()
+                for type_ in [int, float, complex]:
+                    try:
+                        value = type_(value)
+                    except Exception:
+                        pass
+                    else:
+                        row[key] = value
+                        break
+    else:
+        raise UnsupportedFormatErr(f'Only json, yaml, toml, csv, tsv and pickle are supported.\n'
+                                   f' The {file_type} extension is not known.')
+    return content
+
+
+def save_content_to_path(content, path, file_type=None, keep_backup=True):
+    """
+    Serializes the content and saves it to the path.
+    """
+
+    backup_path = f"{path}.bak"
+    os.rename(path, backup_path)
+
+    try:
+        _save_content(
+            content=content, path=path,
+            file_type=file_type, keep_backup=keep_backup)
+    except Exception:
+        os.rename(backup_path, path)
+        raise
+    else:
+        if not keep_backup:
+            os.remove(backup_path)
+
+
+def _save_content(content, path, file_type, keep_backup=True):
+    if file_type == 'json':
+        with open(path, 'w') as the_file:
+            content = json_dumps(content)
+            the_file.write(content)  # type: ignore
+    elif file_type in {'yaml', 'yml'}:
+        try:
+            import yaml
+        except ImportError:  # pragma: no cover.
+            raise ImportError('Pyyaml needs to be installed.') from None  # pragma: no cover.
+        with open(path, 'w') as the_file:
+            content = yaml.safe_dump(content, stream=the_file)
+    elif file_type == 'toml':
+        try:
+            import tomli_w
+        except ImportError:  # pragma: no cover.
+            raise ImportError('Tomli-w needs to be installed.') from None  # pragma: no cover.
+ with open(path, 'wb') as the_file: + content = tomli_w.dump(content, the_file) + elif file_type == 'pickle': + with open(path, 'wb') as the_file: + content = pickle_dump(content, file_obj=the_file) + elif file_type in {'csv', 'tsv'}: + try: + import clevercsv # type: ignore + dict_writer = clevercsv.DictWriter + except ImportError: # pragma: no cover. + import csv + dict_writer = csv.DictWriter + with open(path, 'w', newline='') as csvfile: + fieldnames = list(content[0].keys()) + writer = dict_writer(csvfile, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(content) + else: + raise UnsupportedFormatErr('Only json, yaml, toml, csv, tsv and pickle are supported.\n' + f' The {file_type} extension is not known.') + return content + + +def _serialize_decimal(value): + if value.as_tuple().exponent == 0: + return int(value) + else: + return float(value) + + +def _serialize_tuple(value): + if hasattr(value, '_asdict'): # namedtuple + return value._asdict() + return value + + +JSON_CONVERTOR = { + decimal.Decimal: _serialize_decimal, + SetOrdered: list, + orderly_set.StableSetEq: list, + set: list, + type: lambda x: x.__name__, + bytes: lambda x: x.decode('utf-8'), + datetime.datetime: lambda x: x.isoformat(), + uuid.UUID: lambda x: str(x), + np_float32: float, + np_float64: float, + np_int32: int, + np_int64: int, + np_ndarray: lambda x: x.tolist(), + tuple: _serialize_tuple, + Mapping: dict, + NotPresent: str, +} + +if PydanticBaseModel is not pydantic_base_model_type: + JSON_CONVERTOR[PydanticBaseModel] = lambda x: x.dict() + + +def json_convertor_default(default_mapping=None): + if default_mapping: + _convertor_mapping = JSON_CONVERTOR.copy() + _convertor_mapping.update(default_mapping) + else: + _convertor_mapping = JSON_CONVERTOR + + def _convertor(obj): + for original_type, convert_to in _convertor_mapping.items(): + if isinstance(obj, original_type): + return convert_to(obj) + # This is to handle reverse() which creates a generator of type list_reverseiterator + if obj.__class__.__name__ == 'list_reverseiterator': + return list(copy(obj)) + raise TypeError('We do not know how to convert {} of type {} for json serialization. Please pass the default_mapping parameter with proper mapping of the object to a basic python type.'.format(obj, type(obj))) + + return _convertor + + +class JSONDecoder(json.JSONDecoder): + + def __init__(self, *args, **kwargs): + json.JSONDecoder.__init__(self, object_hook=self.object_hook, *args, **kwargs) + + def object_hook(self, obj): # type: ignore + if 'old_type' in obj and 'new_type' in obj: + for type_key in ('old_type', 'new_type'): + type_str = obj[type_key] + obj[type_key] = TYPE_STR_TO_TYPE.get(type_str, type_str) + + return obj + + + +@overload +def json_dumps( + item: Any, + **kwargs, +) -> str: + ... + + +@overload +def json_dumps( + item: Any, + default_mapping:Optional[dict], + force_use_builtin_json: bool, + return_bytes:Literal[True], + **kwargs, +) -> bytes: + ... + + +@overload +def json_dumps( + item: Any, + default_mapping:Optional[dict], + force_use_builtin_json: bool, + return_bytes:Literal[False], + **kwargs, +) -> str: + ... 
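Reading ahead to the implementation below: json_dumps routes through orjson when it is installed (unless force_use_builtin_json=True) and falls back to the standard library otherwise. A small usage sketch (illustrative, not part of the module):

    from decimal import Decimal
    from deepdiff.serialization import json_dumps, json_loads

    # Decimal and bytes are not JSON-native; the JSON_CONVERTOR table above
    # maps them to float/str before serialization.
    text = json_dumps({'price': Decimal('1.50'), 'tag': b'abc'})
    assert json_loads(text) == {'price': 1.5, 'tag': 'abc'}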
+ + +def json_dumps( + item: Any, + default_mapping:Optional[dict]=None, + force_use_builtin_json: bool = False, + return_bytes: bool = False, + **kwargs, +) -> Union[str, bytes]: + """ + Dump json with extra details that are not normally json serializable + + parameters + ---------- + + force_use_builtin_json: Boolean, default = False + When True, we use Python's builtin Json library for serialization, + even if Orjson is installed. + """ + if orjson and not force_use_builtin_json: + indent = kwargs.pop('indent', None) + kwargs['option'] = orjson.OPT_NON_STR_KEYS | orjson.OPT_SERIALIZE_NUMPY + if indent: + kwargs['option'] |= orjson.OPT_INDENT_2 + if 'sort_keys' in kwargs: + raise TypeError( + "orjson does not accept the sort_keys parameter. " + "If you need to pass sort_keys, set force_use_builtin_json=True " + "to use Python's built-in json library instead of orjson.") + result = orjson.dumps( + item, + default=json_convertor_default(default_mapping=default_mapping), + **kwargs) + if return_bytes: + return result + return result.decode(encoding='utf-8') + else: + result = json.dumps( + item, + default=json_convertor_default(default_mapping=default_mapping), + **kwargs) + if return_bytes: + return result.encode(encoding='utf-8') + return result + + +json_loads = partial(json.loads, cls=JSONDecoder) diff --git a/.venv/lib/python3.12/site-packages/deepdiff/summarize.py b/.venv/lib/python3.12/site-packages/deepdiff/summarize.py new file mode 100644 index 00000000..f911b84c --- /dev/null +++ b/.venv/lib/python3.12/site-packages/deepdiff/summarize.py @@ -0,0 +1,144 @@ +from typing import Tuple +from deepdiff.helper import JSON, SummaryNodeType +from deepdiff.serialization import json_dumps + + +def _truncate(s: str, max_len: int) -> str: + """ + Truncate string s to max_len characters. + If possible, keep the first (max_len-5) characters, then '...' then the last 2 characters. + """ + if len(s) <= max_len: + return s + if max_len <= 5: + return s[:max_len] + return s[:max_len - 5] + "..." 
+ s[-2:]
+
+
+# Recursively compute node weights (an approximation of how many characters
+# each node contributes to the serialized JSON).
+def calculate_weights(node):
+    if isinstance(node, dict):
+        weight = 0
+        children_weights = {}
+        for k, v in node.items():
+            try:
+                edge_weight = len(k)
+            except TypeError:
+                edge_weight = 1
+            child_weight, child_structure = calculate_weights(v)
+            total_weight = edge_weight + child_weight
+            weight += total_weight
+            children_weights[k] = (edge_weight, child_weight, child_structure)
+        return weight, (SummaryNodeType.dict, children_weights)
+
+    elif isinstance(node, list):
+        weight = 0
+        children_weights = []
+        for v in node:
+            edge_weight = 0  # Index weights are zero
+            child_weight, child_structure = calculate_weights(v)
+            total_weight = edge_weight + child_weight
+            weight += total_weight
+            children_weights.append((edge_weight, child_weight, child_structure))
+        return weight, (SummaryNodeType.list, children_weights)
+
+    else:
+        if isinstance(node, str):
+            node_weight = len(node)
+        elif isinstance(node, int):
+            node_weight = len(str(node))
+        elif isinstance(node, float):
+            node_weight = len(str(round(node, 2)))
+        elif node is None:
+            node_weight = 1
+        else:
+            node_weight = 0
+        return node_weight, (SummaryNodeType.leaf, node)
+
+
+# Balanced shrinking: allocate the weight budget to the heaviest branches
+# first, while capping any single branch at balance_threshold of the budget.
+def shrink_tree_balanced(node_structure, max_weight: int, balance_threshold: float) -> Tuple[JSON, float]:
+    node_type, node_info = node_structure
+
+    if node_type is SummaryNodeType.leaf:
+        leaf_value = node_info
+        leaf_weight, _ = calculate_weights(leaf_value)
+        if leaf_weight <= max_weight:
+            return leaf_value, leaf_weight
+        else:
+            if isinstance(leaf_value, str):
+                truncated_value = _truncate(leaf_value, max_weight)
+                return truncated_value, len(truncated_value)
+            elif isinstance(leaf_value, (int, float)):
+                leaf_str = str(leaf_value)
+                truncated_str = leaf_str[:max_weight]
+                try:
+                    return int(truncated_str), len(truncated_str)
+                except Exception:
+                    try:
+                        return float(truncated_str), len(truncated_str)
+                    except Exception:
+                        return truncated_str, len(truncated_str)
+            elif leaf_value is None:
+                return None, 1 if max_weight >= 1 else 0
+
+    elif node_type is SummaryNodeType.dict:
+        shrunk_dict = {}
+        total_weight = 0
+        sorted_children = sorted(node_info.items(), key=lambda x: x[1][0] + x[1][1], reverse=True)
+
+        for k, (edge_w, _, child_struct) in sorted_children:
+            allowed_branch_weight = min(max_weight * balance_threshold, max_weight - total_weight)
+            if allowed_branch_weight <= edge_w:
+                continue
+
+            remaining_weight = int(allowed_branch_weight - edge_w)
+            shrunk_child, shrunk_weight = shrink_tree_balanced(child_struct, remaining_weight, balance_threshold)
+            if shrunk_child is not None:
+                shrunk_dict[k[:edge_w]] = shrunk_child
+                total_weight += edge_w + shrunk_weight
+
+            if total_weight >= max_weight:
+                break
+        if not shrunk_dict:
+            return None, 0
+
+        return shrunk_dict, total_weight
+
+    elif node_type is SummaryNodeType.list:
+        shrunk_list = []
+        total_weight = 0
+        sorted_children = sorted(node_info, key=lambda x: x[0] + x[1], reverse=True)
+        for edge_w, _, child_struct in sorted_children:
+            allowed_branch_weight = int(min(max_weight * balance_threshold, max_weight - total_weight))
+            shrunk_child, shrunk_weight = shrink_tree_balanced(child_struct, allowed_branch_weight, balance_threshold)
+            if shrunk_child is not None:
+                shrunk_list.append(shrunk_child)
+                total_weight += shrunk_weight
+            if total_weight >= max_weight - 1:
+                shrunk_list.append("...")
+                break
+        if not shrunk_list:
+            return None, 0
+        return shrunk_list, total_weight
+    return None, 0
+
+
+def greedy_tree_summarization_balanced(json_data: JSON, max_weight: int, balance_threshold=0.6) -> JSON:
+    total_weight, tree_structure = calculate_weights(json_data)
+    if total_weight <= max_weight:
+        return json_data
+    shrunk_tree, _ = shrink_tree_balanced(tree_structure, max_weight, balance_threshold)
+    return shrunk_tree
+
+
+def summarize(data: JSON, max_length: int = 200, balance_threshold: float = 0.6) -> str:
+    try:
+        return json_dumps(
+            greedy_tree_summarization_balanced(data, max_length, balance_threshold)
+        )
+    except Exception:
+        return str(data)
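A closing usage sketch for summarize (illustrative; the payload is invented):

    from deepdiff.summarize import summarize

    payload = {
        "users": [{"name": "user-%02d" % i, "bio": "x" * 80} for i in range(25)],
        "meta": {"source": "import", "rows": 25},
    }
    # Produces a JSON string whose size is roughly bounded by max_length,
    # truncating long strings and dropping the heaviest branches first.
    print(summarize(payload, max_length=150))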