Diffstat (limited to '.venv/lib/python3.12/site-packages/deepdiff')
-rw-r--r--  .venv/lib/python3.12/site-packages/deepdiff/__init__.py         14
-rw-r--r--  .venv/lib/python3.12/site-packages/deepdiff/anyset.py           65
-rw-r--r--  .venv/lib/python3.12/site-packages/deepdiff/base.py             51
-rw-r--r--  .venv/lib/python3.12/site-packages/deepdiff/commands.py        232
-rw-r--r--  .venv/lib/python3.12/site-packages/deepdiff/deephash.py        627
-rw-r--r--  .venv/lib/python3.12/site-packages/deepdiff/delta.py          1217
-rw-r--r--  .venv/lib/python3.12/site-packages/deepdiff/diff.py           1906
-rw-r--r--  .venv/lib/python3.12/site-packages/deepdiff/distance.py        342
-rw-r--r--  .venv/lib/python3.12/site-packages/deepdiff/helper.py          837
-rw-r--r--  .venv/lib/python3.12/site-packages/deepdiff/lfucache.py        217
-rw-r--r--  .venv/lib/python3.12/site-packages/deepdiff/model.py           974
-rw-r--r--  .venv/lib/python3.12/site-packages/deepdiff/operator.py         69
-rw-r--r--  .venv/lib/python3.12/site-packages/deepdiff/path.py            316
-rw-r--r--  .venv/lib/python3.12/site-packages/deepdiff/py.typed             0
-rw-r--r--  .venv/lib/python3.12/site-packages/deepdiff/search.py          358
-rw-r--r--  .venv/lib/python3.12/site-packages/deepdiff/serialization.py   730
-rw-r--r--  .venv/lib/python3.12/site-packages/deepdiff/summarize.py       144
17 files changed, 8099 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/deepdiff/__init__.py b/.venv/lib/python3.12/site-packages/deepdiff/__init__.py
new file mode 100644
index 00000000..c784c558
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/deepdiff/__init__.py
@@ -0,0 +1,14 @@
+"""This module offers the DeepDiff, DeepSearch, grep, Delta and DeepHash classes."""
+# flake8: noqa
+__version__ = '8.4.2'
+import logging
+
+if __name__ == '__main__':
+ logging.basicConfig(format='%(asctime)s %(levelname)8s %(message)s')
+
+
+from .diff import DeepDiff as DeepDiff
+from .search import DeepSearch as DeepSearch, grep as grep
+from .deephash import DeepHash as DeepHash
+from .delta import Delta as Delta
+from .path import extract as extract, parse_path as parse_path
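The public API of the package is whatever __init__.py re-exports above: DeepDiff, DeepSearch, grep, DeepHash, Delta, extract and parse_path. A minimal usage sketch of two of these entry points (the sample data is made up for illustration):

from deepdiff import DeepDiff, grep

t1 = {"name": "deepdiff", "langs": ["python"]}
t2 = {"name": "deepdiff", "langs": ["python", "rust"]}

# DeepDiff returns a dict-like report keyed by change type.
diff = DeepDiff(t1, t2)
print(diff)  # e.g. {'iterable_item_added': {"root['langs'][1]": 'rust'}}

# grep is used with the | operator to search a structure for a value.
print(t1 | grep("python"))  # e.g. {'matched_values': ["root['langs'][0]"]}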
diff --git a/.venv/lib/python3.12/site-packages/deepdiff/anyset.py b/.venv/lib/python3.12/site-packages/deepdiff/anyset.py
new file mode 100644
index 00000000..cd87ac38
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/deepdiff/anyset.py
@@ -0,0 +1,65 @@
+from deepdiff.deephash import DeepHash
+from deepdiff.helper import dict_, SetOrdered
+
+
+class AnySet:
+ """
+ Any object can be in this set whether hashable or not.
+    Note that the current implementation has a memory leak: it keeps
+    traces of objects in itself even after popping.
+    However, once the AnySet object is deleted, all those traces are gone too.
+ """
+ def __init__(self, items=None):
+ self._set = SetOrdered()
+ self._hashes = dict_()
+ self._hash_to_objects = dict_()
+ if items:
+ for item in items:
+ self.add(item)
+
+ def add(self, item):
+ try:
+ self._set.add(item)
+ except TypeError:
+ hashes_obj = DeepHash(item, hashes=self._hashes)
+ hash_ = hashes_obj[item]
+ if hash_ not in self._hash_to_objects:
+ self._hash_to_objects[hash_] = item
+
+ def __contains__(self, item):
+ try:
+ result = item in self._set
+ except TypeError:
+ hashes_obj = DeepHash(item, hashes=self._hashes)
+ hash_ = hashes_obj[item]
+ result = hash_ in self._hash_to_objects
+ return result
+
+ def pop(self):
+ if self._set:
+ return self._set.pop()
+ else:
+ return self._hash_to_objects.pop(next(iter(self._hash_to_objects)))
+
+ def __eq__(self, other):
+ set_part, hashes_to_objs_part = other
+ return (self._set == set_part and self._hash_to_objects == hashes_to_objs_part)
+
+ __req__ = __eq__
+
+ def __repr__(self):
+ return "< AnySet {}, {} >".format(self._set, self._hash_to_objects)
+
+ __str__ = __repr__
+
+ def __len__(self):
+ return len(self._set) + len(self._hash_to_objects)
+
+ def __iter__(self):
+ for item in self._set:
+ yield item
+ for item in self._hash_to_objects.values():
+ yield item
+
+ def __bool__(self):
+ return bool(self._set or self._hash_to_objects)
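A quick sketch of how AnySet behaves with a mix of hashable and unhashable members (the sample values are made up for illustration):

from deepdiff.anyset import AnySet

items = [1, 2, {1}, 4, 4, {1}]  # contains unhashable sets and duplicates
result = AnySet(items)

# Hashable items live in the internal ordered set; unhashable ones are
# stored keyed by their DeepHash, so duplicates collapse in both cases.
assert len(result) == 4
assert {1} in result
assert 5 not in result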
diff --git a/.venv/lib/python3.12/site-packages/deepdiff/base.py b/.venv/lib/python3.12/site-packages/deepdiff/base.py
new file mode 100644
index 00000000..d3b24fb8
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/deepdiff/base.py
@@ -0,0 +1,51 @@
+from typing import Any
+from deepdiff.helper import strings, numbers, SetOrdered
+
+
+DEFAULT_SIGNIFICANT_DIGITS_WHEN_IGNORE_NUMERIC_TYPES = 12
+TYPE_STABILIZATION_MSG = 'Unable to stabilize the Numpy array {} due to {}. Please set ignore_order=False.'
+
+
+class Base:
+ numbers = numbers
+ strings = strings
+
+ def get_significant_digits(self, significant_digits, ignore_numeric_type_changes):
+ if significant_digits is not None and significant_digits < 0:
+ raise ValueError(
+ "significant_digits must be None or a non-negative integer")
+ if significant_digits is None:
+ if ignore_numeric_type_changes:
+ significant_digits = DEFAULT_SIGNIFICANT_DIGITS_WHEN_IGNORE_NUMERIC_TYPES
+ return significant_digits
+
+ def get_ignore_types_in_groups(self, ignore_type_in_groups,
+ ignore_string_type_changes,
+ ignore_numeric_type_changes,
+ ignore_type_subclasses):
+ if ignore_type_in_groups:
+ if isinstance(ignore_type_in_groups[0], type):
+ ignore_type_in_groups = [ignore_type_in_groups]
+ else:
+ ignore_type_in_groups = []
+
+ result = []
+ for item_group in ignore_type_in_groups:
+ new_item_group = SetOrdered()
+ for item in item_group:
+ item = type(item) if item is None or not isinstance(item, type) else item
+ new_item_group.add(item)
+ result.append(new_item_group)
+ ignore_type_in_groups = result
+
+ if ignore_string_type_changes and self.strings not in ignore_type_in_groups:
+ ignore_type_in_groups.append(SetOrdered(self.strings))
+
+ if ignore_numeric_type_changes and self.numbers not in ignore_type_in_groups:
+ ignore_type_in_groups.append(SetOrdered(self.numbers))
+
+ if not ignore_type_subclasses:
+            # isinstance() needs tuples. When we look for subclasses, we need them to be tuples
+ ignore_type_in_groups = list(map(tuple, ignore_type_in_groups))
+
+ return ignore_type_in_groups
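These helpers back the ignore_* options on DeepDiff and DeepHash. A small illustration of the numeric case, where ignore_numeric_type_changes falls back to the 12-significant-digit default defined above (a sketch, not part of this module):

from deepdiff import DeepDiff

# Without the flag, comparing 10 (int) to 10.0 (float) reports a type change.
assert DeepDiff(10, 10.0) != {}

# With the flag, both values are normalized as numbers (12 significant digits
# by default) before comparison, so no difference is reported.
assert DeepDiff(10, 10.0, ignore_numeric_type_changes=True) == {}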
diff --git a/.venv/lib/python3.12/site-packages/deepdiff/commands.py b/.venv/lib/python3.12/site-packages/deepdiff/commands.py
new file mode 100644
index 00000000..1859e35a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/deepdiff/commands.py
@@ -0,0 +1,232 @@
+import click
+import sys
+from decimal import Decimal
+from pprint import pprint
+from deepdiff.diff import (
+ DeepDiff,
+ CUTOFF_DISTANCE_FOR_PAIRS_DEFAULT,
+ CUTOFF_INTERSECTION_FOR_PAIRS_DEFAULT,
+ logger
+)
+from deepdiff import Delta, DeepSearch, extract as deep_extract
+from deepdiff.serialization import load_path_content, save_content_to_path
+
+try:
+ import orjson
+except ImportError:
+ orjson = None
+
+
+@click.group()
+def cli():
+ """A simple command line tool."""
+ pass # pragma: no cover.
+
+
+@cli.command()
+@click.argument('t1', type=click.Path(exists=True, resolve_path=True))
+@click.argument('t2', type=click.Path(exists=True, resolve_path=True))
+@click.option('--cutoff-distance-for-pairs', required=False, default=CUTOFF_DISTANCE_FOR_PAIRS_DEFAULT, type=float, show_default=True)
+@click.option('--cutoff-intersection-for-pairs', required=False, default=CUTOFF_INTERSECTION_FOR_PAIRS_DEFAULT, type=float, show_default=True)
+@click.option('--cache-size', required=False, default=0, type=int, show_default=True)
+@click.option('--cache-tuning-sample-size', required=False, default=0, type=int, show_default=True)
+@click.option('--cache-purge-level', required=False, default=1, type=click.IntRange(0, 2), show_default=True)
+@click.option('--create-patch', is_flag=True, show_default=True)
+@click.option('--exclude-paths', required=False, type=str, show_default=False, multiple=True)
+@click.option('--exclude-regex-paths', required=False, type=str, show_default=False, multiple=True)
+@click.option('--math-epsilon', required=False, type=Decimal, show_default=False)
+@click.option('--get-deep-distance', is_flag=True, show_default=True)
+@click.option('--group-by', required=False, type=str, show_default=False, multiple=False)
+@click.option('--ignore-order', is_flag=True, show_default=True)
+@click.option('--ignore-string-type-changes', is_flag=True, show_default=True)
+@click.option('--ignore-numeric-type-changes', is_flag=True, show_default=True)
+@click.option('--ignore-type-subclasses', is_flag=True, show_default=True)
+@click.option('--ignore-string-case', is_flag=True, show_default=True)
+@click.option('--ignore-nan-inequality', is_flag=True, show_default=True)
+@click.option('--include-private-variables', is_flag=True, show_default=True)
+@click.option('--log-frequency-in-sec', required=False, default=0, type=int, show_default=True)
+@click.option('--max-passes', required=False, default=10000000, type=int, show_default=True)
+@click.option('--max_diffs', required=False, default=None, type=int, show_default=True)
+@click.option('--threshold-to-diff-deeper', required=False, default=0.33, type=float, show_default=False)
+@click.option('--number-format-notation', required=False, type=click.Choice(['f', 'e'], case_sensitive=True), show_default=True, default="f")
+@click.option('--progress-logger', required=False, type=click.Choice(['info', 'error'], case_sensitive=True), show_default=True, default="info")
+@click.option('--report-repetition', is_flag=True, show_default=True)
+@click.option('--significant-digits', required=False, default=None, type=int, show_default=True)
+@click.option('--truncate-datetime', required=False, type=click.Choice(['second', 'minute', 'hour', 'day'], case_sensitive=True), show_default=True, default=None)
+@click.option('--verbose-level', required=False, default=1, type=click.IntRange(0, 2), show_default=True)
+@click.option('--debug', is_flag=True, show_default=False)
+def diff(
+ *args, **kwargs
+):
+ """
+ Deep Diff Commandline
+
+ Deep Difference of content in files.
+ It can read csv, tsv, json, yaml, and toml files.
+
+    T1 and T2 are the paths to the files to be compared with each other.
+ """
+ debug = kwargs.pop('debug')
+ kwargs['ignore_private_variables'] = not kwargs.pop('include_private_variables')
+ kwargs['progress_logger'] = logger.info if kwargs['progress_logger'] == 'info' else logger.error
+ create_patch = kwargs.pop('create_patch')
+ t1_path = kwargs.pop("t1")
+ t2_path = kwargs.pop("t2")
+ t1_extension = t1_path.split('.')[-1]
+ t2_extension = t2_path.split('.')[-1]
+
+ for name, t_path, t_extension in [('t1', t1_path, t1_extension), ('t2', t2_path, t2_extension)]:
+ try:
+ kwargs[name] = load_path_content(t_path, file_type=t_extension)
+ except Exception as e: # pragma: no cover.
+ if debug: # pragma: no cover.
+ raise # pragma: no cover.
+ else: # pragma: no cover.
+ sys.exit(str(f"Error when loading {name}: {e}")) # pragma: no cover.
+
+ # if (t1_extension != t2_extension):
+ if t1_extension in {'csv', 'tsv'}:
+ kwargs['t1'] = [dict(i) for i in kwargs['t1']]
+ if t2_extension in {'csv', 'tsv'}:
+ kwargs['t2'] = [dict(i) for i in kwargs['t2']]
+
+ if create_patch:
+ # Disabling logging progress since it will leak into stdout
+ kwargs['log_frequency_in_sec'] = 0
+
+ try:
+ diff = DeepDiff(**kwargs)
+ except Exception as e: # pragma: no cover. No need to test this.
+ sys.exit(str(e)) # pragma: no cover. No need to test this.
+
+ if create_patch:
+ try:
+ delta = Delta(diff)
+ except Exception as e: # pragma: no cover.
+ if debug: # pragma: no cover.
+ raise # pragma: no cover.
+ else: # pragma: no cover.
+ sys.exit(f"Error when loading the patch (aka delta): {e}") # pragma: no cover.
+
+ # printing into stdout
+ sys.stdout.buffer.write(delta.dumps())
+ else:
+ try:
+ print(diff.to_json(indent=2))
+ except Exception:
+ pprint(diff, indent=2)
+
+
+@cli.command()
+@click.argument('path', type=click.Path(exists=True, resolve_path=True))
+@click.argument('delta_path', type=click.Path(exists=True, resolve_path=True))
+@click.option('--backup', '-b', is_flag=True, show_default=True)
+@click.option('--raise-errors', is_flag=True, show_default=True)
+@click.option('--debug', is_flag=True, show_default=False)
+def patch(
+ path, delta_path, backup, raise_errors, debug
+):
+ """
+ Deep Patch Commandline
+
+ Patches a file based on the information in a delta file.
+ The delta file can be created by the deep diff command and
+ passing the --create-patch argument.
+
+ Deep Patch is similar to Linux's patch command.
+ The difference is that it is made for patching data.
+ It can read csv, tsv, json, yaml, and toml files.
+
+ """
+ try:
+ delta = Delta(delta_path=delta_path, raise_errors=raise_errors)
+ except Exception as e: # pragma: no cover.
+ if debug: # pragma: no cover.
+ raise # pragma: no cover.
+ else: # pragma: no cover.
+ sys.exit(str(f"Error when loading the patch (aka delta) {delta_path}: {e}")) # pragma: no cover.
+
+ extension = path.split('.')[-1]
+
+ try:
+ content = load_path_content(path, file_type=extension)
+ except Exception as e: # pragma: no cover.
+ sys.exit(str(f"Error when loading {path}: {e}")) # pragma: no cover.
+
+ result = delta + content
+
+ try:
+ save_content_to_path(result, path, file_type=extension, keep_backup=backup)
+ except Exception as e: # pragma: no cover.
+ if debug: # pragma: no cover.
+ raise # pragma: no cover.
+ else: # pragma: no cover.
+ sys.exit(str(f"Error when saving {path}: {e}")) # pragma: no cover.
+
+
+@cli.command()
+@click.argument('item', required=True, type=str)
+@click.argument('path', type=click.Path(exists=True, resolve_path=True))
+@click.option('--ignore-case', '-i', is_flag=True, show_default=True)
+@click.option('--exact-match', is_flag=True, show_default=True)
+@click.option('--exclude-paths', required=False, type=str, show_default=False, multiple=True)
+@click.option('--exclude-regex-paths', required=False, type=str, show_default=False, multiple=True)
+@click.option('--verbose-level', required=False, default=1, type=click.IntRange(0, 2), show_default=True)
+@click.option('--debug', is_flag=True, show_default=False)
+def grep(item, path, debug, **kwargs):
+ """
+ Deep Grep Commandline
+
+ Grep through the contents of a file and find the path to the item.
+ It can read csv, tsv, json, yaml, and toml files.
+
+ """
+ kwargs['case_sensitive'] = not kwargs.pop('ignore_case')
+ kwargs['match_string'] = kwargs.pop('exact_match')
+
+ try:
+ content = load_path_content(path)
+ except Exception as e: # pragma: no cover.
+ if debug: # pragma: no cover.
+ raise # pragma: no cover.
+ else: # pragma: no cover.
+ sys.exit(str(f"Error when loading {path}: {e}")) # pragma: no cover.
+
+ try:
+ result = DeepSearch(content, item, **kwargs)
+ except Exception as e: # pragma: no cover.
+ if debug: # pragma: no cover.
+ raise # pragma: no cover.
+ else: # pragma: no cover.
+ sys.exit(str(f"Error when running deep search on {path}: {e}")) # pragma: no cover.
+ pprint(result, indent=2)
+
+
+@cli.command()
+@click.argument('path_inside', required=True, type=str)
+@click.argument('path', type=click.Path(exists=True, resolve_path=True))
+@click.option('--debug', is_flag=True, show_default=False)
+def extract(path_inside, path, debug):
+ """
+ Deep Extract Commandline
+
+ Extract an item from a file based on the path that is passed.
+ It can read csv, tsv, json, yaml, and toml files.
+
+ """
+ try:
+ content = load_path_content(path)
+ except Exception as e: # pragma: no cover.
+ if debug: # pragma: no cover.
+ raise # pragma: no cover.
+ else: # pragma: no cover.
+ sys.exit(str(f"Error when loading {path}: {e}")) # pragma: no cover.
+
+ try:
+ result = deep_extract(content, path_inside)
+ except Exception as e: # pragma: no cover.
+ if debug: # pragma: no cover.
+ raise # pragma: no cover.
+ else: # pragma: no cover.
+ sys.exit(str(f"Error when running deep search on {path}: {e}")) # pragma: no cover.
+ pprint(result, indent=2)
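The cli group above backs the "deep diff", "deep patch", "deep grep" and "deep extract" commands referenced in the docstrings. It can also be driven programmatically through click's test runner; a minimal sketch, assuming two JSON files t1.json and t2.json exist (hypothetical file names):

from click.testing import CliRunner
from deepdiff.commands import cli

runner = CliRunner()
# Equivalent to running: deep diff t1.json t2.json --ignore-order
result = runner.invoke(cli, ["diff", "t1.json", "t2.json", "--ignore-order"])
print(result.exit_code)
print(result.output)  # the JSON diff report, or an error message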
diff --git a/.venv/lib/python3.12/site-packages/deepdiff/deephash.py b/.venv/lib/python3.12/site-packages/deepdiff/deephash.py
new file mode 100644
index 00000000..47b900e5
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/deepdiff/deephash.py
@@ -0,0 +1,627 @@
+#!/usr/bin/env python
+import logging
+import datetime
+from typing import Union, Optional, Any, List, TYPE_CHECKING
+from collections.abc import Iterable, MutableMapping
+from collections import defaultdict
+from hashlib import sha1, sha256
+from pathlib import Path
+from enum import Enum
+from deepdiff.helper import (strings, numbers, times, unprocessed, not_hashed, add_to_frozen_set,
+ convert_item_or_items_into_set_else_none, get_doc, ipranges,
+ convert_item_or_items_into_compiled_regexes_else_none,
+ get_id, type_is_subclass_of_type_group, type_in_type_group,
+ number_to_string, datetime_normalize, KEY_TO_VAL_STR,
+ get_truncate_datetime, dict_, add_root_to_paths, PydanticBaseModel)
+
+from deepdiff.base import Base
+
+if TYPE_CHECKING:
+ from pytz.tzinfo import BaseTzInfo
+
+
+try:
+ import pandas
+except ImportError:
+ pandas = False
+
+try:
+ import polars
+except ImportError:
+ polars = False
+try:
+ import numpy as np
+ booleanTypes = (bool, np.bool_)
+except ImportError:
+ booleanTypes = bool
+
+logger = logging.getLogger(__name__)
+
+UNPROCESSED_KEY = object()
+
+EMPTY_FROZENSET = frozenset()
+
+INDEX_VS_ATTRIBUTE = ('[%s]', '.%s')
+
+
+HASH_LOOKUP_ERR_MSG = '{} is not one of the hashed items.'
+
+
+def sha256hex(obj):
+ """Use Sha256 as a cryptographic hash."""
+ if isinstance(obj, str):
+ obj = obj.encode('utf-8')
+ return sha256(obj).hexdigest()
+
+
+def sha1hex(obj):
+ """Use Sha1 as a cryptographic hash."""
+ if isinstance(obj, str):
+ obj = obj.encode('utf-8')
+ return sha1(obj).hexdigest()
+
+
+default_hasher = sha256hex
+
+
+def combine_hashes_lists(items, prefix):
+ """
+    Combines lists of hashes into one hash.
+    This can be optimized in the future.
+    It needs to work with both murmur3 hashes (int) and sha256 (str),
+    although murmur3 is not used anymore.
+ """
+ if isinstance(prefix, bytes):
+ prefix = prefix.decode('utf-8')
+ hashes_bytes = b''
+ for item in items:
+        # In order to make sure the order of hashes in each item does not affect the hash,
+        # we re-sort them.
+ hashes_bytes += (''.join(map(str, sorted(item))) + '--').encode('utf-8')
+ return prefix + str(default_hasher(hashes_bytes))
+
+
+class BoolObj(Enum):
+ TRUE = 1
+ FALSE = 0
+
+
+def prepare_string_for_hashing(
+ obj,
+ ignore_string_type_changes=False,
+ ignore_string_case=False,
+ encodings=None,
+ ignore_encoding_errors=False,
+):
+ """
+ Clean type conversions
+ """
+ original_type = obj.__class__.__name__
+ # https://docs.python.org/3/library/codecs.html#codecs.decode
+ errors_mode = 'ignore' if ignore_encoding_errors else 'strict'
+ if isinstance(obj, bytes):
+ err = None
+ encodings = ['utf-8'] if encodings is None else encodings
+ encoded = False
+ for encoding in encodings:
+ try:
+ obj = obj.decode(encoding, errors=errors_mode)
+ encoded = True
+ break
+ except UnicodeDecodeError as er:
+ err = er
+ if not encoded and err is not None:
+ obj_decoded = obj.decode('utf-8', errors='ignore') # type: ignore
+ start = max(err.start - 20, 0)
+ start_prefix = ''
+ if start > 0:
+ start_prefix = '...'
+ end = err.end + 20
+ end_suffix = '...'
+ if end >= len(obj):
+ end = len(obj)
+ end_suffix = ''
+ raise UnicodeDecodeError(
+ err.encoding,
+ err.object,
+ err.start,
+ err.end,
+ f"{err.reason} in '{start_prefix}{obj_decoded[start:end]}{end_suffix}'. Please either pass ignore_encoding_errors=True or pass the encoding via encodings=['utf-8', '...']."
+ ) from None
+ if not ignore_string_type_changes:
+ obj = KEY_TO_VAL_STR.format(original_type, obj)
+ if ignore_string_case:
+ obj = obj.lower()
+ return obj
+
+
+doc = get_doc('deephash_doc.rst')
+
+
+class DeepHash(Base):
+ __doc__ = doc
+
+ def __init__(self,
+ obj: Any,
+ *,
+ apply_hash=True,
+ custom_operators: Optional[List[Any]] =None,
+ default_timezone:Union[datetime.timezone, "BaseTzInfo"]=datetime.timezone.utc,
+ encodings=None,
+ exclude_obj_callback=None,
+ exclude_paths=None,
+ exclude_regex_paths=None,
+ exclude_types=None,
+ hasher=None,
+ hashes=None,
+ ignore_encoding_errors=False,
+ ignore_iterable_order=True,
+ ignore_numeric_type_changes=False,
+ ignore_private_variables=True,
+ ignore_repetition=True,
+ ignore_string_case=False,
+ ignore_string_type_changes=False,
+ ignore_type_in_groups=None,
+ ignore_type_subclasses=False,
+ include_paths=None,
+ number_format_notation="f",
+ number_to_string_func=None,
+ parent="root",
+ significant_digits=None,
+ truncate_datetime=None,
+ use_enum_value=False,
+ **kwargs):
+ if kwargs:
+ raise ValueError(
+ ("The following parameter(s) are not valid: %s\n"
+ "The valid parameters are obj, hashes, exclude_types, significant_digits, truncate_datetime,"
+ "exclude_paths, include_paths, exclude_regex_paths, hasher, ignore_repetition, "
+ "number_format_notation, apply_hash, ignore_type_in_groups, ignore_string_type_changes, "
+ "ignore_numeric_type_changes, ignore_type_subclasses, ignore_string_case "
+ "number_to_string_func, ignore_private_variables, parent, use_enum_value, default_timezone "
+ "encodings, ignore_encoding_errors") % ', '.join(kwargs.keys()))
+ if isinstance(hashes, MutableMapping):
+ self.hashes = hashes
+ elif isinstance(hashes, DeepHash):
+ self.hashes = hashes.hashes
+ else:
+ self.hashes = dict_()
+ exclude_types = set() if exclude_types is None else set(exclude_types)
+ self.exclude_types_tuple = tuple(exclude_types) # we need tuple for checking isinstance
+ self.ignore_repetition = ignore_repetition
+ self.exclude_paths = add_root_to_paths(convert_item_or_items_into_set_else_none(exclude_paths))
+ self.include_paths = add_root_to_paths(convert_item_or_items_into_set_else_none(include_paths))
+ self.exclude_regex_paths = convert_item_or_items_into_compiled_regexes_else_none(exclude_regex_paths)
+ self.hasher = default_hasher if hasher is None else hasher
+ self.hashes[UNPROCESSED_KEY] = []
+ self.use_enum_value = use_enum_value
+ self.default_timezone = default_timezone
+ self.significant_digits = self.get_significant_digits(significant_digits, ignore_numeric_type_changes)
+ self.truncate_datetime = get_truncate_datetime(truncate_datetime)
+ self.number_format_notation = number_format_notation
+ self.ignore_type_in_groups = self.get_ignore_types_in_groups(
+ ignore_type_in_groups=ignore_type_in_groups,
+ ignore_string_type_changes=ignore_string_type_changes,
+ ignore_numeric_type_changes=ignore_numeric_type_changes,
+ ignore_type_subclasses=ignore_type_subclasses)
+ self.ignore_string_type_changes = ignore_string_type_changes
+ self.ignore_numeric_type_changes = ignore_numeric_type_changes
+ self.ignore_string_case = ignore_string_case
+ self.exclude_obj_callback = exclude_obj_callback
+ # makes the hash return constant size result if true
+ # the only time it should be set to False is when
+ # testing the individual hash functions for different types of objects.
+ self.apply_hash = apply_hash
+ self.type_check_func = type_in_type_group if ignore_type_subclasses else type_is_subclass_of_type_group
+ # self.type_check_func = type_is_subclass_of_type_group if ignore_type_subclasses else type_in_type_group
+ self.number_to_string = number_to_string_func or number_to_string
+ self.ignore_private_variables = ignore_private_variables
+ self.encodings = encodings
+ self.ignore_encoding_errors = ignore_encoding_errors
+ self.ignore_iterable_order = ignore_iterable_order
+ self.custom_operators = custom_operators
+
+ self._hash(obj, parent=parent, parents_ids=frozenset({get_id(obj)}))
+
+ if self.hashes[UNPROCESSED_KEY]:
+ logger.warning("Can not hash the following items: {}.".format(self.hashes[UNPROCESSED_KEY]))
+ else:
+ del self.hashes[UNPROCESSED_KEY]
+
+ sha256hex = sha256hex
+ sha1hex = sha1hex
+
+ def __getitem__(self, obj, extract_index=0):
+ return self._getitem(self.hashes, obj, extract_index=extract_index, use_enum_value=self.use_enum_value)
+
+ @staticmethod
+ def _getitem(hashes, obj, extract_index=0, use_enum_value=False):
+ """
+ extract_index is zero for hash and 1 for count and None to get them both.
+ To keep it backward compatible, we only get the hash by default so it is set to zero by default.
+ """
+
+ key = obj
+ if obj is True:
+ key = BoolObj.TRUE
+ elif obj is False:
+ key = BoolObj.FALSE
+ elif use_enum_value and isinstance(obj, Enum):
+ key = obj.value
+
+ result_n_count = (None, 0)
+
+ try:
+ result_n_count = hashes[key]
+ except (TypeError, KeyError):
+ key = get_id(obj)
+ try:
+ result_n_count = hashes[key]
+ except KeyError:
+ raise KeyError(HASH_LOOKUP_ERR_MSG.format(obj)) from None
+
+ if obj is UNPROCESSED_KEY:
+ extract_index = None
+
+ return result_n_count if extract_index is None else result_n_count[extract_index]
+
+ def __contains__(self, obj):
+ result = False
+ try:
+ result = obj in self.hashes
+ except (TypeError, KeyError):
+ result = False
+ if not result:
+ result = get_id(obj) in self.hashes
+ return result
+
+ def get(self, key, default=None, extract_index=0):
+ """
+ Get method for the hashes dictionary.
+        It can extract the hash for a given key that is already calculated when extract_index=0
+        or the count of items that went into building the object when extract_index=1.
+ """
+ return self.get_key(self.hashes, key, default=default, extract_index=extract_index)
+
+ @staticmethod
+ def get_key(hashes, key, default=None, extract_index=0, use_enum_value=False):
+ """
+ get_key method for the hashes dictionary.
+        It can extract the hash for a given key that is already calculated when extract_index=0
+        or the count of items that went into building the object when extract_index=1.
+ """
+ try:
+ result = DeepHash._getitem(hashes, key, extract_index=extract_index, use_enum_value=use_enum_value)
+ except KeyError:
+ result = default
+ return result
+
+ def _get_objects_to_hashes_dict(self, extract_index=0):
+ """
+ A dictionary containing only the objects to hashes,
+ or a dictionary of objects to the count of items that went to build them.
+ extract_index=0 for hashes and extract_index=1 for counts.
+ """
+ result = dict_()
+ for key, value in self.hashes.items():
+ if key is UNPROCESSED_KEY:
+ result[key] = value
+ else:
+ result[key] = value[extract_index]
+ return result
+
+ def __eq__(self, other):
+ if isinstance(other, DeepHash):
+ return self.hashes == other.hashes
+ else:
+ # We only care about the hashes
+ return self._get_objects_to_hashes_dict() == other
+
+ __req__ = __eq__
+
+ def __repr__(self):
+ """
+ Hide the counts since it will be confusing to see them when they are hidden everywhere else.
+ """
+ from deepdiff.summarize import summarize
+ return summarize(self._get_objects_to_hashes_dict(extract_index=0), max_length=500)
+
+ def __str__(self):
+ return str(self._get_objects_to_hashes_dict(extract_index=0))
+
+ def __bool__(self):
+ return bool(self.hashes)
+
+ def keys(self):
+ return self.hashes.keys()
+
+ def values(self):
+ return (i[0] for i in self.hashes.values()) # Just grab the item and not its count
+
+ def items(self):
+ return ((i, v[0]) for i, v in self.hashes.items())
+
+ def _prep_obj(self, obj, parent, parents_ids=EMPTY_FROZENSET, is_namedtuple=False, is_pydantic_object=False):
+ """prepping objects"""
+ original_type = type(obj) if not isinstance(obj, type) else obj
+
+ obj_to_dict_strategies = []
+ if is_namedtuple:
+ obj_to_dict_strategies.append(lambda o: o._asdict())
+ elif is_pydantic_object:
+ obj_to_dict_strategies.append(lambda o: {k: v for (k, v) in o.__dict__.items() if v !="model_fields_set"})
+ else:
+ obj_to_dict_strategies.append(lambda o: o.__dict__)
+
+ if hasattr(obj, "__slots__"):
+ obj_to_dict_strategies.append(lambda o: {i: getattr(o, i) for i in o.__slots__})
+ else:
+ import inspect
+ obj_to_dict_strategies.append(lambda o: dict(inspect.getmembers(o, lambda m: not inspect.isroutine(m))))
+
+ for get_dict in obj_to_dict_strategies:
+ try:
+ d = get_dict(obj)
+ break
+ except AttributeError:
+ pass
+ else:
+ self.hashes[UNPROCESSED_KEY].append(obj)
+ return (unprocessed, 0)
+ obj = d
+
+ result, counts = self._prep_dict(obj, parent=parent, parents_ids=parents_ids,
+ print_as_attribute=True, original_type=original_type)
+ result = "nt{}".format(result) if is_namedtuple else "obj{}".format(result)
+ return result, counts
+
+ def _skip_this(self, obj, parent):
+ skip = False
+ if self.exclude_paths and parent in self.exclude_paths:
+ skip = True
+ if self.include_paths and parent != 'root':
+ if parent not in self.include_paths:
+ skip = True
+ for prefix in self.include_paths:
+ if parent.startswith(prefix):
+ skip = False
+ break
+ elif self.exclude_regex_paths and any(
+ [exclude_regex_path.search(parent) for exclude_regex_path in self.exclude_regex_paths]): # type: ignore
+ skip = True
+ elif self.exclude_types_tuple and isinstance(obj, self.exclude_types_tuple):
+ skip = True
+ elif self.exclude_obj_callback and self.exclude_obj_callback(obj, parent):
+ skip = True
+ return skip
+
+ def _prep_dict(self, obj, parent, parents_ids=EMPTY_FROZENSET, print_as_attribute=False, original_type=None):
+
+ result = []
+ counts = 1
+
+ key_text = "%s{}".format(INDEX_VS_ATTRIBUTE[print_as_attribute])
+ for key, item in obj.items():
+ counts += 1
+ # ignore private variables
+ if self.ignore_private_variables and isinstance(key, str) and key.startswith('__'):
+ continue
+ key_formatted = "'%s'" % key if not print_as_attribute and isinstance(key, strings) else key
+ key_in_report = key_text % (parent, key_formatted)
+
+ key_hash, _ = self._hash(key, parent=key_in_report, parents_ids=parents_ids)
+ if not key_hash:
+ continue
+ item_id = get_id(item)
+ if (parents_ids and item_id in parents_ids) or self._skip_this(item, parent=key_in_report):
+ continue
+ parents_ids_added = add_to_frozen_set(parents_ids, item_id)
+ hashed, count = self._hash(item, parent=key_in_report, parents_ids=parents_ids_added)
+ hashed = KEY_TO_VAL_STR.format(key_hash, hashed)
+ result.append(hashed)
+ counts += count
+
+ result.sort()
+ result = ';'.join(result)
+ if print_as_attribute:
+ type_ = original_type or type(obj)
+ type_str = type_.__name__
+ for type_group in self.ignore_type_in_groups:
+ if self.type_check_func(type_, type_group):
+ type_str = ','.join(map(lambda x: x.__name__, type_group))
+ break
+ else:
+ type_str = 'dict'
+ return "{}:{{{}}}".format(type_str, result), counts
+
+ def _prep_iterable(self, obj, parent, parents_ids=EMPTY_FROZENSET):
+
+ counts = 1
+ result = defaultdict(int)
+
+ for i, item in enumerate(obj):
+ new_parent = "{}[{}]".format(parent, i)
+ if self._skip_this(item, parent=new_parent):
+ continue
+
+ item_id = get_id(item)
+ if parents_ids and item_id in parents_ids:
+ continue
+
+ parents_ids_added = add_to_frozen_set(parents_ids, item_id)
+ hashed, count = self._hash(item, parent=new_parent, parents_ids=parents_ids_added)
+ # counting repetitions
+ result[hashed] += 1
+ counts += count
+
+ if self.ignore_repetition:
+ result = list(result.keys())
+ else:
+ result = [
+ '{}|{}'.format(i, v) for i, v in result.items()
+ ]
+
+ result = map(str, result) # making sure the result items are string so join command works.
+ if self.ignore_iterable_order:
+ result = sorted(result)
+ result = ','.join(result)
+ result = KEY_TO_VAL_STR.format(type(obj).__name__, result)
+
+ return result, counts
+
+ def _prep_bool(self, obj):
+ return BoolObj.TRUE if obj else BoolObj.FALSE
+
+
+ def _prep_path(self, obj):
+ type_ = obj.__class__.__name__
+ return KEY_TO_VAL_STR.format(type_, obj)
+
+ def _prep_number(self, obj):
+ type_ = "number" if self.ignore_numeric_type_changes else obj.__class__.__name__
+ if self.significant_digits is not None:
+ obj = self.number_to_string(obj, significant_digits=self.significant_digits,
+ number_format_notation=self.number_format_notation)
+ return KEY_TO_VAL_STR.format(type_, obj)
+
+ def _prep_ipranges(self, obj):
+ type_ = 'iprange'
+ obj = str(obj)
+ return KEY_TO_VAL_STR.format(type_, obj)
+
+ def _prep_datetime(self, obj):
+ type_ = 'datetime'
+ obj = datetime_normalize(self.truncate_datetime, obj, default_timezone=self.default_timezone)
+ return KEY_TO_VAL_STR.format(type_, obj)
+
+ def _prep_date(self, obj):
+ type_ = 'datetime' # yes still datetime but it doesn't need normalization
+ return KEY_TO_VAL_STR.format(type_, obj)
+
+ def _prep_tuple(self, obj, parent, parents_ids):
+ # Checking to see if it has _fields. Which probably means it is a named
+ # tuple.
+ try:
+ obj._asdict
+ # It must be a normal tuple
+ except AttributeError:
+ result, counts = self._prep_iterable(obj=obj, parent=parent, parents_ids=parents_ids)
+ # We assume it is a namedtuple then
+ else:
+ result, counts = self._prep_obj(obj, parent, parents_ids=parents_ids, is_namedtuple=True)
+ return result, counts
+
+ def _hash(self, obj, parent, parents_ids=EMPTY_FROZENSET):
+ """The main hash method"""
+ counts = 1
+ if self.custom_operators is not None:
+ for operator in self.custom_operators:
+ func = getattr(operator, 'normalize_value_for_hashing', None)
+ if func is None:
+                    raise NotImplementedError(f"{operator.__class__.__name__} needs to define a normalize_value_for_hashing method to be compatible with ignore_order=True or iterable_compare_func.")
+ else:
+ obj = func(parent, obj)
+
+ if isinstance(obj, booleanTypes):
+ obj = self._prep_bool(obj)
+ result = None
+ elif self.use_enum_value and isinstance(obj, Enum):
+ obj = obj.value
+ else:
+ result = not_hashed
+ try:
+ result, counts = self.hashes[obj]
+ except (TypeError, KeyError):
+ pass
+ else:
+ return result, counts
+
+ if self._skip_this(obj, parent):
+ return None, 0
+
+ elif obj is None:
+ result = 'NONE'
+
+ elif isinstance(obj, strings):
+ result = prepare_string_for_hashing(
+ obj,
+ ignore_string_type_changes=self.ignore_string_type_changes,
+ ignore_string_case=self.ignore_string_case,
+ encodings=self.encodings,
+ ignore_encoding_errors=self.ignore_encoding_errors,
+ )
+
+ elif isinstance(obj, Path):
+ result = self._prep_path(obj)
+
+ elif isinstance(obj, times):
+ result = self._prep_datetime(obj)
+
+ elif isinstance(obj, datetime.date):
+ result = self._prep_date(obj)
+
+ elif isinstance(obj, numbers): # type: ignore
+ result = self._prep_number(obj)
+
+ elif isinstance(obj, ipranges):
+ result = self._prep_ipranges(obj)
+
+ elif isinstance(obj, MutableMapping):
+ result, counts = self._prep_dict(obj=obj, parent=parent, parents_ids=parents_ids)
+
+ elif isinstance(obj, tuple):
+ result, counts = self._prep_tuple(obj=obj, parent=parent, parents_ids=parents_ids)
+
+ elif (pandas and isinstance(obj, pandas.DataFrame)): # type: ignore
+ def gen(): # type: ignore
+ yield ('dtype', obj.dtypes) # type: ignore
+ yield ('index', obj.index) # type: ignore
+ yield from obj.items() # type: ignore # which contains (column name, series tuples)
+ result, counts = self._prep_iterable(obj=gen(), parent=parent, parents_ids=parents_ids)
+ elif (polars and isinstance(obj, polars.DataFrame)): # type: ignore
+ def gen():
+ yield from obj.columns # type: ignore
+ yield from list(obj.schema.items()) # type: ignore
+ yield from obj.rows() # type: ignore
+ result, counts = self._prep_iterable(obj=gen(), parent=parent, parents_ids=parents_ids)
+
+ elif isinstance(obj, Iterable):
+ result, counts = self._prep_iterable(obj=obj, parent=parent, parents_ids=parents_ids)
+
+ elif obj == BoolObj.TRUE or obj == BoolObj.FALSE:
+ result = 'bool:true' if obj is BoolObj.TRUE else 'bool:false'
+ elif isinstance(obj, PydanticBaseModel):
+ result, counts = self._prep_obj(obj=obj, parent=parent, parents_ids=parents_ids, is_pydantic_object=True)
+ else:
+ result, counts = self._prep_obj(obj=obj, parent=parent, parents_ids=parents_ids)
+
+ if result is not_hashed: # pragma: no cover
+ self.hashes[UNPROCESSED_KEY].append(obj)
+
+ elif result is unprocessed:
+ pass
+
+ elif self.apply_hash:
+ if isinstance(obj, strings):
+ result_cleaned = result
+ else:
+ result_cleaned = prepare_string_for_hashing(
+ result, ignore_string_type_changes=self.ignore_string_type_changes,
+ ignore_string_case=self.ignore_string_case)
+ result = self.hasher(result_cleaned)
+
+ # It is important to keep the hash of all objects.
+ # The hashes will be later used for comparing the objects.
+ # Object to hash when possible otherwise ObjectID to hash
+ try:
+ self.hashes[obj] = (result, counts)
+ except TypeError:
+ obj_id = get_id(obj)
+ self.hashes[obj_id] = (result, counts)
+
+ return result, counts
+
+
+if __name__ == "__main__": # pragma: no cover
+ import doctest
+ doctest.testmod()
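A short usage sketch of DeepHash (illustrative values): it walks the object and stores a hash for the object itself and for everything nested inside it, keyed by the objects when hashable and by their ids otherwise.

from deepdiff import DeepHash

obj = {"a": [1, 2, 3], "b": "hello"}
hashes = DeepHash(obj)

# Both the whole dict and its nested members get an entry.
print(hashes[obj])        # hash of the normalized dict (sha256-based by default)
print(hashes["hello"])    # hash of the nested string
print(obj in hashes)      # True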
diff --git a/.venv/lib/python3.12/site-packages/deepdiff/delta.py b/.venv/lib/python3.12/site-packages/deepdiff/delta.py
new file mode 100644
index 00000000..a76593cd
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/deepdiff/delta.py
@@ -0,0 +1,1217 @@
+import copy
+import logging
+from typing import List, Dict, IO, Callable, Set, Union, Optional
+from functools import partial, cmp_to_key
+from collections.abc import Mapping
+from copy import deepcopy
+from deepdiff import DeepDiff
+from deepdiff.serialization import pickle_load, pickle_dump
+from deepdiff.helper import (
+ strings, numbers,
+ np_ndarray, np_array_factory, numpy_dtypes, get_doc,
+ not_found, numpy_dtype_string_to_type, dict_,
+ Opcode, FlatDeltaRow, UnkownValueCode, FlatDataAction,
+ OPCODE_TAG_TO_FLAT_DATA_ACTION,
+ FLAT_DATA_ACTION_TO_OPCODE_TAG,
+ SetOrdered,
+)
+from deepdiff.path import (
+ _path_to_elements, _get_nested_obj, _get_nested_obj_and_force,
+ GET, GETATTR, parse_path, stringify_path,
+)
+from deepdiff.anyset import AnySet
+from deepdiff.summarize import summarize
+
+logger = logging.getLogger(__name__)
+
+
+VERIFICATION_MSG = 'Expected the old value for {} to be {} but it is {}. Error found on: {}. You may want to set force=True, especially if this delta is created by passing flat_rows_list or flat_dict_list'
+ELEM_NOT_FOUND_TO_ADD_MSG = 'Key or index of {} is not found for {} for setting operation.'
+TYPE_CHANGE_FAIL_MSG = 'Unable to do the type change for {} to type {} due to {}'
+VERIFY_BIDIRECTIONAL_MSG = ('You have applied the delta to an object that has '
+ 'different values than the original object the delta was made from.')
+FAIL_TO_REMOVE_ITEM_IGNORE_ORDER_MSG = 'Failed to remove index[{}] on {}. It was expected to be {} but got {}'
+DELTA_NUMPY_OPERATOR_OVERRIDE_MSG = (
+ 'A numpy ndarray is most likely being added to a delta. '
+    'Due to Numpy overriding the + operator, you can only do: delta + ndarray '
+ 'and NOT ndarray + delta')
+BINIARY_MODE_NEEDED_MSG = "Please open the file in the binary mode and pass to Delta by passing 'b' in open(..., 'b'): {}"
+DELTA_AT_LEAST_ONE_ARG_NEEDED = 'At least one of the diff, delta_path or delta_file arguments need to be passed.'
+INVALID_ACTION_WHEN_CALLING_GET_ELEM = 'invalid action of {} when calling _get_elem_and_compare_to_old_value'
+INVALID_ACTION_WHEN_CALLING_SIMPLE_SET_ELEM = 'invalid action of {} when calling _simple_set_elem_value'
+INVALID_ACTION_WHEN_CALLING_SIMPLE_DELETE_ELEM = 'invalid action of {} when calling _simple_delete_elem'
+UNABLE_TO_GET_ITEM_MSG = 'Unable to get the item at {}: {}'
+UNABLE_TO_GET_PATH_MSG = 'Unable to get the item at {}'
+INDEXES_NOT_FOUND_WHEN_IGNORE_ORDER = 'Delta added to an incompatible object. Unable to add the following items at the specific indexes. {}'
+NUMPY_TO_LIST = 'NUMPY_TO_LIST'
+NOT_VALID_NUMPY_TYPE = "{} is not a valid numpy type."
+
+doc = get_doc('delta.rst')
+
+
+class DeltaError(ValueError):
+ """
+ Delta specific errors
+ """
+ pass
+
+
+class DeltaNumpyOperatorOverrideError(ValueError):
+ """
+ Delta Numpy Operator Override Error
+ """
+ pass
+
+
+class Delta:
+
+ __doc__ = doc
+
+ def __init__(
+ self,
+ diff: Union[DeepDiff, Mapping, str, bytes, None]=None,
+ delta_path: Optional[str]=None,
+ delta_file: Optional[IO]=None,
+ delta_diff: Optional[dict]=None,
+ flat_dict_list: Optional[List[Dict]]=None,
+ flat_rows_list: Optional[List[FlatDeltaRow]]=None,
+ deserializer: Callable=pickle_load,
+ log_errors: bool=True,
+ mutate: bool=False,
+ raise_errors: bool=False,
+ safe_to_import: Optional[Set[str]]=None,
+ serializer: Callable=pickle_dump,
+ verify_symmetry: Optional[bool]=None,
+ bidirectional: bool=False,
+ always_include_values: bool=False,
+ iterable_compare_func_was_used: Optional[bool]=None,
+ force: bool=False,
+ ):
+ # for pickle deserializer:
+ if hasattr(deserializer, '__code__') and 'safe_to_import' in set(deserializer.__code__.co_varnames):
+ _deserializer = deserializer
+ else:
+ def _deserializer(obj, safe_to_import=None):
+ result = deserializer(obj)
+ if result.get('_iterable_opcodes'):
+ _iterable_opcodes = {}
+ for path, op_codes in result['_iterable_opcodes'].items():
+ _iterable_opcodes[path] = []
+ for op_code in op_codes:
+ _iterable_opcodes[path].append(
+ Opcode(
+ **op_code
+ )
+ )
+ result['_iterable_opcodes'] = _iterable_opcodes
+ return result
+
+
+ self._reversed_diff = None
+
+ if verify_symmetry is not None:
+ logger.warning(
+ "DeepDiff Deprecation: use bidirectional instead of verify_symmetry parameter."
+ )
+ bidirectional = verify_symmetry
+
+ self.bidirectional = bidirectional
+ if bidirectional:
+ self.always_include_values = True # We need to include the values in bidirectional deltas
+ else:
+ self.always_include_values = always_include_values
+
+ if diff is not None:
+ if isinstance(diff, DeepDiff):
+ self.diff = diff._to_delta_dict(directed=not bidirectional, always_include_values=self.always_include_values)
+ elif isinstance(diff, Mapping):
+ self.diff = diff
+ elif isinstance(diff, strings):
+ self.diff = _deserializer(diff, safe_to_import=safe_to_import)
+ elif delta_path:
+ with open(delta_path, 'rb') as the_file:
+ content = the_file.read()
+ self.diff = _deserializer(content, safe_to_import=safe_to_import)
+ elif delta_diff:
+ self.diff = delta_diff
+ elif delta_file:
+ try:
+ content = delta_file.read()
+ except UnicodeDecodeError as e:
+ raise ValueError(BINIARY_MODE_NEEDED_MSG.format(e)) from None
+ self.diff = _deserializer(content, safe_to_import=safe_to_import)
+ elif flat_dict_list:
+ # Use copy to preserve original value of flat_dict_list in calling module
+ self.diff = self._from_flat_dicts(copy.deepcopy(flat_dict_list))
+ elif flat_rows_list:
+ self.diff = self._from_flat_rows(copy.deepcopy(flat_rows_list))
+ else:
+ raise ValueError(DELTA_AT_LEAST_ONE_ARG_NEEDED)
+
+ self.mutate = mutate
+ self.raise_errors = raise_errors
+ self.log_errors = log_errors
+ self._numpy_paths = self.diff.get('_numpy_paths', False)
+ # When we create the delta from a list of flat dictionaries, details such as iterable_compare_func_was_used get lost.
+ # That's why we allow iterable_compare_func_was_used to be explicitly set.
+ self._iterable_compare_func_was_used = self.diff.get('_iterable_compare_func_was_used', iterable_compare_func_was_used)
+ self.serializer = serializer
+ self.deserializer = deserializer
+ self.force = force
+ if force:
+ self.get_nested_obj = _get_nested_obj_and_force
+ else:
+ self.get_nested_obj = _get_nested_obj
+ self.reset()
+
+ def __repr__(self):
+ return "<Delta: {}>".format(summarize(self.diff, max_length=100))
+
+ def reset(self):
+ self.post_process_paths_to_convert = dict_()
+
+ def __add__(self, other):
+ if isinstance(other, numbers) and self._numpy_paths: # type: ignore
+ raise DeltaNumpyOperatorOverrideError(DELTA_NUMPY_OPERATOR_OVERRIDE_MSG)
+ if self.mutate:
+ self.root = other
+ else:
+ self.root = deepcopy(other)
+ self._do_pre_process()
+ self._do_values_changed()
+ self._do_set_item_added()
+ self._do_set_item_removed()
+ self._do_type_changes()
+ # NOTE: the remove iterable action needs to happen BEFORE
+ # all the other iterables to match the reverse of order of operations in DeepDiff
+ self._do_iterable_opcodes()
+ self._do_iterable_item_removed()
+ self._do_iterable_item_added()
+ self._do_ignore_order()
+ self._do_dictionary_item_added()
+ self._do_dictionary_item_removed()
+ self._do_attribute_added()
+ self._do_attribute_removed()
+ self._do_post_process()
+
+ other = self.root
+ # removing the reference to other
+ del self.root
+ self.reset()
+ return other
+
+ __radd__ = __add__
+
+ def __rsub__(self, other):
+ if self._reversed_diff is None:
+ self._reversed_diff = self._get_reverse_diff()
+ self.diff, self._reversed_diff = self._reversed_diff, self.diff
+ result = self.__add__(other)
+ self.diff, self._reversed_diff = self._reversed_diff, self.diff
+ return result
+
+ def _raise_or_log(self, msg, level='error'):
+ if self.log_errors:
+ getattr(logger, level)(msg)
+ if self.raise_errors:
+ raise DeltaError(msg)
+
+ def _do_verify_changes(self, path, expected_old_value, current_old_value):
+ if self.bidirectional and expected_old_value != current_old_value:
+ if isinstance(path, str):
+ path_str = path
+ else:
+ path_str = stringify_path(path, root_element=('', GETATTR))
+ self._raise_or_log(VERIFICATION_MSG.format(
+ path_str, expected_old_value, current_old_value, VERIFY_BIDIRECTIONAL_MSG))
+
+ def _get_elem_and_compare_to_old_value(
+ self,
+ obj,
+ path_for_err_reporting,
+ expected_old_value,
+ elem=None,
+ action=None,
+ forced_old_value=None,
+ next_element=None,
+ ):
+ # if forced_old_value is not None:
+ try:
+ if action == GET:
+ current_old_value = obj[elem]
+ elif action == GETATTR:
+ current_old_value = getattr(obj, elem) # type: ignore
+ else:
+ raise DeltaError(INVALID_ACTION_WHEN_CALLING_GET_ELEM.format(action))
+ except (KeyError, IndexError, AttributeError, TypeError) as e:
+ if self.force:
+ if forced_old_value is None:
+ if next_element is None or isinstance(next_element, str):
+ _forced_old_value = {}
+ else:
+ _forced_old_value = []
+ else:
+ _forced_old_value = forced_old_value
+ if action == GET:
+ if isinstance(obj, list):
+ if isinstance(elem, int) and elem < len(obj):
+ obj[elem] = _forced_old_value
+ else:
+ obj.append(_forced_old_value)
+ else:
+ obj[elem] = _forced_old_value
+ elif action == GETATTR:
+ setattr(obj, elem, _forced_old_value) # type: ignore
+ return _forced_old_value
+ current_old_value = not_found
+ if isinstance(path_for_err_reporting, (list, tuple)):
+ path_for_err_reporting = '.'.join([i[0] for i in path_for_err_reporting])
+ if self.bidirectional:
+ self._raise_or_log(VERIFICATION_MSG.format(
+ path_for_err_reporting,
+ expected_old_value, current_old_value, e))
+ else:
+ self._raise_or_log(UNABLE_TO_GET_PATH_MSG.format(
+ path_for_err_reporting))
+ return current_old_value
+
+ def _simple_set_elem_value(self, obj, path_for_err_reporting, elem=None, value=None, action=None):
+ """
+ Set the element value directly on an object
+ """
+ try:
+ if action == GET:
+ try:
+ obj[elem] = value
+ except IndexError:
+ if elem == len(obj):
+ obj.append(value)
+ else:
+ self._raise_or_log(ELEM_NOT_FOUND_TO_ADD_MSG.format(elem, path_for_err_reporting))
+ elif action == GETATTR:
+ setattr(obj, elem, value) # type: ignore
+ else:
+ raise DeltaError(INVALID_ACTION_WHEN_CALLING_SIMPLE_SET_ELEM.format(action))
+ except (KeyError, IndexError, AttributeError, TypeError) as e:
+ self._raise_or_log('Failed to set {} due to {}'.format(path_for_err_reporting, e))
+
+ def _coerce_obj(self, parent, obj, path, parent_to_obj_elem,
+ parent_to_obj_action, elements, to_type, from_type):
+ """
+ Coerce obj and mark it in post_process_paths_to_convert for later to be converted back.
+ Also reassign it to its parent to replace the old object.
+ """
+ self.post_process_paths_to_convert[elements[:-1]] = {'old_type': to_type, 'new_type': from_type}
+ # If this function is going to ever be used to convert numpy arrays, uncomment these lines:
+ # if from_type is np_ndarray:
+ # obj = obj.tolist()
+ # else:
+ obj = to_type(obj)
+
+ if parent:
+ # Making sure that the object is re-instated inside the parent especially if it was immutable
+ # and we had to turn it into a mutable one. In such cases the object has a new id.
+ self._simple_set_elem_value(obj=parent, path_for_err_reporting=path, elem=parent_to_obj_elem,
+ value=obj, action=parent_to_obj_action)
+ return obj
+
+ def _set_new_value(self, parent, parent_to_obj_elem, parent_to_obj_action,
+ obj, elements, path, elem, action, new_value):
+ """
+ Set the element value on an object and if necessary convert the object to the proper mutable type
+ """
+ if isinstance(obj, tuple):
+ # convert this object back to a tuple later
+ obj = self._coerce_obj(
+ parent, obj, path, parent_to_obj_elem,
+ parent_to_obj_action, elements,
+ to_type=list, from_type=tuple)
+ if elem != 0 and self.force and isinstance(obj, list) and len(obj) == 0:
+ # it must have been a dictionary
+ obj = {}
+ self._simple_set_elem_value(obj=parent, path_for_err_reporting=path, elem=parent_to_obj_elem,
+ value=obj, action=parent_to_obj_action)
+ self._simple_set_elem_value(obj=obj, path_for_err_reporting=path, elem=elem,
+ value=new_value, action=action)
+
+ def _simple_delete_elem(self, obj, path_for_err_reporting, elem=None, action=None):
+ """
+ Delete the element directly on an object
+ """
+ try:
+ if action == GET:
+ del obj[elem]
+ elif action == GETATTR:
+ del obj.__dict__[elem]
+ else:
+ raise DeltaError(INVALID_ACTION_WHEN_CALLING_SIMPLE_DELETE_ELEM.format(action))
+ except (KeyError, IndexError, AttributeError) as e:
+ self._raise_or_log('Failed to set {} due to {}'.format(path_for_err_reporting, e))
+
+ def _del_elem(self, parent, parent_to_obj_elem, parent_to_obj_action,
+ obj, elements, path, elem, action):
+ """
+ Delete the element value on an object and if necessary convert the object to the proper mutable type
+ """
+ obj_is_new = False
+ if isinstance(obj, tuple):
+ # convert this object back to a tuple later
+ self.post_process_paths_to_convert[elements[:-1]] = {'old_type': list, 'new_type': tuple}
+ obj = list(obj)
+ obj_is_new = True
+ self._simple_delete_elem(obj=obj, path_for_err_reporting=path, elem=elem, action=action)
+ if obj_is_new and parent:
+ # Making sure that the object is re-instated inside the parent especially if it was immutable
+ # and we had to turn it into a mutable one. In such cases the object has a new id.
+ self._simple_set_elem_value(obj=parent, path_for_err_reporting=path, elem=parent_to_obj_elem,
+ value=obj, action=parent_to_obj_action)
+
+ def _do_iterable_item_added(self):
+ iterable_item_added = self.diff.get('iterable_item_added', {})
+ iterable_item_moved = self.diff.get('iterable_item_moved')
+
+ # First we need to create a placeholder for moved items.
+ # This will then get replaced below after we go through added items.
+        # Without this, items can get double-added because moved items store the new_value and do not need item_added replayed.
+ if iterable_item_moved:
+ added_dict = {v["new_path"]: None for k, v in iterable_item_moved.items()}
+ iterable_item_added.update(added_dict)
+
+ if iterable_item_added:
+ self._do_item_added(iterable_item_added, insert=True)
+
+ if iterable_item_moved:
+ added_dict = {v["new_path"]: v["value"] for k, v in iterable_item_moved.items()}
+ self._do_item_added(added_dict, insert=False)
+
+ def _do_dictionary_item_added(self):
+ dictionary_item_added = self.diff.get('dictionary_item_added')
+ if dictionary_item_added:
+ self._do_item_added(dictionary_item_added, sort=False)
+
+ def _do_attribute_added(self):
+ attribute_added = self.diff.get('attribute_added')
+ if attribute_added:
+ self._do_item_added(attribute_added)
+
+ @staticmethod
+ def _sort_key_for_item_added(path_and_value):
+ elements = _path_to_elements(path_and_value[0])
+ # Example elements: [(4.3, 'GET'), ('b', 'GETATTR'), ('a3', 'GET')]
+ # We only care about the values in the elements not how to get the values.
+ return [i[0] for i in elements]
+
+ @staticmethod
+ def _sort_comparison(left, right):
+ """
+        We use sort comparison instead of _sort_key_for_item_added when we run into element types that cannot
+        be compared with each other, such as None to None, or integer to string.
+ """
+ # Example elements: [(4.3, 'GET'), ('b', 'GETATTR'), ('a3', 'GET')]
+ # We only care about the values in the elements not how to get the values.
+ left_path = [i[0] for i in _path_to_elements(left[0], root_element=None)]
+ right_path = [i[0] for i in _path_to_elements(right[0], root_element=None)]
+ try:
+ if left_path < right_path:
+ return -1
+ elif left_path > right_path:
+ return 1
+ else:
+ return 0
+ except TypeError:
+ if len(left_path) > len(right_path):
+ left_path = left_path[:len(right_path)]
+ elif len(right_path) > len(left_path):
+ right_path = right_path[:len(left_path)]
+ for l_elem, r_elem in zip(left_path, right_path):
+                if type(l_elem) != type(r_elem) or l_elem is None:
+ l_elem = str(l_elem)
+ r_elem = str(r_elem)
+ try:
+ if l_elem < r_elem:
+ return -1
+ elif l_elem > r_elem:
+ return 1
+ except TypeError:
+ continue
+ return 0
+
+
+ def _do_item_added(self, items, sort=True, insert=False):
+ if sort:
+ # sorting items by their path so that the items with smaller index
+ # are applied first (unless `sort` is `False` so that order of
+ # added items is retained, e.g. for dicts).
+ try:
+ items = sorted(items.items(), key=self._sort_key_for_item_added)
+ except TypeError:
+ items = sorted(items.items(), key=cmp_to_key(self._sort_comparison))
+ else:
+ items = items.items()
+
+ for path, new_value in items:
+ elem_and_details = self._get_elements_and_details(path)
+ if elem_and_details:
+ elements, parent, parent_to_obj_elem, parent_to_obj_action, obj, elem, action = elem_and_details
+ else:
+ continue # pragma: no cover. Due to cPython peephole optimizer, this line doesn't get covered. https://github.com/nedbat/coveragepy/issues/198
+
+ # Insert is only true for iterables, make sure it is a valid index.
+ if(insert and elem < len(obj)): # type: ignore
+ obj.insert(elem, None) # type: ignore
+
+ self._set_new_value(parent, parent_to_obj_elem, parent_to_obj_action,
+ obj, elements, path, elem, action, new_value)
+
+ def _do_values_changed(self):
+ values_changed = self.diff.get('values_changed')
+ if values_changed:
+ self._do_values_or_type_changed(values_changed)
+
+ def _do_type_changes(self):
+ type_changes = self.diff.get('type_changes')
+ if type_changes:
+ self._do_values_or_type_changed(type_changes, is_type_change=True)
+
+ def _do_post_process(self):
+ if self.post_process_paths_to_convert:
+ # Example: We had converted some object to be mutable and now we are converting them back to be immutable.
+ # We don't need to check the change because it is not really a change that was part of the original diff.
+ self._do_values_or_type_changed(self.post_process_paths_to_convert, is_type_change=True, verify_changes=False)
+
+ def _do_pre_process(self):
+ if self._numpy_paths and ('iterable_item_added' in self.diff or 'iterable_item_removed' in self.diff):
+ preprocess_paths = dict_()
+ for path, type_ in self._numpy_paths.items(): # type: ignore
+ preprocess_paths[path] = {'old_type': np_ndarray, 'new_type': list}
+ try:
+ type_ = numpy_dtype_string_to_type(type_)
+ except Exception as e:
+ self._raise_or_log(NOT_VALID_NUMPY_TYPE.format(e))
+ continue # pragma: no cover. Due to cPython peephole optimizer, this line doesn't get covered. https://github.com/nedbat/coveragepy/issues/198
+ self.post_process_paths_to_convert[path] = {'old_type': list, 'new_type': type_}
+ if preprocess_paths:
+ self._do_values_or_type_changed(preprocess_paths, is_type_change=True)
+
+ def _get_elements_and_details(self, path):
+ try:
+ elements = _path_to_elements(path)
+ if len(elements) > 1:
+ elements_subset = elements[:-2]
+ if len(elements_subset) != len(elements):
+ next_element = elements[-2][0]
+ next2_element = elements[-1][0]
+ else:
+ next_element = None
+ parent = self.get_nested_obj(obj=self, elements=elements_subset, next_element=next_element)
+ parent_to_obj_elem, parent_to_obj_action = elements[-2]
+ obj = self._get_elem_and_compare_to_old_value(
+ obj=parent, path_for_err_reporting=path, expected_old_value=None,
+ elem=parent_to_obj_elem, action=parent_to_obj_action, next_element=next2_element) # type: ignore
+ else:
+ # parent = self
+ # obj = self.root
+ # parent_to_obj_elem = 'root'
+ # parent_to_obj_action = GETATTR
+ parent = parent_to_obj_elem = parent_to_obj_action = None
+ obj = self
+ # obj = self.get_nested_obj(obj=self, elements=elements[:-1])
+ elem, action = elements[-1] # type: ignore
+ except Exception as e:
+ self._raise_or_log(UNABLE_TO_GET_ITEM_MSG.format(path, e))
+ return None
+ else:
+ if obj is not_found:
+ return None
+ return elements, parent, parent_to_obj_elem, parent_to_obj_action, obj, elem, action
+
+ def _do_values_or_type_changed(self, changes, is_type_change=False, verify_changes=True):
+ for path, value in changes.items():
+ elem_and_details = self._get_elements_and_details(path)
+ if elem_and_details:
+ elements, parent, parent_to_obj_elem, parent_to_obj_action, obj, elem, action = elem_and_details
+ else:
+ continue # pragma: no cover. Due to cPython peephole optimizer, this line doesn't get covered. https://github.com/nedbat/coveragepy/issues/198
+ expected_old_value = value.get('old_value', not_found)
+
+ current_old_value = self._get_elem_and_compare_to_old_value(
+ obj=obj, path_for_err_reporting=path, expected_old_value=expected_old_value, elem=elem, action=action)
+ if current_old_value is not_found:
+ continue # pragma: no cover. I have not been able to write a test for this case. But we should still check for it.
+ # For type changes, if the old_value could have been converted to the new_value
+ # just by applying the class of the new_value, then the new_value might not be included
+ # in the delta dictionary. That is defined in Model.DeltaResult._from_tree_type_changes.
+ if is_type_change and 'new_value' not in value:
+ try:
+ new_type = value['new_type']
+ # in case of Numpy we pass the ndarray plus the dtype in a tuple
+ if new_type in numpy_dtypes:
+ new_value = np_array_factory(current_old_value, new_type)
+ else:
+ new_value = new_type(current_old_value)
+ except Exception as e:
+ self._raise_or_log(TYPE_CHANGE_FAIL_MSG.format(obj[elem], value.get('new_type', 'unknown'), e)) # type: ignore
+ continue
+ else:
+ new_value = value['new_value']
+
+ self._set_new_value(parent, parent_to_obj_elem, parent_to_obj_action,
+ obj, elements, path, elem, action, new_value)
+
+ if verify_changes:
+ self._do_verify_changes(path, expected_old_value, current_old_value)
+
+ def _do_item_removed(self, items):
+ """
+ Handle removing items.
+ """
+ # Sort the items to be removed in reverse order based on their paths,
+ # so that a bigger index is deleted before a smaller index.
+ try:
+ sorted_item = sorted(items.items(), key=self._sort_key_for_item_added, reverse=True)
+ except TypeError:
+ sorted_item = sorted(items.items(), key=cmp_to_key(self._sort_comparison), reverse=True)
+ for path, expected_old_value in sorted_item:
+ elem_and_details = self._get_elements_and_details(path)
+ if elem_and_details:
+ elements, parent, parent_to_obj_elem, parent_to_obj_action, obj, elem, action = elem_and_details
+ else:
+ continue # pragma: no cover. Due to cPython peephole optimizer, this line doesn't get covered. https://github.com/nedbat/coveragepy/issues/198
+
+ look_for_expected_old_value = False
+ current_old_value = not_found
+ try:
+ if action == GET:
+ current_old_value = obj[elem] # type: ignore
+ elif action == GETATTR:
+ current_old_value = getattr(obj, elem)
+ look_for_expected_old_value = current_old_value != expected_old_value
+ except (KeyError, IndexError, AttributeError, TypeError):
+ look_for_expected_old_value = True
+
+ if look_for_expected_old_value and isinstance(obj, list) and not self._iterable_compare_func_was_used:
+ # It may return None if it doesn't find it
+ elem = self._find_closest_iterable_element_for_index(obj, elem, expected_old_value)
+ if elem is not None:
+ current_old_value = expected_old_value
+ if current_old_value is not_found or elem is None:
+ continue
+
+ self._del_elem(parent, parent_to_obj_elem, parent_to_obj_action,
+ obj, elements, path, elem, action)
+ self._do_verify_changes(path, expected_old_value, current_old_value)
+
+ def _find_closest_iterable_element_for_index(self, obj, elem, expected_old_value):
+ closest_elem = None
+ closest_distance = float('inf')
+ for index, value in enumerate(obj):
+ dist = abs(index - elem)
+ if dist > closest_distance:
+ break
+ if value == expected_old_value and dist < closest_distance:
+ closest_elem = index
+ closest_distance = dist
+ return closest_elem
+
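+ # A small worked illustration of the closest-element search above (a sketch, not
+ # part of the library): with obj=['a', 'b', 'c'], elem=5 and expected_old_value='b',
+ # index 1 matches at distance 4, so 1 is returned; if no item equals
+ # expected_old_value, None is returned.
+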
+ def _do_iterable_opcodes(self):
+ _iterable_opcodes = self.diff.get('_iterable_opcodes', {})
+ if _iterable_opcodes:
+ for path, opcodes in _iterable_opcodes.items():
+ transformed = []
+ # elements = _path_to_elements(path)
+ elem_and_details = self._get_elements_and_details(path)
+ if elem_and_details:
+ elements, parent, parent_to_obj_elem, parent_to_obj_action, obj, elem, action = elem_and_details
+ if parent is None:
+ parent = self
+ obj = self.root
+ parent_to_obj_elem = 'root'
+ parent_to_obj_action = GETATTR
+ else:
+ continue # pragma: no cover. Due to cPython peephole optimizer, this line doesn't get covered. https://github.com/nedbat/coveragepy/issues/198
+ obj = self.get_nested_obj(obj=self, elements=elements)
+ is_obj_tuple = isinstance(obj, tuple)
+ for opcode in opcodes:
+ if opcode.tag == 'replace':
+ # Replace items in list a[i1:i2] with b[j1:j2]
+ transformed.extend(opcode.new_values)
+ elif opcode.tag == 'delete':
+ # Delete items from list a[i1:i2], so we do nothing here
+ continue
+ elif opcode.tag == 'insert':
+ # Insert items from list b[j1:j2] into the new list
+ transformed.extend(opcode.new_values)
+ elif opcode.tag == 'equal':
+ # Items are the same in both lists, so we add them to the result
+ transformed.extend(obj[opcode.t1_from_index:opcode.t1_to_index]) # type: ignore
+ if is_obj_tuple:
+ obj = tuple(obj) # type: ignore
+ # Make sure the object is put back into the parent, especially if it was immutable
+ # and we had to turn it into a mutable one. In such cases the object has a new id.
+ self._simple_set_elem_value(obj=parent, path_for_err_reporting=path, elem=parent_to_obj_elem,
+ value=obj, action=parent_to_obj_action)
+ else:
+ obj[:] = transformed # type: ignore
+
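+ # A hedged usage sketch for the opcode replay above (assumes the public deepdiff
+ # API; whether difflib-style opcodes or per-item changes end up being recorded
+ # depends on which internal pass produces the smaller diff):
+ #
+ #     from deepdiff import DeepDiff, Delta
+ #     t1 = list("abcdef")
+ #     t2 = list("abXdef")
+ #     delta = Delta(DeepDiff(t1, t2))
+ #     assert t1 + delta == t2
+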
+ def _do_iterable_item_removed(self):
+ iterable_item_removed = self.diff.get('iterable_item_removed', {})
+
+ iterable_item_moved = self.diff.get('iterable_item_moved')
+ if iterable_item_moved:
+ # These will get added back during items_added
+ removed_dict = {k: v["value"] for k, v in iterable_item_moved.items()}
+ iterable_item_removed.update(removed_dict)
+
+ if iterable_item_removed:
+ self._do_item_removed(iterable_item_removed)
+
+ def _do_dictionary_item_removed(self):
+ dictionary_item_removed = self.diff.get('dictionary_item_removed')
+ if dictionary_item_removed:
+ self._do_item_removed(dictionary_item_removed)
+
+ def _do_attribute_removed(self):
+ attribute_removed = self.diff.get('attribute_removed')
+ if attribute_removed:
+ self._do_item_removed(attribute_removed)
+
+ def _do_set_item_added(self):
+ items = self.diff.get('set_item_added')
+ if items:
+ self._do_set_or_frozenset_item(items, func='union')
+
+ def _do_set_item_removed(self):
+ items = self.diff.get('set_item_removed')
+ if items:
+ self._do_set_or_frozenset_item(items, func='difference')
+
+ def _do_set_or_frozenset_item(self, items, func):
+ for path, value in items.items():
+ elements = _path_to_elements(path)
+ parent = self.get_nested_obj(obj=self, elements=elements[:-1])
+ elem, action = elements[-1]
+ obj = self._get_elem_and_compare_to_old_value(
+ parent, path_for_err_reporting=path, expected_old_value=None, elem=elem, action=action, forced_old_value=set())
+ new_value = getattr(obj, func)(value)
+ self._simple_set_elem_value(parent, path_for_err_reporting=path, elem=elem, value=new_value, action=action)
+
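+ # A minimal sketch of the set handling above (assumes DeepDiff and Delta from the
+ # public deepdiff API):
+ #
+ #     from deepdiff import DeepDiff, Delta
+ #     t1 = {'a', 'b'}
+ #     t2 = {'a', 'c'}
+ #     delta = Delta(DeepDiff(t1, t2))
+ #     assert t1 + delta == t2   # 'c' added via union, 'b' removed via difference
+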
+ def _do_ignore_order_get_old(self, obj, remove_indexes_per_path, fixed_indexes_values, path_for_err_reporting):
+ """
+ A generator that gets the old values in an iterable when the order was supposed to be ignored.
+ """
+ old_obj_index = -1
+ max_len = len(obj) - 1
+ while old_obj_index < max_len:
+ old_obj_index += 1
+ current_old_obj = obj[old_obj_index]
+ if current_old_obj in fixed_indexes_values:
+ continue
+ if old_obj_index in remove_indexes_per_path:
+ expected_obj_to_delete = remove_indexes_per_path.pop(old_obj_index)
+ if current_old_obj == expected_obj_to_delete:
+ continue
+ else:
+ self._raise_or_log(FAIL_TO_REMOVE_ITEM_IGNORE_ORDER_MSG.format(
+ old_obj_index, path_for_err_reporting, expected_obj_to_delete, current_old_obj))
+ yield current_old_obj
+
+ def _do_ignore_order(self):
+ """
+
+ 't1': [5, 1, 1, 1, 6],
+ 't2': [7, 1, 1, 1, 8],
+
+ 'iterable_items_added_at_indexes': {
+ 'root': {
+ 0: 7,
+ 4: 8
+ }
+ },
+ 'iterable_items_removed_at_indexes': {
+ 'root': {
+ 4: 6,
+ 0: 5
+ }
+ }
+
+ """
+ fixed_indexes = self.diff.get('iterable_items_added_at_indexes', dict_())
+ remove_indexes = self.diff.get('iterable_items_removed_at_indexes', dict_())
+ paths = SetOrdered(fixed_indexes.keys()) | SetOrdered(remove_indexes.keys())
+ for path in paths: # type: ignore
+ # In the case of ignore_order reports, we are pointing to the container object.
+ # Thus we add a [0] to the elements so we can get the required objects and discard what we don't need.
+ elem_and_details = self._get_elements_and_details("{}[0]".format(path))
+ if elem_and_details:
+ _, parent, parent_to_obj_elem, parent_to_obj_action, obj, _, _ = elem_and_details
+ else:
+ continue # pragma: no cover. Due to cPython peephole optimizer, this line doesn't get covered. https://github.com/nedbat/coveragepy/issues/198
+ # copying both these dictionaries since we don't want to mutate them.
+ fixed_indexes_per_path = fixed_indexes.get(path, dict_()).copy()
+ remove_indexes_per_path = remove_indexes.get(path, dict_()).copy()
+ fixed_indexes_values = AnySet(fixed_indexes_per_path.values())
+
+ new_obj = []
+ # NumPy's ndarray does not like the bool() function.
+ if isinstance(obj, np_ndarray):
+ there_are_old_items = obj.size > 0
+ else:
+ there_are_old_items = bool(obj)
+ old_item_gen = self._do_ignore_order_get_old(
+ obj, remove_indexes_per_path, fixed_indexes_values, path_for_err_reporting=path)
+ while there_are_old_items or fixed_indexes_per_path:
+ new_obj_index = len(new_obj)
+ if new_obj_index in fixed_indexes_per_path:
+ new_item = fixed_indexes_per_path.pop(new_obj_index)
+ new_obj.append(new_item)
+ elif there_are_old_items:
+ try:
+ new_item = next(old_item_gen)
+ except StopIteration:
+ there_are_old_items = False
+ else:
+ new_obj.append(new_item)
+ else:
+ # pop the first remaining item from the fixed_indexes_per_path dictionary
+ self._raise_or_log(INDEXES_NOT_FOUND_WHEN_IGNORE_ORDER.format(fixed_indexes_per_path))
+ new_item = fixed_indexes_per_path.pop(next(iter(fixed_indexes_per_path)))
+ new_obj.append(new_item)
+
+ if isinstance(obj, tuple):
+ new_obj = tuple(new_obj)
+ # Make sure the object is put back into the parent, especially if it was immutable
+ # and we had to turn it into a mutable one. In such cases the object has a new id.
+ self._simple_set_elem_value(obj=parent, path_for_err_reporting=path, elem=parent_to_obj_elem,
+ value=new_obj, action=parent_to_obj_action)
+
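+ # A hedged sketch of applying a delta built from an ignore_order diff, matching the
+ # docstring example above (report_repetition=True is assumed to be required for
+ # ignore_order deltas):
+ #
+ #     from deepdiff import DeepDiff, Delta
+ #     t1 = [5, 1, 1, 1, 6]
+ #     t2 = [7, 1, 1, 1, 8]
+ #     diff = DeepDiff(t1, t2, ignore_order=True, report_repetition=True)
+ #     assert t1 + Delta(diff) == t2
+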
+ def _get_reverse_diff(self):
+ if not self.bidirectional:
+ raise ValueError('Please recreate the delta with bidirectional=True')
+
+ SIMPLE_ACTION_TO_REVERSE = {
+ 'iterable_item_added': 'iterable_item_removed',
+ 'iterable_items_added_at_indexes': 'iterable_items_removed_at_indexes',
+ 'attribute_added': 'attribute_removed',
+ 'set_item_added': 'set_item_removed',
+ 'dictionary_item_added': 'dictionary_item_removed',
+ }
+ # Adding the reverse of the dictionary
+ for key in list(SIMPLE_ACTION_TO_REVERSE.keys()):
+ SIMPLE_ACTION_TO_REVERSE[SIMPLE_ACTION_TO_REVERSE[key]] = key
+
+ r_diff = {}
+ for action, info in self.diff.items():
+ reverse_action = SIMPLE_ACTION_TO_REVERSE.get(action)
+ if reverse_action:
+ r_diff[reverse_action] = info
+ elif action == 'values_changed':
+ r_diff[action] = {}
+ for path, path_info in info.items():
+ reverse_path = path_info['new_path'] if path_info.get('new_path') else path
+ r_diff[action][reverse_path] = {
+ 'new_value': path_info['old_value'], 'old_value': path_info['new_value']
+ }
+ elif action == 'type_changes':
+ r_diff[action] = {}
+ for path, path_info in info.items():
+ reverse_path = path_info['new_path'] if path_info.get('new_path') else path
+ r_diff[action][reverse_path] = {
+ 'old_type': path_info['new_type'], 'new_type': path_info['old_type'],
+ }
+ if 'new_value' in path_info:
+ r_diff[action][reverse_path]['old_value'] = path_info['new_value']
+ if 'old_value' in path_info:
+ r_diff[action][reverse_path]['new_value'] = path_info['old_value']
+ elif action == 'iterable_item_moved':
+ r_diff[action] = {}
+ for path, path_info in info.items():
+ old_path = path_info['new_path']
+ r_diff[action][old_path] = {
+ 'new_path': path, 'value': path_info['value'],
+ }
+ elif action == '_iterable_opcodes':
+ r_diff[action] = {}
+ for path, op_codes in info.items():
+ r_diff[action][path] = []
+ for op_code in op_codes:
+ tag = op_code.tag
+ tag = {'delete': 'insert', 'insert': 'delete'}.get(tag, tag)
+ new_op_code = Opcode(
+ tag=tag,
+ t1_from_index=op_code.t2_from_index,
+ t1_to_index=op_code.t2_to_index,
+ t2_from_index=op_code.t1_from_index,
+ t2_to_index=op_code.t1_to_index,
+ new_values=op_code.old_values,
+ old_values=op_code.new_values,
+ )
+ r_diff[action][path].append(new_op_code)
+ return r_diff
+
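+ # A hedged sketch of the bidirectional use case that relies on the reverse diff
+ # built above (assumes the public DeepDiff/Delta API):
+ #
+ #     from deepdiff import DeepDiff, Delta
+ #     t1, t2 = {'a': 1}, {'a': 2}
+ #     delta = Delta(DeepDiff(t1, t2), bidirectional=True)
+ #     assert t1 + delta == t2
+ #     assert t2 - delta == t1   # subtraction applies the reverse diff
+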
+ def dump(self, file):
+ """
+ Dump into file object
+ """
+ # Small optimization: our internal pickle serializer can take a file object
+ # and write to it directly. However, if a user-defined serializer is passed,
+ # we keep it compatible with the expectation that self.serializer(self.diff)
+ # returns the serialization, which is then written to the file object
+ # when the dump(file) function is used.
+ param_names_of_serializer = set(self.serializer.__code__.co_varnames)
+ if 'file_obj' in param_names_of_serializer:
+ self.serializer(self.diff, file_obj=file)
+ else:
+ file.write(self.dumps())
+
+ def dumps(self):
+ """
+ Return the serialized representation of the object as a bytes object, instead of writing it to a file.
+ """
+ return self.serializer(self.diff)
+
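+ # A hedged round-trip sketch for dump()/dumps() with the default serializer
+ # (passing the serialized payload back to the Delta constructor for
+ # deserialization is assumed, as documented elsewhere in deepdiff):
+ #
+ #     delta = Delta(DeepDiff([1, 2], [1, 3]))
+ #     payload = delta.dumps()      # bytes produced by the serializer
+ #     restored = Delta(payload)    # deserialized back into a Delta
+ #     assert [1, 2] + restored == [1, 3]
+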
+ def to_dict(self):
+ return dict(self.diff)
+
+ def _flatten_iterable_opcodes(self, _parse_path):
+ """
+ Converts op_codes to FlatDeltaRows
+ """
+ result = []
+ for path, op_codes in self.diff['_iterable_opcodes'].items():
+ for op_code in op_codes:
+ result.append(
+ FlatDeltaRow(
+ path=_parse_path(path),
+ action=OPCODE_TAG_TO_FLAT_DATA_ACTION[op_code.tag],
+ value=op_code.new_values,
+ old_value=op_code.old_values,
+ type=type(op_code.new_values),
+ old_type=type(op_code.old_values),
+ new_path=None,
+ t1_from_index=op_code.t1_from_index,
+ t1_to_index=op_code.t1_to_index,
+ t2_from_index=op_code.t2_from_index,
+ t2_to_index=op_code.t2_to_index,
+
+ )
+ )
+ return result
+
+ @staticmethod
+ def _get_flat_row(action, info, _parse_path, keys_and_funcs, report_type_changes=True):
+ for path, details in info.items():
+ row = {'path': _parse_path(path), 'action': action}
+ for key, new_key, func in keys_and_funcs:
+ if key in details:
+ if func:
+ row[new_key] = func(details[key])
+ else:
+ row[new_key] = details[key]
+ if report_type_changes:
+ if 'value' in row and 'type' not in row:
+ row['type'] = type(row['value'])
+ if 'old_value' in row and 'old_type' not in row:
+ row['old_type'] = type(row['old_value'])
+ yield FlatDeltaRow(**row)
+
+ @staticmethod
+ def _from_flat_rows(flat_rows_list: List[FlatDeltaRow]):
+ flat_dict_list = (i._asdict() for i in flat_rows_list)
+ return Delta._from_flat_dicts(flat_dict_list)
+
+ @staticmethod
+ def _from_flat_dicts(flat_dict_list):
+ """
+ Create the delta's diff object from the flat_dict_list
+ """
+ result = {}
+ FLATTENING_NEW_ACTION_MAP = {
+ 'unordered_iterable_item_added': 'iterable_items_added_at_indexes',
+ 'unordered_iterable_item_removed': 'iterable_items_removed_at_indexes',
+ }
+ for flat_dict in flat_dict_list:
+ index = None
+ action = flat_dict.get("action")
+ path = flat_dict.get("path")
+ value = flat_dict.get('value')
+ new_path = flat_dict.get('new_path')
+ old_value = flat_dict.get('old_value', UnkownValueCode)
+ if not action:
+ raise ValueError("Flat dict need to include the 'action'.")
+ if path is None:
+ raise ValueError("Flat dict need to include the 'path'.")
+ if action in FLATTENING_NEW_ACTION_MAP:
+ action = FLATTENING_NEW_ACTION_MAP[action]
+ index = path.pop()
+ if action in {
+ FlatDataAction.attribute_added,
+ FlatDataAction.attribute_removed,
+ }:
+ root_element = ('root', GETATTR)
+ else:
+ root_element = ('root', GET)
+ if isinstance(path, str):
+ path_str = path
+ else:
+ path_str = stringify_path(path, root_element=root_element) # We need the string path
+ if new_path and new_path != path:
+ new_path = stringify_path(new_path, root_element=root_element)
+ else:
+ new_path = None
+ if action not in result:
+ result[action] = {}
+ if action in {
+ 'iterable_items_added_at_indexes',
+ 'iterable_items_removed_at_indexes',
+ }:
+ if path_str not in result[action]:
+ result[action][path_str] = {}
+ result[action][path_str][index] = value
+ elif action in {
+ FlatDataAction.set_item_added,
+ FlatDataAction.set_item_removed
+ }:
+ if path_str not in result[action]:
+ result[action][path_str] = set()
+ result[action][path_str].add(value)
+ elif action in {
+ FlatDataAction.dictionary_item_added,
+ FlatDataAction.dictionary_item_removed,
+ FlatDataAction.attribute_removed,
+ FlatDataAction.attribute_added,
+ FlatDataAction.iterable_item_added,
+ FlatDataAction.iterable_item_removed,
+ }:
+ result[action][path_str] = value
+ elif action == 'values_changed':
+ if old_value == UnkownValueCode:
+ result[action][path_str] = {'new_value': value}
+ else:
+ result[action][path_str] = {'new_value': value, 'old_value': old_value}
+ elif action == 'type_changes':
+ type_ = flat_dict.get('type', UnkownValueCode)
+ old_type = flat_dict.get('old_type', UnkownValueCode)
+
+ result[action][path_str] = {'new_value': value}
+ for elem, elem_value in [
+ ('new_type', type_),
+ ('old_type', old_type),
+ ('old_value', old_value),
+ ]:
+ if elem_value != UnkownValueCode:
+ result[action][path_str][elem] = elem_value
+ elif action == FlatDataAction.iterable_item_moved:
+ result[action][path_str] = {'value': value}
+ elif action in {
+ FlatDataAction.iterable_items_inserted,
+ FlatDataAction.iterable_items_deleted,
+ FlatDataAction.iterable_items_replaced,
+ FlatDataAction.iterable_items_equal,
+ }:
+ if '_iterable_opcodes' not in result:
+ result['_iterable_opcodes'] = {}
+ if path_str not in result['_iterable_opcodes']:
+ result['_iterable_opcodes'][path_str] = []
+ result['_iterable_opcodes'][path_str].append(
+ Opcode(
+ tag=FLAT_DATA_ACTION_TO_OPCODE_TAG[action], # type: ignore
+ t1_from_index=flat_dict.get('t1_from_index'),
+ t1_to_index=flat_dict.get('t1_to_index'),
+ t2_from_index=flat_dict.get('t2_from_index'),
+ t2_to_index=flat_dict.get('t2_to_index'),
+ new_values=flat_dict.get('value'),
+ old_values=flat_dict.get('old_value'),
+ )
+ )
+ if new_path:
+ result[action][path_str]['new_path'] = new_path
+
+ return result
+
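+ # A hedged sketch of rebuilding a delta from flat rows via the helpers above
+ # (assumes the Delta constructor accepts flat_rows_list and force, as defined
+ # earlier in this module, and that always_include_values=True is needed for a
+ # lossless round trip):
+ #
+ #     delta = Delta(DeepDiff({'a': 1}, {'a': 2}), always_include_values=True)
+ #     rows = delta.to_flat_rows()
+ #     rebuilt = Delta(flat_rows_list=rows, force=True)
+ #     assert {'a': 1} + rebuilt == {'a': 2}
+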
+ def to_flat_dicts(self, include_action_in_path=False, report_type_changes=True) -> List[FlatDeltaRow]:
+ """
+ Returns a flat list of actions that is easily machine readable.
+
+ For example:
+ {'iterable_item_added': {'root[3]': 5, 'root[2]': 3}}
+
+ Becomes:
+ [
+ {'path': [3], 'value': 5, 'action': 'iterable_item_added'},
+ {'path': [2], 'value': 3, 'action': 'iterable_item_added'},
+ ]
+
+
+ **Parameters**
+
+ include_action_in_path : Boolean, default=False
+ When False, we translate DeepDiff's paths like root[3].attribute1 into [3, 'attribute1'].
+ When True, we include the action needed to retrieve each item in the path: [(3, 'GET'), ('attribute1', 'GETATTR')]
+ Note that the "action" here is different from the action reported by to_flat_dicts. The action here only describes the "path" output.
+
+ report_type_changes : Boolean, default=True
+ If False, we don't report the type change. Instead we report the value change.
+
+ Example:
+ t1 = {"a": None}
+ t2 = {"a": 1}
+
+ dump = Delta(DeepDiff(t1, t2)).dumps()
+ delta = Delta(dump)
+ assert t2 == delta + t1
+
+ flat_result = delta.to_flat_dicts()
+ flat_expected = [{'path': ['a'], 'action': 'type_changes', 'value': 1, 'new_type': int, 'old_type': type(None)}]
+ assert flat_expected == flat_result
+
+ flat_result2 = delta.to_flat_dicts(report_type_changes=False)
+ flat_expected2 = [{'path': ['a'], 'action': 'values_changed', 'value': 1}]
+
+ **List of actions**
+
+ Here is the list of actions that the flat dictionaries can return.
+ iterable_item_added
+ iterable_item_removed
+ iterable_item_moved
+ values_changed
+ type_changes
+ set_item_added
+ set_item_removed
+ dictionary_item_added
+ dictionary_item_removed
+ attribute_added
+ attribute_removed
+ """
+ return [
+ i._asdict() for i in self.to_flat_rows(include_action_in_path=include_action_in_path, report_type_changes=report_type_changes)
+ ] # type: ignore
+
+ def to_flat_rows(self, include_action_in_path=False, report_type_changes=True) -> List[FlatDeltaRow]:
+ """
+ Just like to_flat_dicts but returns FlatDeltaRow Named Tuples
+ """
+ result = []
+ if include_action_in_path:
+ _parse_path = partial(parse_path, include_actions=True)
+ else:
+ _parse_path = parse_path
+ if report_type_changes:
+ keys_and_funcs = [
+ ('value', 'value', None),
+ ('new_value', 'value', None),
+ ('old_value', 'old_value', None),
+ ('new_type', 'type', None),
+ ('old_type', 'old_type', None),
+ ('new_path', 'new_path', _parse_path),
+ ]
+ else:
+ if not self.always_include_values:
+ raise ValueError(
+ "When converting to flat dictionaries, if report_type_changes=False and there are type changes, "
+ "you must set the always_include_values=True at the delta object creation. Otherwise there is nothing to include."
+ )
+ keys_and_funcs = [
+ ('value', 'value', None),
+ ('new_value', 'value', None),
+ ('old_value', 'old_value', None),
+ ('new_path', 'new_path', _parse_path),
+ ]
+
+ FLATTENING_NEW_ACTION_MAP = {
+ 'iterable_items_added_at_indexes': 'unordered_iterable_item_added',
+ 'iterable_items_removed_at_indexes': 'unordered_iterable_item_removed',
+ }
+ for action, info in self.diff.items():
+ if action == '_iterable_opcodes':
+ result.extend(self._flatten_iterable_opcodes(_parse_path=_parse_path))
+ continue
+ if action.startswith('_'):
+ continue
+ if action in FLATTENING_NEW_ACTION_MAP:
+ new_action = FLATTENING_NEW_ACTION_MAP[action]
+ for path, index_to_value in info.items():
+ path = _parse_path(path)
+ for index, value in index_to_value.items():
+ path2 = path.copy()
+ if include_action_in_path:
+ path2.append((index, 'GET')) # type: ignore
+ else:
+ path2.append(index)
+ if report_type_changes:
+ row = FlatDeltaRow(path=path2, value=value, action=new_action, type=type(value)) # type: ignore
+ else:
+ row = FlatDeltaRow(path=path2, value=value, action=new_action) # type: ignore
+ result.append(row)
+ elif action in {'set_item_added', 'set_item_removed'}:
+ for path, values in info.items():
+ path = _parse_path(path)
+ for value in values:
+ if report_type_changes:
+ row = FlatDeltaRow(path=path, value=value, action=action, type=type(value))
+ else:
+ row = FlatDeltaRow(path=path, value=value, action=action)
+ result.append(row)
+ elif action == 'dictionary_item_added':
+ for path, value in info.items():
+ path = _parse_path(path)
+ if isinstance(value, dict) and len(value) == 1:
+ new_key = next(iter(value))
+ path.append(new_key)
+ value = value[new_key]
+ elif isinstance(value, (list, tuple)) and len(value) == 1:
+ value = value[0]
+ path.append(0) # type: ignore
+ action = 'iterable_item_added'
+ elif isinstance(value, set) and len(value) == 1:
+ value = value.pop()
+ action = 'set_item_added'
+ if report_type_changes:
+ row = FlatDeltaRow(path=path, value=value, action=action, type=type(value)) # type: ignore
+ else:
+ row = FlatDeltaRow(path=path, value=value, action=action) # type: ignore
+ result.append(row)
+ elif action in {
+ 'dictionary_item_removed', 'iterable_item_added',
+ 'iterable_item_removed', 'attribute_removed', 'attribute_added'
+ }:
+ for path, value in info.items():
+ path = _parse_path(path)
+ if report_type_changes:
+ row = FlatDeltaRow(path=path, value=value, action=action, type=type(value))
+ else:
+ row = FlatDeltaRow(path=path, value=value, action=action)
+ result.append(row)
+ elif action == 'type_changes':
+ if not report_type_changes:
+ action = 'values_changed'
+
+ for row in self._get_flat_row(
+ action=action,
+ info=info,
+ _parse_path=_parse_path,
+ keys_and_funcs=keys_and_funcs,
+ report_type_changes=report_type_changes,
+ ):
+ result.append(row)
+ else:
+ for row in self._get_flat_row(
+ action=action,
+ info=info,
+ _parse_path=_parse_path,
+ keys_and_funcs=keys_and_funcs,
+ report_type_changes=report_type_changes,
+ ):
+ result.append(row)
+ return result
+
+
+if __name__ == "__main__": # pragma: no cover
+ import doctest
+ doctest.testmod()
diff --git a/.venv/lib/python3.12/site-packages/deepdiff/diff.py b/.venv/lib/python3.12/site-packages/deepdiff/diff.py
new file mode 100644
index 00000000..d84ecc7e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/deepdiff/diff.py
@@ -0,0 +1,1906 @@
+#!/usr/bin/env python
+
+# In order to run the docstrings:
+# python3 -m deepdiff.diff
+# You might need to run it many times since dictionaries come in different orders
+# every time you run the docstrings.
+# However, the docstrings expect a specific order in order to pass!
+import difflib
+import logging
+import types
+import datetime
+from enum import Enum
+from copy import deepcopy
+from math import isclose as is_close
+from typing import List, Dict, Callable, Union, Any, Pattern, Tuple, Optional, Set, FrozenSet, TYPE_CHECKING, Protocol
+from collections.abc import Mapping, Iterable, Sequence
+from collections import defaultdict
+from inspect import getmembers
+from itertools import zip_longest
+from functools import lru_cache
+from deepdiff.helper import (strings, bytes_type, numbers, uuids, ListItemRemovedOrAdded, notpresent,
+ IndexedHash, unprocessed, add_to_frozen_set, basic_types,
+ convert_item_or_items_into_set_else_none, get_type,
+ convert_item_or_items_into_compiled_regexes_else_none,
+ type_is_subclass_of_type_group, type_in_type_group, get_doc,
+ number_to_string, datetime_normalize, KEY_TO_VAL_STR, booleans,
+ np_ndarray, np_floating, get_numpy_ndarray_rows, RepeatedTimer,
+ TEXT_VIEW, TREE_VIEW, DELTA_VIEW, detailed__dict__, add_root_to_paths,
+ np, get_truncate_datetime, dict_, CannotCompare, ENUM_INCLUDE_KEYS,
+ PydanticBaseModel, Opcode, SetOrdered, ipranges)
+from deepdiff.serialization import SerializationMixin
+from deepdiff.distance import DistanceMixin, logarithmic_similarity
+from deepdiff.model import (
+ RemapDict, ResultDict, TextResult, TreeResult, DiffLevel,
+ DictRelationship, AttributeRelationship, REPORT_KEYS,
+ SubscriptableIterableRelationship, NonSubscriptableIterableRelationship,
+ SetRelationship, NumpyArrayRelationship, CUSTOM_FIELD,
+ FORCE_DEFAULT,
+)
+from deepdiff.deephash import DeepHash, combine_hashes_lists
+from deepdiff.base import Base
+from deepdiff.lfucache import LFUCache, DummyLFU
+
+if TYPE_CHECKING:
+ from pytz.tzinfo import BaseTzInfo
+
+
+logger = logging.getLogger(__name__)
+
+MAX_PASSES_REACHED_MSG = (
+ 'DeepDiff has reached the max number of passes of {}. '
+ 'You can possibly get more accurate results by increasing the max_passes parameter.')
+
+MAX_DIFFS_REACHED_MSG = (
+ 'DeepDiff has reached the max number of diffs of {}. '
+ 'You can possibly get more accurate results by increasing the max_diffs parameter.')
+
+
+notpresent_indexed = IndexedHash(indexes=[0], item=notpresent)
+
+doc = get_doc('diff_doc.rst')
+
+
+PROGRESS_MSG = "DeepDiff {} seconds in progress. Pass #{}, Diff #{}"
+
+
+def _report_progress(_stats, progress_logger, duration):
+ """
+ Report the progress every few seconds.
+ """
+ progress_logger(PROGRESS_MSG.format(duration, _stats[PASSES_COUNT], _stats[DIFF_COUNT]))
+
+
+DISTANCE_CACHE_HIT_COUNT = 'DISTANCE CACHE HIT COUNT'
+DIFF_COUNT = 'DIFF COUNT'
+PASSES_COUNT = 'PASSES COUNT'
+MAX_PASS_LIMIT_REACHED = 'MAX PASS LIMIT REACHED'
+MAX_DIFF_LIMIT_REACHED = 'MAX DIFF LIMIT REACHED'
+DISTANCE_CACHE_ENABLED = 'DISTANCE CACHE ENABLED'
+PREVIOUS_DIFF_COUNT = 'PREVIOUS DIFF COUNT'
+PREVIOUS_DISTANCE_CACHE_HIT_COUNT = 'PREVIOUS DISTANCE CACHE HIT COUNT'
+CANT_FIND_NUMPY_MSG = 'Unable to import numpy. This must be a bug in DeepDiff since a numpy array is detected.'
+INVALID_VIEW_MSG = 'The only valid values for the view parameter are text and tree. But {} was passed.'
+CUTOFF_RANGE_ERROR_MSG = 'cutoff_distance_for_pairs needs to be a positive float max 1.'
+VERBOSE_LEVEL_RANGE_MSG = 'verbose_level should be 0, 1, or 2.'
+PURGE_LEVEL_RANGE_MSG = 'cache_purge_level should be 0, 1, or 2.'
+_ENABLE_CACHE_EVERY_X_DIFF = '_ENABLE_CACHE_EVERY_X_DIFF'
+
+model_fields_set = frozenset(["model_fields_set"])
+
+
+# What is the threshold to consider 2 items to be pairs. Only used when ignore_order = True.
+CUTOFF_DISTANCE_FOR_PAIRS_DEFAULT = 0.3
+
+# What is the threshold to calculate pairs of items between 2 iterables.
+# For example 2 iterables that have nothing in common, do not need their pairs to be calculated.
+CUTOFF_INTERSECTION_FOR_PAIRS_DEFAULT = 0.7
+
+DEEPHASH_PARAM_KEYS = (
+ 'exclude_types',
+ 'exclude_paths',
+ 'include_paths',
+ 'exclude_regex_paths',
+ 'hasher',
+ 'significant_digits',
+ 'number_format_notation',
+ 'ignore_string_type_changes',
+ 'ignore_numeric_type_changes',
+ 'use_enum_value',
+ 'ignore_type_in_groups',
+ 'ignore_type_subclasses',
+ 'ignore_string_case',
+ 'exclude_obj_callback',
+ 'ignore_private_variables',
+ 'encodings',
+ 'ignore_encoding_errors',
+ 'default_timezone',
+ 'custom_operators',
+)
+
+
+class DeepDiffProtocol(Protocol):
+ t1: Any
+ t2: Any
+ cutoff_distance_for_pairs: float
+ use_log_scale: bool
+ log_scale_similarity_threshold: float
+ view: str
+
+
+
+class DeepDiff(ResultDict, SerializationMixin, DistanceMixin, DeepDiffProtocol, Base):
+ __doc__ = doc
+
+ CACHE_AUTO_ADJUST_THRESHOLD = 0.25
+
+ def __init__(self,
+ t1: Any,
+ t2: Any,
+ _original_type=None,
+ cache_purge_level: int=1,
+ cache_size: int=0,
+ cache_tuning_sample_size: int=0,
+ custom_operators: Optional[List[Any]] =None,
+ cutoff_distance_for_pairs: float=CUTOFF_DISTANCE_FOR_PAIRS_DEFAULT,
+ cutoff_intersection_for_pairs: float=CUTOFF_INTERSECTION_FOR_PAIRS_DEFAULT,
+ default_timezone:Union[datetime.timezone, "BaseTzInfo"]=datetime.timezone.utc,
+ encodings: Optional[List[str]]=None,
+ exclude_obj_callback: Optional[Callable]=None,
+ exclude_obj_callback_strict: Optional[Callable]=None,
+ exclude_paths: Union[str, List[str], Set[str], FrozenSet[str], None]=None,
+ exclude_regex_paths: Union[str, List[str], Pattern[str], List[Pattern[str]], None]=None,
+ exclude_types: Optional[List[Any]]=None,
+ get_deep_distance: bool=False,
+ group_by: Union[str, Tuple[str, str], None]=None,
+ group_by_sort_key: Union[str, Callable, None]=None,
+ hasher: Optional[Callable]=None,
+ hashes: Optional[Dict]=None,
+ ignore_encoding_errors: bool=False,
+ ignore_nan_inequality: bool=False,
+ ignore_numeric_type_changes: bool=False,
+ ignore_order: bool=False,
+ ignore_order_func: Optional[Callable]=None,
+ ignore_private_variables: bool=True,
+ ignore_string_case: bool=False,
+ ignore_string_type_changes: bool=False,
+ ignore_type_in_groups: Optional[List[Tuple]]=None,
+ ignore_type_subclasses: bool=False,
+ include_obj_callback: Optional[Callable]=None,
+ include_obj_callback_strict: Optional[Callable]=None,
+ include_paths: Union[str, List[str], None]=None,
+ iterable_compare_func: Optional[Callable]=None,
+ log_frequency_in_sec: int=0,
+ log_scale_similarity_threshold: float=0.1,
+ log_stacktrace: bool=False,
+ math_epsilon: Optional[float]=None,
+ max_diffs: Optional[int]=None,
+ max_passes: int=10000000,
+ number_format_notation: str="f",
+ number_to_string_func: Optional[Callable]=None,
+ progress_logger: Callable=logger.info,
+ report_repetition: bool=False,
+ significant_digits: Optional[int]=None,
+ threshold_to_diff_deeper: float = 0.33,
+ truncate_datetime: Optional[str]=None,
+ use_enum_value: bool=False,
+ use_log_scale: bool=False,
+ verbose_level: int=1,
+ view: str=TEXT_VIEW,
+ zip_ordered_iterables: bool=False,
+ _parameters=None,
+ _shared_parameters=None,
+ **kwargs):
+ super().__init__()
+ if kwargs:
+ raise ValueError((
+ "The following parameter(s) are not valid: %s\n"
+ "The valid parameters are ignore_order, report_repetition, significant_digits, "
+ "number_format_notation, exclude_paths, include_paths, exclude_types, exclude_regex_paths, ignore_type_in_groups, "
+ "ignore_string_type_changes, ignore_numeric_type_changes, ignore_type_subclasses, truncate_datetime, "
+ "ignore_private_variables, ignore_nan_inequality, number_to_string_func, verbose_level, "
+ "view, hasher, hashes, max_passes, max_diffs, zip_ordered_iterables, "
+ "cutoff_distance_for_pairs, cutoff_intersection_for_pairs, log_frequency_in_sec, cache_size, "
+ "cache_tuning_sample_size, get_deep_distance, group_by, group_by_sort_key, cache_purge_level, log_stacktrace,"
+ "math_epsilon, iterable_compare_func, use_enum_value, _original_type, threshold_to_diff_deeper, default_timezone "
+ "ignore_order_func, custom_operators, encodings, ignore_encoding_errors, use_log_scale, log_scale_similarity_threshold "
+ "_parameters and _shared_parameters.") % ', '.join(kwargs.keys()))
+
+ if _parameters:
+ self.__dict__.update(_parameters)
+ else:
+ self.custom_operators = custom_operators or []
+ self.ignore_order = ignore_order
+
+ self.ignore_order_func = ignore_order_func
+
+ ignore_type_in_groups = ignore_type_in_groups or []
+ if numbers == ignore_type_in_groups or numbers in ignore_type_in_groups:
+ ignore_numeric_type_changes = True
+ self.ignore_numeric_type_changes = ignore_numeric_type_changes
+ if strings == ignore_type_in_groups or strings in ignore_type_in_groups:
+ ignore_string_type_changes = True
+ self.use_enum_value = use_enum_value
+ self.log_scale_similarity_threshold = log_scale_similarity_threshold
+ self.use_log_scale = use_log_scale
+ self.default_timezone = default_timezone
+ self.log_stacktrace = log_stacktrace
+ self.threshold_to_diff_deeper = threshold_to_diff_deeper
+ self.ignore_string_type_changes = ignore_string_type_changes
+ self.ignore_type_in_groups = self.get_ignore_types_in_groups(
+ ignore_type_in_groups=ignore_type_in_groups,
+ ignore_string_type_changes=ignore_string_type_changes,
+ ignore_numeric_type_changes=ignore_numeric_type_changes,
+ ignore_type_subclasses=ignore_type_subclasses)
+ self.report_repetition = report_repetition
+ self.exclude_paths = add_root_to_paths(convert_item_or_items_into_set_else_none(exclude_paths))
+ self.include_paths = add_root_to_paths(convert_item_or_items_into_set_else_none(include_paths))
+ self.exclude_regex_paths = convert_item_or_items_into_compiled_regexes_else_none(exclude_regex_paths)
+ self.exclude_types = set(exclude_types) if exclude_types else None
+ self.exclude_types_tuple = tuple(exclude_types) if exclude_types else None # we need tuple for checking isinstance
+ self.ignore_type_subclasses = ignore_type_subclasses
+ self.type_check_func = type_in_type_group if ignore_type_subclasses else type_is_subclass_of_type_group
+ self.ignore_string_case = ignore_string_case
+ self.exclude_obj_callback = exclude_obj_callback
+ self.exclude_obj_callback_strict = exclude_obj_callback_strict
+ self.include_obj_callback = include_obj_callback
+ self.include_obj_callback_strict = include_obj_callback_strict
+ self.number_to_string = number_to_string_func or number_to_string
+ self.iterable_compare_func = iterable_compare_func
+ self.zip_ordered_iterables = zip_ordered_iterables
+ self.ignore_private_variables = ignore_private_variables
+ self.ignore_nan_inequality = ignore_nan_inequality
+ self.hasher = hasher
+ self.cache_tuning_sample_size = cache_tuning_sample_size
+ self.group_by = group_by
+ if callable(group_by_sort_key):
+ self.group_by_sort_key = group_by_sort_key
+ elif group_by_sort_key:
+ def _group_by_sort_key(x):
+ return x[group_by_sort_key]
+ self.group_by_sort_key = _group_by_sort_key
+ else:
+ self.group_by_sort_key = None
+ self.encodings = encodings
+ self.ignore_encoding_errors = ignore_encoding_errors
+
+ self.significant_digits = self.get_significant_digits(significant_digits, ignore_numeric_type_changes)
+ self.math_epsilon = math_epsilon
+ if self.math_epsilon is not None and self.ignore_order:
+ logger.warning("math_epsilon in conjunction with ignore_order=True is only used for flat object comparisons. Custom math_epsilon will not have an effect when comparing nested objects.")
+ self.truncate_datetime = get_truncate_datetime(truncate_datetime)
+ self.number_format_notation = number_format_notation
+ if verbose_level in {0, 1, 2}:
+ self.verbose_level = verbose_level
+ else:
+ raise ValueError(VERBOSE_LEVEL_RANGE_MSG)
+ if cache_purge_level not in {0, 1, 2}:
+ raise ValueError(PURGE_LEVEL_RANGE_MSG)
+ self.view = view
+ # Setting up the cache for dynamic programming. One dictionary per instance of root of DeepDiff running.
+ self.max_passes = max_passes
+ self.max_diffs = max_diffs
+ self.cutoff_distance_for_pairs = float(cutoff_distance_for_pairs)
+ self.cutoff_intersection_for_pairs = float(cutoff_intersection_for_pairs)
+ if self.cutoff_distance_for_pairs < 0 or self.cutoff_distance_for_pairs > 1:
+ raise ValueError(CUTOFF_RANGE_ERROR_MSG)
+ # _parameters are the clean parameters to initialize DeepDiff with, so we avoid all the above
+ # cleaning functionality when running DeepDiff recursively.
+ # However, DeepHash has its own set of _parameters that are slightly different from DeepDiff's.
+ # DeepDiff _parameters are transformed into DeepHash _parameters via the _get_deephash_params method.
+ self.progress_logger = progress_logger
+ self.cache_size = cache_size
+ _parameters = self.__dict__.copy()
+ _parameters['group_by'] = None # overwriting since these parameters will be passed on to other passes.
+ if log_stacktrace:
+ self.log_err = logger.exception
+ else:
+ self.log_err = logger.error
+
+ # Non-Root
+ if _shared_parameters:
+ self.is_root = False
+ self._shared_parameters = _shared_parameters
+ self.__dict__.update(_shared_parameters)
+ # We are in some pass other than root
+ progress_timer = None
+ # Root
+ else:
+ self.is_root = True
+ # Caching the DeepDiff results for dynamic programming
+ self._distance_cache = LFUCache(cache_size) if cache_size else DummyLFU()
+ self._stats = {
+ PASSES_COUNT: 0,
+ DIFF_COUNT: 0,
+ DISTANCE_CACHE_HIT_COUNT: 0,
+ PREVIOUS_DIFF_COUNT: 0,
+ PREVIOUS_DISTANCE_CACHE_HIT_COUNT: 0,
+ MAX_PASS_LIMIT_REACHED: False,
+ MAX_DIFF_LIMIT_REACHED: False,
+ DISTANCE_CACHE_ENABLED: bool(cache_size),
+ }
+ self.hashes = dict_() if hashes is None else hashes
+ self._numpy_paths = dict_() # if _numpy_paths is None else _numpy_paths
+ self._shared_parameters = {
+ 'hashes': self.hashes,
+ '_stats': self._stats,
+ '_distance_cache': self._distance_cache,
+ '_numpy_paths': self._numpy_paths,
+ _ENABLE_CACHE_EVERY_X_DIFF: self.cache_tuning_sample_size * 10,
+ }
+ if log_frequency_in_sec:
+ # Creating a progress log reporter that runs in a separate thread every log_frequency_in_sec seconds.
+ progress_timer = RepeatedTimer(log_frequency_in_sec, _report_progress, self._stats, progress_logger)
+ else:
+ progress_timer = None
+
+ self._parameters = _parameters
+ self.deephash_parameters = self._get_deephash_params()
+ self.tree = TreeResult()
+ self._iterable_opcodes = {}
+ if group_by and self.is_root:
+ try:
+ original_t1 = t1
+ t1 = self._group_iterable_to_dict(t1, group_by, item_name='t1')
+ except (KeyError, ValueError):
+ pass
+ else:
+ try:
+ t2 = self._group_iterable_to_dict(t2, group_by, item_name='t2')
+ except (KeyError, ValueError):
+ t1 = original_t1
+
+ self.t1 = t1
+ self.t2 = t2
+
+ try:
+ root = DiffLevel(t1, t2, verbose_level=self.verbose_level)
+ # _original_type is only used to pass along the original type of the data. Currently it is only used for numpy arrays.
+ # The reason is that we convert the numpy array to a python list and then later, for distance calculations,
+ # we convert only the last dimension of it back into numpy arrays.
+ self._diff(root, parents_ids=frozenset({id(t1)}), _original_type=_original_type)
+
+ if get_deep_distance and view in {TEXT_VIEW, TREE_VIEW}:
+ self.tree['deep_distance'] = self._get_rough_distance()
+
+ self.tree.remove_empty_keys()
+ view_results = self._get_view_results(self.view)
+ self.update(view_results)
+ finally:
+ if self.is_root:
+ if cache_purge_level:
+ del self._distance_cache
+ del self.hashes
+ del self._shared_parameters
+ del self._parameters
+ for key in (PREVIOUS_DIFF_COUNT, PREVIOUS_DISTANCE_CACHE_HIT_COUNT,
+ DISTANCE_CACHE_ENABLED):
+ del self._stats[key]
+ if progress_timer:
+ duration = progress_timer.stop()
+ self._stats['DURATION SEC'] = duration
+ logger.info('stats {}'.format(self.get_stats()))
+ if cache_purge_level == 2:
+ self.__dict__.clear()
+
+ def _get_deephash_params(self):
+ result = {key: self._parameters[key] for key in DEEPHASH_PARAM_KEYS}
+ result['ignore_repetition'] = not self.report_repetition
+ result['number_to_string_func'] = self.number_to_string
+ return result
+
+ def _report_result(self, report_type, change_level, local_tree=None):
+ """
+ Add a detected change to the reference-style result dictionary.
+ report_type will be added to level.
+ (We'll create the text-style report from there later.)
+ :param report_type: A well defined string key describing the type of change.
+ Examples: "set_item_added", "values_changed"
+ :param change_level: A DiffLevel object describing the objects in question in their
+ before-change and after-change object structure.
+
+ :param local_tree: An alternative TreeResult to add the result to, instead of self.tree.
+ """
+
+ if not self._skip_this(change_level):
+ change_level.report_type = report_type
+ tree = self.tree if local_tree is None else local_tree
+ tree[report_type].add(change_level)
+
+ def custom_report_result(self, report_type, level, extra_info=None):
+ """
+ Add a detected change to the reference-style result dictionary.
+ report_type will be added to level.
+ (We'll create the text-style report from there later.)
+ :param report_type: A well defined string key describing the type of change.
+ Examples: "set_item_added", "values_changed"
+ :param level: A DiffLevel object describing the objects in question in their
+ before-change and after-change object structure.
+ :param extra_info: A dict that describes this result
+ :rtype: None
+ """
+
+ if not self._skip_this(level):
+ level.report_type = report_type
+ level.additional[CUSTOM_FIELD] = extra_info
+ self.tree[report_type].add(level)
+
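+ # A hedged sketch of how a custom operator (see the custom_operators parameter)
+ # typically calls this hook; the BaseOperator import path and its types= keyword
+ # follow deepdiff's operator module and are assumptions here:
+ #
+ #     from deepdiff import DeepDiff
+ #     from deepdiff.operator import BaseOperator
+ #
+ #     class LengthOperator(BaseOperator):
+ #         def give_up_diffing(self, level, diff_instance):
+ #             if len(level.t1) != len(level.t2):
+ #                 diff_instance.custom_report_result(
+ #                     'length_changed', level,
+ #                     extra_info={'old': len(level.t1), 'new': len(level.t2)})
+ #             return True
+ #
+ #     DeepDiff(t1, t2, custom_operators=[LengthOperator(types=[list])])
+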
+ @staticmethod
+ def _dict_from_slots(object):
+ def unmangle(attribute):
+ if attribute.startswith('__') and attribute != '__weakref__':
+ return '_{type}{attribute}'.format(
+ type=type(object).__name__,
+ attribute=attribute
+ )
+ return attribute
+
+ all_slots = []
+
+ if isinstance(object, type):
+ mro = object.__mro__ # pragma: no cover. I have not been able to write a test for this case. But we still check for it.
+ else:
+ mro = object.__class__.__mro__
+
+ for type_in_mro in mro:
+ slots = getattr(type_in_mro, '__slots__', None)
+ if slots:
+ if isinstance(slots, strings):
+ all_slots.append(slots)
+ else:
+ all_slots.extend(slots)
+
+ return {i: getattr(object, key) for i in all_slots if hasattr(object, key := unmangle(i))}
+
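+ # A brief illustration of the slot handling above (a sketch, not library code):
+ # double-underscore slot names are name-mangled by Python, so they are unmangled
+ # here before their values are read back.
+ #
+ #     class Point:
+ #         __slots__ = ('x', '__secret')
+ #         def __init__(self):
+ #             self.x, self.__secret = 1, 2
+ #
+ #     # DeepDiff._dict_from_slots(Point()) -> {'x': 1, '__secret': 2}
+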
+ def _diff_enum(self, level, parents_ids=frozenset(), local_tree=None):
+ t1 = detailed__dict__(level.t1, include_keys=ENUM_INCLUDE_KEYS)
+ t2 = detailed__dict__(level.t2, include_keys=ENUM_INCLUDE_KEYS)
+
+ self._diff_dict(
+ level,
+ parents_ids,
+ print_as_attribute=True,
+ override=True,
+ override_t1=t1,
+ override_t2=t2,
+ local_tree=local_tree,
+ )
+
+ def _diff_obj(self, level, parents_ids=frozenset(), is_namedtuple=False, local_tree=None, is_pydantic_object=False):
+ """Difference of 2 objects"""
+ processing_error = False
+ try:
+ if is_namedtuple:
+ t1 = level.t1._asdict()
+ t2 = level.t2._asdict()
+ elif is_pydantic_object:
+ t1 = detailed__dict__(level.t1, ignore_private_variables=self.ignore_private_variables, ignore_keys=model_fields_set)
+ t2 = detailed__dict__(level.t2, ignore_private_variables=self.ignore_private_variables, ignore_keys=model_fields_set)
+ elif all('__dict__' in dir(t) for t in level):
+ t1 = detailed__dict__(level.t1, ignore_private_variables=self.ignore_private_variables)
+ t2 = detailed__dict__(level.t2, ignore_private_variables=self.ignore_private_variables)
+ elif all('__slots__' in dir(t) for t in level):
+ t1 = self._dict_from_slots(level.t1)
+ t2 = self._dict_from_slots(level.t2)
+ else:
+ t1 = {k: v for k, v in getmembers(level.t1) if not callable(v)}
+ t2 = {k: v for k, v in getmembers(level.t2) if not callable(v)}
+ except AttributeError:
+ processing_error = True
+ if processing_error is True:
+ self._report_result('unprocessed', level, local_tree=local_tree)
+ return
+
+ self._diff_dict(
+ level,
+ parents_ids,
+ print_as_attribute=True,
+ override=True,
+ override_t1=t1,
+ override_t2=t2,
+ local_tree=local_tree,
+ )
+
+ def _skip_this(self, level):
+ """
+ Check whether this comparison should be skipped because one of the objects to compare meets exclusion criteria.
+ :rtype: bool
+ """
+ level_path = level.path()
+ skip = False
+ if self.exclude_paths and level_path in self.exclude_paths:
+ skip = True
+ if self.include_paths and level_path != 'root':
+ if level_path not in self.include_paths:
+ skip = True
+ for prefix in self.include_paths:
+ if prefix in level_path or level_path in prefix:
+ skip = False
+ break
+ elif self.exclude_regex_paths and any(
+ [exclude_regex_path.search(level_path) for exclude_regex_path in self.exclude_regex_paths]):
+ skip = True
+ elif self.exclude_types_tuple and \
+ (isinstance(level.t1, self.exclude_types_tuple) or isinstance(level.t2, self.exclude_types_tuple)):
+ skip = True
+ elif self.exclude_obj_callback and \
+ (self.exclude_obj_callback(level.t1, level_path) or self.exclude_obj_callback(level.t2, level_path)):
+ skip = True
+ elif self.exclude_obj_callback_strict and \
+ (self.exclude_obj_callback_strict(level.t1, level_path) and
+ self.exclude_obj_callback_strict(level.t2, level_path)):
+ skip = True
+ elif self.include_obj_callback and level_path != 'root':
+ skip = True
+ if (self.include_obj_callback(level.t1, level_path) or self.include_obj_callback(level.t2, level_path)):
+ skip = False
+ elif self.include_obj_callback_strict and level_path != 'root':
+ skip = True
+ if (self.include_obj_callback_strict(level.t1, level_path) and
+ self.include_obj_callback_strict(level.t2, level_path)):
+ skip = False
+
+ return skip
+
+ def _skip_this_key(self, level, key):
+ # if include_paths is not set, then treat every path as included
+ if self.include_paths is None:
+ return False
+ if "{}['{}']".format(level.path(), key) in self.include_paths:
+ return False
+ if level.path() in self.include_paths:
+ # matches e.g. level+key root['foo']['bar']['veg'] include_paths ["root['foo']['bar']"]
+ return False
+ for prefix in self.include_paths:
+ if "{}['{}']".format(level.path(), key) in prefix:
+ # matches as long as the prefix is longer than this object key
+ # e.g.: level+key root['foo']['bar'] matches prefix root['foo']['bar'] from include_paths
+ # level+key root['foo'] matches prefix root['foo']['bar'] from include_paths
+ # level+key root['foo']['bar'] DOES NOT match root['foo'] from include_paths. This needs to be handled afterwards.
+ return False
+ # check if a higher level is included as a whole (=without any sublevels specified)
+ # matches e.g. level+key root['foo']['bar']['veg'] include_paths ["root['foo']"]
+ # but does not match, if it is level+key root['foo']['bar']['veg'] include_paths ["root['foo']['bar']['fruits']"]
+ up = level.up
+ while up is not None:
+ if up.path() in self.include_paths:
+ return False
+ up = up.up
+ return True
+
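+ # A hedged illustration of the include_paths matching above (public API usage):
+ #
+ #     from deepdiff import DeepDiff
+ #     t1 = {'foo': {'bar': 1, 'other': 3}, 'skip': 1}
+ #     t2 = {'foo': {'bar': 2, 'other': 4}, 'skip': 9}
+ #     diff = DeepDiff(t1, t2, include_paths=["root['foo']['bar']"])
+ #     # only the change at root['foo']['bar'] is reported; 'other' and 'skip'
+ #     # fall outside include_paths and are skipped.
+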
+ def _get_clean_to_keys_mapping(self, keys, level):
+ """
+ Get a dictionary of cleaned value of keys to the keys themselves.
+ This is mainly used to transform the keys when the type changes of keys should be ignored.
+
+ TODO: needs also some key conversion for groups of types other than the built-in strings and numbers.
+ """
+ result = dict_()
+ for key in keys:
+ if self.ignore_string_type_changes and isinstance(key, bytes):
+ clean_key = key.decode('utf-8')
+ elif self.use_enum_value and isinstance(key, Enum):
+ clean_key = key.value
+ elif isinstance(key, numbers):
+ type_ = "number" if self.ignore_numeric_type_changes else key.__class__.__name__
+ clean_key = self.number_to_string(key, significant_digits=self.significant_digits,
+ number_format_notation=self.number_format_notation)
+ clean_key = KEY_TO_VAL_STR.format(type_, clean_key)
+ else:
+ clean_key = key
+ if self.ignore_string_case and isinstance(clean_key, str):
+ clean_key = clean_key.lower()
+ if clean_key in result:
+ logger.warning(('{} and {} in {} become the same key when ignore_string_type_changes '
+ 'or ignore_numeric_type_changes are set to be true.').format(
+ key, result[clean_key], level.path()))
+ else:
+ result[clean_key] = key
+ return result
+
+ def _diff_dict(
+ self,
+ level,
+ parents_ids=frozenset([]),
+ print_as_attribute=False,
+ override=False,
+ override_t1=None,
+ override_t2=None,
+ local_tree=None,
+ ):
+ """Difference of 2 dictionaries"""
+ if override:
+ # for special stuff like custom objects and named tuples we receive preprocessed t1 and t2
+ # but must not spoil the chain (=level) with it
+ t1 = override_t1
+ t2 = override_t2
+ else:
+ t1 = level.t1
+ t2 = level.t2
+
+ if print_as_attribute:
+ item_added_key = "attribute_added"
+ item_removed_key = "attribute_removed"
+ rel_class = AttributeRelationship
+ else:
+ item_added_key = "dictionary_item_added"
+ item_removed_key = "dictionary_item_removed"
+ rel_class = DictRelationship
+
+ if self.ignore_private_variables:
+ t1_keys = SetOrdered([key for key in t1 if not(isinstance(key, str) and key.startswith('__')) and not self._skip_this_key(level, key)])
+ t2_keys = SetOrdered([key for key in t2 if not(isinstance(key, str) and key.startswith('__')) and not self._skip_this_key(level, key)])
+ else:
+ t1_keys = SetOrdered([key for key in t1 if not self._skip_this_key(level, key)])
+ t2_keys = SetOrdered([key for key in t2 if not self._skip_this_key(level, key)])
+ if self.ignore_string_type_changes or self.ignore_numeric_type_changes or self.ignore_string_case:
+ t1_clean_to_keys = self._get_clean_to_keys_mapping(keys=t1_keys, level=level)
+ t2_clean_to_keys = self._get_clean_to_keys_mapping(keys=t2_keys, level=level)
+ t1_keys = SetOrdered(t1_clean_to_keys.keys())
+ t2_keys = SetOrdered(t2_clean_to_keys.keys())
+ else:
+ t1_clean_to_keys = t2_clean_to_keys = None
+
+ t_keys_intersect = t2_keys & t1_keys
+ t_keys_added = t2_keys - t_keys_intersect
+ t_keys_removed = t1_keys - t_keys_intersect
+
+ if self.threshold_to_diff_deeper:
+ if self.exclude_paths:
+ t_keys_union = {f"{level.path()}[{repr(key)}]" for key in (t2_keys | t1_keys)}
+ t_keys_union -= self.exclude_paths
+ t_keys_union_len = len(t_keys_union)
+ else:
+ t_keys_union_len = len(t2_keys | t1_keys)
+ if t_keys_union_len > 1 and len(t_keys_intersect) / t_keys_union_len < self.threshold_to_diff_deeper:
+ self._report_result('values_changed', level, local_tree=local_tree)
+ return
+
+ for key in t_keys_added:
+ if self._count_diff() is StopIteration:
+ return
+
+ key = t2_clean_to_keys[key] if t2_clean_to_keys else key
+ change_level = level.branch_deeper(
+ notpresent,
+ t2[key],
+ child_relationship_class=rel_class,
+ child_relationship_param=key,
+ child_relationship_param2=key,
+ )
+ self._report_result(item_added_key, change_level, local_tree=local_tree)
+
+ for key in t_keys_removed:
+ if self._count_diff() is StopIteration:
+ return # pragma: no cover. This is already covered for addition.
+
+ key = t1_clean_to_keys[key] if t1_clean_to_keys else key
+ change_level = level.branch_deeper(
+ t1[key],
+ notpresent,
+ child_relationship_class=rel_class,
+ child_relationship_param=key,
+ child_relationship_param2=key,
+ )
+ self._report_result(item_removed_key, change_level, local_tree=local_tree)
+
+ for key in t_keys_intersect: # key present in both dicts - need to compare values
+ if self._count_diff() is StopIteration:
+ return # pragma: no cover. This is already covered for addition.
+
+ key1 = t1_clean_to_keys[key] if t1_clean_to_keys else key
+ key2 = t2_clean_to_keys[key] if t2_clean_to_keys else key
+ item_id = id(t1[key1])
+ if parents_ids and item_id in parents_ids:
+ continue
+ parents_ids_added = add_to_frozen_set(parents_ids, item_id)
+
+ # Go one level deeper
+ next_level = level.branch_deeper(
+ t1[key1],
+ t2[key2],
+ child_relationship_class=rel_class,
+ child_relationship_param=key,
+ child_relationship_param2=key,
+ )
+ self._diff(next_level, parents_ids_added, local_tree=local_tree)
+
+ def _diff_set(self, level, local_tree=None):
+ """Difference of sets"""
+ t1_hashtable = self._create_hashtable(level, 't1')
+ t2_hashtable = self._create_hashtable(level, 't2')
+
+ t1_hashes = set(t1_hashtable.keys())
+ t2_hashes = set(t2_hashtable.keys())
+
+ hashes_added = t2_hashes - t1_hashes
+ hashes_removed = t1_hashes - t2_hashes
+
+ items_added = [t2_hashtable[i].item for i in hashes_added]
+ items_removed = [t1_hashtable[i].item for i in hashes_removed]
+
+ for item in items_added:
+ if self._count_diff() is StopIteration:
+ return # pragma: no cover. This is already covered for addition.
+
+ change_level = level.branch_deeper(
+ notpresent, item, child_relationship_class=SetRelationship)
+ self._report_result('set_item_added', change_level, local_tree=local_tree)
+
+ for item in items_removed:
+ if self._count_diff() is StopIteration:
+ return # pragma: no cover. This is already covered for addition.
+
+ change_level = level.branch_deeper(
+ item, notpresent, child_relationship_class=SetRelationship)
+ self._report_result('set_item_removed', change_level, local_tree=local_tree)
+
+ @staticmethod
+ def _iterables_subscriptable(t1, t2):
+ try:
+ if getattr(t1, '__getitem__') and getattr(t2, '__getitem__'):
+ return True
+ else: # pragma: no cover
+ return False # should never happen
+ except AttributeError:
+ return False
+
+ def _diff_iterable(self, level, parents_ids=frozenset(), _original_type=None, local_tree=None):
+ """Difference of iterables"""
+ if (self.ignore_order_func and self.ignore_order_func(level)) or self.ignore_order:
+ self._diff_iterable_with_deephash(level, parents_ids, _original_type=_original_type, local_tree=local_tree)
+ else:
+ self._diff_iterable_in_order(level, parents_ids, _original_type=_original_type, local_tree=local_tree)
+
+ def _compare_in_order(
+ self, level,
+ t1_from_index=None, t1_to_index=None,
+ t2_from_index=None, t2_to_index=None
+ ) -> List[Tuple[Tuple[int, int], Tuple[Any, Any]]]:
+ """
+ Default compare if `iterable_compare_func` is not provided.
+ This will compare in sequence order.
+ """
+ if t1_from_index is None:
+ return [((i, i), (x, y)) for i, (x, y) in enumerate(
+ zip_longest(
+ level.t1, level.t2, fillvalue=ListItemRemovedOrAdded))]
+ else:
+ t1_chunk = level.t1[t1_from_index:t1_to_index]
+ t2_chunk = level.t2[t2_from_index:t2_to_index]
+ return [((i + t1_from_index, i + t2_from_index), (x, y)) for i, (x, y) in enumerate(
+ zip_longest(
+ t1_chunk, t2_chunk, fillvalue=ListItemRemovedOrAdded))]
+
+ def _get_matching_pairs(
+ self, level,
+ t1_from_index=None, t1_to_index=None,
+ t2_from_index=None, t2_to_index=None
+ ) -> List[Tuple[Tuple[int, int], Tuple[Any, Any]]]:
+ """
+ Given a level, get matching pairs. This returns a list of tuples in the form:
+ [
+ ((t1 index, t2 index), (t1 item, t2 item)),
+ ...
+ ]
+
+ This will compare using the passed in `iterable_compare_func` if available.
+ By default it compares in order.
+ """
+
+ if self.iterable_compare_func is None:
+ # Match in order if there is no compare function provided
+ return self._compare_in_order(
+ level,
+ t1_from_index=t1_from_index, t1_to_index=t1_to_index,
+ t2_from_index=t2_from_index, t2_to_index=t2_to_index,
+ )
+ try:
+ matches = []
+ y_matched = set()
+ y_index_matched = set()
+ for i, x in enumerate(level.t1):
+ x_found = False
+ for j, y in enumerate(level.t2):
+
+ if(j in y_index_matched):
+ # This ensures a one-to-one relationship of matches from t1 to t2.
+ # If this index in t2 (y) has already been matched to another x,
+ # it cannot have another match, so just continue.
+ continue
+
+ if(self.iterable_compare_func(x, y, level)):
+ deep_hash = DeepHash(y,
+ hashes=self.hashes,
+ apply_hash=True,
+ **self.deephash_parameters,
+ )
+ y_index_matched.add(j)
+ y_matched.add(deep_hash[y])
+ matches.append(((i, j), (x, y)))
+ x_found = True
+ break
+
+ if(not x_found):
+ matches.append(((i, -1), (x, ListItemRemovedOrAdded)))
+ for j, y in enumerate(level.t2):
+
+ deep_hash = DeepHash(y,
+ hashes=self.hashes,
+ apply_hash=True,
+ **self.deephash_parameters,
+ )
+ if(deep_hash[y] not in y_matched):
+ matches.append(((-1, j), (ListItemRemovedOrAdded, y)))
+ return matches
+ except CannotCompare:
+ return self._compare_in_order(
+ level,
+ t1_from_index=t1_from_index, t1_to_index=t1_to_index,
+ t2_from_index=t2_from_index, t2_to_index=t2_to_index
+ )
+
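+ # A hedged sketch of a pairing function consumed above; the (x, y, level)
+ # signature and the CannotCompare fallback follow deepdiff's documented
+ # iterable_compare_func contract:
+ #
+ #     from deepdiff import DeepDiff
+ #     from deepdiff.helper import CannotCompare
+ #
+ #     def compare_by_id(x, y, level):
+ #         try:
+ #             return x['id'] == y['id']
+ #         except (KeyError, TypeError):
+ #             raise CannotCompare() from None
+ #
+ #     DeepDiff(t1, t2, iterable_compare_func=compare_by_id)  # t1/t2: lists of dicts
+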
+ def _diff_iterable_in_order(self, level, parents_ids=frozenset(), _original_type=None, local_tree=None):
+ # We're handling both subscriptable and non-subscriptable iterables. Which one is it?
+ subscriptable = self._iterables_subscriptable(level.t1, level.t2)
+ if subscriptable:
+ child_relationship_class = SubscriptableIterableRelationship
+ else:
+ child_relationship_class = NonSubscriptableIterableRelationship
+
+ if (
+ not self.zip_ordered_iterables
+ and isinstance(level.t1, Sequence)
+ and isinstance(level.t2, Sequence)
+ and self._all_values_basic_hashable(level.t1)
+ and self._all_values_basic_hashable(level.t2)
+ and self.iterable_compare_func is None
+ ):
+ local_tree_pass = TreeResult()
+ opcodes_with_values = self._diff_ordered_iterable_by_difflib(
+ level,
+ parents_ids=parents_ids,
+ _original_type=_original_type,
+ child_relationship_class=child_relationship_class,
+ local_tree=local_tree_pass,
+ )
+ # Sometimes DeepDiff's older pair-forming iterable diff does a better job than the difflib-based diff above.
+ if len(local_tree_pass) > 1:
+ local_tree_pass2 = TreeResult()
+ self._diff_by_forming_pairs_and_comparing_one_by_one(
+ level,
+ parents_ids=parents_ids,
+ _original_type=_original_type,
+ child_relationship_class=child_relationship_class,
+ local_tree=local_tree_pass2,
+ )
+ if len(local_tree_pass) >= len(local_tree_pass2):
+ local_tree_pass = local_tree_pass2
+ else:
+ self._iterable_opcodes[level.path(force=FORCE_DEFAULT)] = opcodes_with_values
+ for report_type, levels in local_tree_pass.items():
+ if levels:
+ self.tree[report_type] |= levels
+ else:
+ self._diff_by_forming_pairs_and_comparing_one_by_one(
+ level,
+ parents_ids=parents_ids,
+ _original_type=_original_type,
+ child_relationship_class=child_relationship_class,
+ local_tree=local_tree,
+ )
+
+ def _all_values_basic_hashable(self, iterable):
+ """
+ Are all items basic hashable types?
+ Or are there custom types too?
+ """
+
+ # We don't want to exhaust a generator
+ if isinstance(iterable, types.GeneratorType):
+ return False
+ for item in iterable:
+ if not isinstance(item, basic_types):
+ return False
+ return True
+
+ def _diff_by_forming_pairs_and_comparing_one_by_one(
+ self, level, local_tree, parents_ids=frozenset(),
+ _original_type=None, child_relationship_class=None,
+ t1_from_index=None, t1_to_index=None,
+ t2_from_index=None, t2_to_index=None,
+ ):
+ for (i, j), (x, y) in self._get_matching_pairs(
+ level,
+ t1_from_index=t1_from_index, t1_to_index=t1_to_index,
+ t2_from_index=t2_from_index, t2_to_index=t2_to_index
+ ):
+ if self._count_diff() is StopIteration:
+ return # pragma: no cover. This is already covered for addition.
+
+ reference_param1 = i
+ reference_param2 = j
+ if y is ListItemRemovedOrAdded: # item removed completely
+ change_level = level.branch_deeper(
+ x,
+ notpresent,
+ child_relationship_class=child_relationship_class,
+ child_relationship_param=reference_param1,
+ child_relationship_param2=reference_param2,
+ )
+ self._report_result('iterable_item_removed', change_level, local_tree=local_tree)
+
+ elif x is ListItemRemovedOrAdded: # new item added
+ change_level = level.branch_deeper(
+ notpresent,
+ y,
+ child_relationship_class=child_relationship_class,
+ child_relationship_param=reference_param1,
+ child_relationship_param2=reference_param2,
+ )
+ self._report_result('iterable_item_added', change_level, local_tree=local_tree)
+
+ else: # check if item value has changed
+ if (i != j and ((x == y) or self.iterable_compare_func)):
+ # Item moved
+ change_level = level.branch_deeper(
+ x,
+ y,
+ child_relationship_class=child_relationship_class,
+ child_relationship_param=reference_param1,
+ child_relationship_param2=reference_param2
+ )
+ self._report_result('iterable_item_moved', change_level, local_tree=local_tree)
+
+ if self.iterable_compare_func:
+ # Intentionally setting j as the first child relationship param in cases of a moved item.
+ # If the item was moved using an iterable_compare_func then we want to make sure that the index
+ # is relative to t2.
+ reference_param1 = j
+ reference_param2 = i
+ else:
+ continue
+
+ item_id = id(x)
+ if parents_ids and item_id in parents_ids:
+ continue
+ parents_ids_added = add_to_frozen_set(parents_ids, item_id)
+
+ # Go one level deeper
+ next_level = level.branch_deeper(
+ x,
+ y,
+ child_relationship_class=child_relationship_class,
+ child_relationship_param=reference_param1,
+ child_relationship_param2=reference_param2
+ )
+ self._diff(next_level, parents_ids_added, local_tree=local_tree)
+
+ def _diff_ordered_iterable_by_difflib(
+ self, level, local_tree, parents_ids=frozenset(), _original_type=None, child_relationship_class=None,
+ ):
+
+ seq = difflib.SequenceMatcher(isjunk=None, a=level.t1, b=level.t2, autojunk=False)
+
+ opcodes = seq.get_opcodes()
+ opcodes_with_values = []
+
+ # TODO: this logic should be revisited so we detect reverse operations,
+ # like when a replacement happens at index X and a reverse replacement happens at index Y.
+ # In those cases we have an "iterable_item_moved" operation.
+ for tag, t1_from_index, t1_to_index, t2_from_index, t2_to_index in opcodes:
+ if tag == 'equal':
+ opcodes_with_values.append(Opcode(
+ tag, t1_from_index, t1_to_index, t2_from_index, t2_to_index,
+ ))
+ continue
+ # print('{:7} t1[{}:{}] --> t2[{}:{}] {!r:>8} --> {!r}'.format(
+ # tag, t1_from_index, t1_to_index, t2_from_index, t2_to_index, level.t1[t1_from_index:t1_to_index], level.t2[t2_from_index:t2_to_index]))
+
+ opcodes_with_values.append(Opcode(
+ tag, t1_from_index, t1_to_index, t2_from_index, t2_to_index,
+ old_values = level.t1[t1_from_index: t1_to_index],
+ new_values = level.t2[t2_from_index: t2_to_index],
+ ))
+
+ if tag == 'replace':
+ self._diff_by_forming_pairs_and_comparing_one_by_one(
+ level, local_tree=local_tree, parents_ids=parents_ids,
+ _original_type=_original_type, child_relationship_class=child_relationship_class,
+ t1_from_index=t1_from_index, t1_to_index=t1_to_index,
+ t2_from_index=t2_from_index, t2_to_index=t2_to_index,
+ )
+ elif tag == 'delete':
+ for index, x in enumerate(level.t1[t1_from_index:t1_to_index]):
+ change_level = level.branch_deeper(
+ x,
+ notpresent,
+ child_relationship_class=child_relationship_class,
+ child_relationship_param=index + t1_from_index,
+ child_relationship_param2=index + t1_from_index,
+ )
+ self._report_result('iterable_item_removed', change_level, local_tree=local_tree)
+ elif tag == 'insert':
+ for index, y in enumerate(level.t2[t2_from_index:t2_to_index]):
+ change_level = level.branch_deeper(
+ notpresent,
+ y,
+ child_relationship_class=child_relationship_class,
+ child_relationship_param=index + t2_from_index,
+ child_relationship_param2=index + t2_from_index,
+ )
+ self._report_result('iterable_item_added', change_level, local_tree=local_tree)
+ return opcodes_with_values
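+
+ # Illustrative sketch (not executed): the raw difflib opcodes this method consumes.
+ # Each opcode is (tag, t1_from_index, t1_to_index, t2_from_index, t2_to_index).
+ #
+ #     import difflib
+ #     sm = difflib.SequenceMatcher(None, ['a', 'b', 'c', 'd'], ['a', 'x', 'c'], autojunk=False)
+ #     sm.get_opcodes()
+ #     # -> [('equal', 0, 1, 0, 1), ('replace', 1, 2, 1, 2),
+ #     #     ('equal', 2, 3, 2, 3), ('delete', 3, 4, 3, 3)]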
+
+
+ def _diff_str(self, level, local_tree=None):
+ """Compare strings"""
+ if self.ignore_string_case:
+ level.t1 = level.t1.lower()
+ level.t2 = level.t2.lower()
+
+ if type(level.t1) == type(level.t2) and level.t1 == level.t2: # NOQA
+ return
+
+ # do we add a diff for convenience?
+ do_diff = True
+ t1_str = level.t1
+ t2_str = level.t2
+
+ if isinstance(level.t1, bytes_type):
+ try:
+ t1_str = level.t1.decode('ascii')
+ except UnicodeDecodeError:
+ do_diff = False
+
+ if isinstance(level.t2, bytes_type):
+ try:
+ t2_str = level.t2.decode('ascii')
+ except UnicodeDecodeError:
+ do_diff = False
+
+ if isinstance(level.t1, Enum):
+ t1_str = level.t1.value
+
+ if isinstance(level.t2, Enum):
+ t2_str = level.t2.value
+
+ if t1_str == t2_str:
+ return
+
+ if do_diff:
+ if '\n' in t1_str or isinstance(t2_str, str) and '\n' in t2_str:
+ diff = difflib.unified_diff(
+ t1_str.splitlines(), t2_str.splitlines(), lineterm='')
+ diff = list(diff)
+ if diff:
+ level.additional['diff'] = '\n'.join(diff)
+
+ self._report_result('values_changed', level, local_tree=local_tree)
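+
+ # Illustrative sketch (not executed): when both strings are multi-line, a
+ # unified diff is attached under the 'diff' key of the reported change.
+ #
+ #     DeepDiff('line1\nline2\nline3', 'line1\nLINE2\nline3')
+ #     # roughly: {'values_changed': {'root': {'new_value': ..., 'old_value': ...,
+ #     #           'diff': '--- \n+++ \n@@ -1,3 +1,3 @@\n line1\n-line2\n+LINE2\n line3'}}}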
+
+ def _diff_tuple(self, level, parents_ids, local_tree=None):
+ # Checking to see if it has _asdict, which probably means it is a
+ # namedtuple.
+ try:
+ level.t1._asdict
+ # It must be a normal tuple
+ except AttributeError:
+ self._diff_iterable(level, parents_ids, local_tree=local_tree)
+ # We assume it is a namedtuple then
+ else:
+ self._diff_obj(level, parents_ids, is_namedtuple=True, local_tree=local_tree)
+
+ def _add_hash(self, hashes, item_hash, item, i):
+ if item_hash in hashes:
+ hashes[item_hash].indexes.append(i)
+ else:
+ hashes[item_hash] = IndexedHash(indexes=[i], item=item)
+
+ def _create_hashtable(self, level, t):
+ """Create hashtable of {item_hash: (indexes, item)}"""
+ obj = getattr(level, t)
+
+ local_hashes = dict_()
+ for (i, item) in enumerate(obj):
+ try:
+ parent = "{}[{}]".format(level.path(), i)
+ # Note: in DeepDiff we only calculate the hash of items when we have to.
+ # So self.hashes does not include hashes of all objects in t1 and t2.
+ # It only includes the ones needed when comparing iterables.
+ # The self.hashes dictionary gets shared between different runs of DeepHash
+ # so that any object whose hash has already been calculated is not re-calculated.
+ deep_hash = DeepHash(
+ item,
+ hashes=self.hashes,
+ parent=parent,
+ apply_hash=True,
+ **self.deephash_parameters,
+ )
+ except UnicodeDecodeError as err:
+ err.reason = f"Can not produce a hash for {level.path()}: {err.reason}"
+ raise
+ except NotImplementedError:
+ raise
+ # except Exception as e: # pragma: no cover
+ # logger.error("Can not produce a hash for %s."
+ # "Not counting this object.\n %s" %
+ # (level.path(), e))
+ else:
+ try:
+ item_hash = deep_hash[item]
+ except KeyError:
+ pass
+ else:
+ if item_hash is unprocessed: # pragma: no cover
+ self.log_err("Item %s was not processed while hashing "
+ "thus not counting this object." %
+ level.path())
+ else:
+ self._add_hash(hashes=local_hashes, item_hash=item_hash, item=item, i=i)
+
+ # We also hash the iterables themselves so that we can later create cache keys from those hashes.
+ DeepHash(
+ obj,
+ hashes=self.hashes,
+ parent=level.path(),
+ apply_hash=True,
+ **self.deephash_parameters,
+ )
+ return local_hashes
+
+ @staticmethod
+ @lru_cache(maxsize=2028)
+ def _get_distance_cache_key(added_hash, removed_hash):
+ key1, key2 = (added_hash, removed_hash) if added_hash > removed_hash else (removed_hash, added_hash)
+ if isinstance(key1, int):
+ # If the hash function produces integers we convert them to hex values.
+ # This was used when the default hash function was Murmur3 128bit which produces integers.
+ key1 = hex(key1).encode('utf-8')
+ key2 = hex(key2).encode('utf-8')
+ elif isinstance(key1, str):
+ key1 = key1.encode('utf-8')
+ key2 = key2.encode('utf-8')
+ return key1 + b'--' + key2 + b'dc'
+
+ def _get_rough_distance_of_hashed_objs(
+ self, added_hash, removed_hash, added_hash_obj, removed_hash_obj, _original_type=None):
+ # We need the rough distance between the 2 objects to see if they qualify to be pairs or not
+ _distance = cache_key = None
+ if self._stats[DISTANCE_CACHE_ENABLED]:
+ cache_key = self._get_distance_cache_key(added_hash, removed_hash)
+ if cache_key in self._distance_cache:
+ self._stats[DISTANCE_CACHE_HIT_COUNT] += 1
+ _distance = self._distance_cache.get(cache_key)
+ if _distance is None:
+ # We can only cache the rough distance and not the actual diff result for reuse.
+ # The reason is that we have modified the parameters explicitly so they are different and can't
+ # be used for diff reporting
+ diff = DeepDiff(
+ removed_hash_obj.item, added_hash_obj.item,
+ _parameters=self._parameters,
+ _shared_parameters=self._shared_parameters,
+ view=DELTA_VIEW,
+ _original_type=_original_type,
+ iterable_compare_func=self.iterable_compare_func,
+ )
+ _distance = diff._get_rough_distance()
+ if cache_key and self._stats[DISTANCE_CACHE_ENABLED]:
+ self._distance_cache.set(cache_key, value=_distance)
+ return _distance
+
+ def _get_most_in_common_pairs_in_iterables(
+ self, hashes_added, hashes_removed, t1_hashtable, t2_hashtable, parents_ids, _original_type):
+ """
+ Get the closest pairs between items that are removed and items that are added.
+
+ returns a dictionary of hashes that are closest to each other.
+ The dictionary is going to be symmetrical so any key will also be a value and vice versa.
+
+ Note that due to the current reporting structure in DeepDiff, we don't compare an item that
+ was added to an item that is in both t1 and t2.
+
+ For example
+
+ [{1, 2}, {4, 5, 6}]
+ [{1, 2}, {1, 2, 3}]
+
+ the comparison is only made between {4, 5, 6} and {1, 2, 3}, even though technically {1, 2, 3} is
+ just one item different from {1, 2}.
+
+ Perhaps in the future we can have a report key for an item that was duplicated and modified, instead of just added.
+ """
+ cache_key = None
+ if self._stats[DISTANCE_CACHE_ENABLED]:
+ cache_key = combine_hashes_lists(items=[hashes_added, hashes_removed], prefix='pairs_cache')
+ if cache_key in self._distance_cache:
+ return self._distance_cache.get(cache_key).copy()
+
+ # A dictionary of hashes to distances and each distance to an ordered set of hashes.
+ # It tells us about the distance of each object from other objects.
+ # And the objects with the same distances are grouped together in an ordered set.
+ # It also includes a "max" key that is just the value of the biggest current distance in the
+ # most_in_common_pairs dictionary.
+ def defaultdict_orderedset():
+ return defaultdict(SetOrdered)
+ most_in_common_pairs = defaultdict(defaultdict_orderedset)
+ pairs = dict_()
+
+ pre_calced_distances = None
+ if hashes_added and hashes_removed and np and len(hashes_added) > 1 and len(hashes_removed) > 1:
+ # pre-calculates distances ONLY for 1D arrays whether an _original_type
+ # was explicitly passed or a homogeneous array is detected.
+ # Numpy is needed for this optimization.
+ pre_calced_distances = self._precalculate_numpy_arrays_distance(
+ hashes_added, hashes_removed, t1_hashtable, t2_hashtable, _original_type)
+
+ if hashes_added and hashes_removed \
+ and self.iterable_compare_func \
+ and len(hashes_added) > 0 and len(hashes_removed) > 0:
+ pre_calced_distances = self._precalculate_distance_by_custom_compare_func(
+ hashes_added, hashes_removed, t1_hashtable, t2_hashtable, _original_type)
+
+ for added_hash in hashes_added:
+ for removed_hash in hashes_removed:
+ added_hash_obj = t2_hashtable[added_hash]
+ removed_hash_obj = t1_hashtable[removed_hash]
+
+ # Loop is detected
+ if id(removed_hash_obj.item) in parents_ids:
+ continue
+
+ _distance = None
+ if pre_calced_distances:
+ _distance = pre_calced_distances.get("{}--{}".format(added_hash, removed_hash))
+ if _distance is None:
+ _distance = self._get_rough_distance_of_hashed_objs(
+ added_hash, removed_hash, added_hash_obj, removed_hash_obj, _original_type)
+ # Left for future debugging
+ # print(f'{Fore.RED}distance of {added_hash_obj.item} and {removed_hash_obj.item}: {_distance}{Style.RESET_ALL}')
+ # Discard potential pairs that are too far.
+ if _distance >= self.cutoff_distance_for_pairs:
+ continue
+ pairs_of_item = most_in_common_pairs[added_hash]
+ pairs_of_item[_distance].add(removed_hash)
+ used_to_hashes = set()
+
+ distances_to_from_hashes = defaultdict(SetOrdered)
+ for from_hash, distances_to_to_hashes in most_in_common_pairs.items():
+ # del distances_to_to_hashes['max']
+ for dist in distances_to_to_hashes:
+ distances_to_from_hashes[dist].add(from_hash)
+
+ for dist in sorted(distances_to_from_hashes.keys()):
+ from_hashes = distances_to_from_hashes[dist]
+ while from_hashes:
+ from_hash = from_hashes.pop()
+ if from_hash not in used_to_hashes:
+ to_hashes = most_in_common_pairs[from_hash][dist]
+ while to_hashes:
+ to_hash = to_hashes.pop()
+ if to_hash not in used_to_hashes:
+ used_to_hashes.add(from_hash)
+ used_to_hashes.add(to_hash)
+ # Left for future debugging:
+ # print(f'{bcolors.FAIL}Adding {t2_hashtable[from_hash].item} as a pairs of {t1_hashtable[to_hash].item} with distance of {dist}{bcolors.ENDC}')
+ pairs[from_hash] = to_hash
+
+ inverse_pairs = {v: k for k, v in pairs.items()}
+ pairs.update(inverse_pairs)
+ if cache_key and self._stats[DISTANCE_CACHE_ENABLED]:
+ self._distance_cache.set(cache_key, value=pairs)
+ return pairs.copy()
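+
+ # Illustrative sketch (not executed): the effect of this pairing is visible with
+ # ignore_order=True, where a removed item and an added item that are close enough
+ # (below cutoff_distance_for_pairs) are compared against each other.
+ #
+ #     DeepDiff([{'a': 1, 'b': 2}, {'x': 9}],
+ #              [{'x': 9}, {'a': 1, 'b': 3}],
+ #              ignore_order=True)
+ #     # roughly: {'values_changed': {"root[0]['b']": {'new_value': 3, 'old_value': 2}}}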
+
+ def _diff_iterable_with_deephash(self, level, parents_ids, _original_type=None, local_tree=None):
+ """Diff of hashable or unhashable iterables. Only used when ignoring the order."""
+
+ full_t1_hashtable = self._create_hashtable(level, 't1')
+ full_t2_hashtable = self._create_hashtable(level, 't2')
+ t1_hashes = SetOrdered(full_t1_hashtable.keys())
+ t2_hashes = SetOrdered(full_t2_hashtable.keys())
+ hashes_added = t2_hashes - t1_hashes
+ hashes_removed = t1_hashes - t2_hashes
+
+ # Deciding whether to calculate pairs or not.
+ if (len(hashes_added) + len(hashes_removed)) / (len(full_t1_hashtable) + len(full_t2_hashtable) + 1) > self.cutoff_intersection_for_pairs:
+ get_pairs = False
+ else:
+ get_pairs = True
+
+ # reduce the size of hashtables
+ if self.report_repetition:
+ t1_hashtable = full_t1_hashtable
+ t2_hashtable = full_t2_hashtable
+ else:
+ t1_hashtable = {k: v for k, v in full_t1_hashtable.items() if k in hashes_removed}
+ t2_hashtable = {k: v for k, v in full_t2_hashtable.items() if k in hashes_added}
+ if self._stats[PASSES_COUNT] < self.max_passes and get_pairs:
+ self._stats[PASSES_COUNT] += 1
+ pairs = self._get_most_in_common_pairs_in_iterables(
+ hashes_added, hashes_removed, t1_hashtable, t2_hashtable, parents_ids, _original_type)
+ elif get_pairs:
+ if not self._stats[MAX_PASS_LIMIT_REACHED]:
+ self._stats[MAX_PASS_LIMIT_REACHED] = True
+ logger.warning(MAX_PASSES_REACHED_MSG.format(self.max_passes))
+ pairs = dict_()
+ else:
+ pairs = dict_()
+
+ def get_other_pair(hash_value, in_t1=True):
+ """
+ Gets the indexed hash item that is paired with hash_value in the pairs dictionary.
+ in_t1: are we looking for the other pair in t1 or t2?
+ """
+ if in_t1:
+ hashtable = t1_hashtable
+ the_other_hashes = hashes_removed
+ else:
+ hashtable = t2_hashtable
+ the_other_hashes = hashes_added
+ other = pairs.pop(hash_value, notpresent)
+ if other is notpresent:
+ other = notpresent_indexed
+ else:
+ # The pairs are symmetrical.
+ # removing the other direction of pair
+ # so it does not get used.
+ del pairs[other]
+ the_other_hashes.remove(other)
+ other = hashtable[other]
+ return other
+
+ if self.report_repetition:
+ for hash_value in hashes_added:
+ if self._count_diff() is StopIteration:
+ return # pragma: no cover. This is already covered for addition (when report_repetition=False).
+ other = get_other_pair(hash_value)
+ item_id = id(other.item)
+ indexes = t2_hashtable[hash_value].indexes if other.item is notpresent else other.indexes
+ # When we report repetitions, we want the child_relationship_param2 only if there is no repetition.
+ # Because when there is a repetition, we report it in a different way (iterable_items_added_at_indexes for example).
+ # When there is no repetition, we want child_relationship_param2 so that we report the "new_path" correctly.
+ if len(t2_hashtable[hash_value].indexes) == 1:
+ index2 = t2_hashtable[hash_value].indexes[0]
+ else:
+ index2 = None
+ for i in indexes:
+ change_level = level.branch_deeper(
+ other.item,
+ t2_hashtable[hash_value].item,
+ child_relationship_class=SubscriptableIterableRelationship,
+ child_relationship_param=i,
+ child_relationship_param2=index2,
+ )
+ if other.item is notpresent:
+ self._report_result('iterable_item_added', change_level, local_tree=local_tree)
+ else:
+ parents_ids_added = add_to_frozen_set(parents_ids, item_id)
+ self._diff(change_level, parents_ids_added, local_tree=local_tree)
+ for hash_value in hashes_removed:
+ if self._count_diff() is StopIteration:
+ return # pragma: no cover. This is already covered for addition.
+ other = get_other_pair(hash_value, in_t1=False)
+ item_id = id(other.item)
+ # When we report repetitions, we want the child_relationship_param2 only if there is no repetition.
+ # Because when there is a repetition, we report it in a different way (iterable_items_added_at_indexes for example).
+ # When there is no repetition, we want child_relationship_param2 so that we report the "new_path" correctly.
+ if other.item is notpresent or len(other.indexes) > 1:
+ index2 = None
+ else:
+ index2 = other.indexes[0]
+ for i in t1_hashtable[hash_value].indexes:
+ change_level = level.branch_deeper(
+ t1_hashtable[hash_value].item,
+ other.item,
+ child_relationship_class=SubscriptableIterableRelationship,
+ child_relationship_param=i,
+ child_relationship_param2=index2,
+ )
+ if other.item is notpresent:
+ self._report_result('iterable_item_removed', change_level, local_tree=local_tree)
+ else:
+ # I was not able to make a test case for the following 2 lines since the cases end up
+ # getting resolved above in the hashes_added calcs. However I am leaving these 2 lines
+ # in case things change in future.
+ parents_ids_added = add_to_frozen_set(parents_ids, item_id) # pragma: no cover.
+ self._diff(change_level, parents_ids_added, local_tree=local_tree) # pragma: no cover.
+
+ items_intersect = t2_hashes.intersection(t1_hashes)
+
+ for hash_value in items_intersect:
+ t1_indexes = t1_hashtable[hash_value].indexes
+ t2_indexes = t2_hashtable[hash_value].indexes
+ t1_indexes_len = len(t1_indexes)
+ t2_indexes_len = len(t2_indexes)
+ if t1_indexes_len != t2_indexes_len: # this is a repetition change!
+ # create "change" entry, keep current level untouched to handle further changes
+ repetition_change_level = level.branch_deeper(
+ t1_hashtable[hash_value].item,
+ t2_hashtable[hash_value].item, # nb: those are equal!
+ child_relationship_class=SubscriptableIterableRelationship,
+ child_relationship_param=t1_hashtable[hash_value]
+ .indexes[0])
+ repetition_change_level.additional['repetition'] = RemapDict(
+ old_repeat=t1_indexes_len,
+ new_repeat=t2_indexes_len,
+ old_indexes=t1_indexes,
+ new_indexes=t2_indexes)
+ self._report_result('repetition_change',
+ repetition_change_level, local_tree=local_tree)
+
+ else:
+ for hash_value in hashes_added:
+ if self._count_diff() is StopIteration:
+ return
+ other = get_other_pair(hash_value)
+ item_id = id(other.item)
+ index = t2_hashtable[hash_value].indexes[0] if other.item is notpresent else other.indexes[0]
+ index2 = t2_hashtable[hash_value].indexes[0]
+ change_level = level.branch_deeper(
+ other.item,
+ t2_hashtable[hash_value].item,
+ child_relationship_class=SubscriptableIterableRelationship,
+ child_relationship_param=index,
+ child_relationship_param2=index2,
+ )
+ if other.item is notpresent:
+ self._report_result('iterable_item_added', change_level, local_tree=local_tree)
+ else:
+ parents_ids_added = add_to_frozen_set(parents_ids, item_id)
+ self._diff(change_level, parents_ids_added, local_tree=local_tree)
+
+ for hash_value in hashes_removed:
+ if self._count_diff() is StopIteration:
+ return # pragma: no cover. This is already covered for addition.
+ other = get_other_pair(hash_value, in_t1=False)
+ item_id = id(other.item)
+ index = t1_hashtable[hash_value].indexes[0]
+ index2 = t1_hashtable[hash_value].indexes[0] if other.item is notpresent else other.indexes[0]
+ change_level = level.branch_deeper(
+ t1_hashtable[hash_value].item,
+ other.item,
+ child_relationship_class=SubscriptableIterableRelationship,
+ child_relationship_param=index,
+ child_relationship_param2=index2,
+ )
+ if other.item is notpresent:
+ self._report_result('iterable_item_removed', change_level, local_tree=local_tree)
+ else:
+ # Just like the case when report_repetition = True, these lines never run currently.
+ # However they will stay here in case things change in future.
+ parents_ids_added = add_to_frozen_set(parents_ids, item_id) # pragma: no cover.
+ self._diff(change_level, parents_ids_added, local_tree=local_tree) # pragma: no cover.
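+
+ # Illustrative sketch (not executed): with ignore_order=True and
+ # report_repetition=True, a change in how many times an identical item appears
+ # is reported as a repetition_change instead of being ignored.
+ #
+ #     DeepDiff([1, 2, 2], [1, 2], ignore_order=True, report_repetition=True)
+ #     # roughly: {'repetition_change': {'root[1]': {'old_repeat': 2, 'new_repeat': 1,
+ #     #           'old_indexes': [1, 2], 'new_indexes': [1], 'value': 2}}}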
+
+ def _diff_booleans(self, level, local_tree=None):
+ if level.t1 != level.t2:
+ self._report_result('values_changed', level, local_tree=local_tree)
+
+ def _diff_numbers(self, level, local_tree=None, report_type_change=True):
+ """Diff Numbers"""
+ if report_type_change:
+ t1_type = "number" if self.ignore_numeric_type_changes else level.t1.__class__.__name__
+ t2_type = "number" if self.ignore_numeric_type_changes else level.t2.__class__.__name__
+ else:
+ t1_type = t2_type = ''
+
+ if self.use_log_scale:
+ if not logarithmic_similarity(level.t1, level.t2, threshold=self.log_scale_similarity_threshold):
+ self._report_result('values_changed', level, local_tree=local_tree)
+ elif self.math_epsilon is not None:
+ if not is_close(level.t1, level.t2, abs_tol=self.math_epsilon):
+ self._report_result('values_changed', level, local_tree=local_tree)
+ elif self.significant_digits is None:
+ if level.t1 != level.t2:
+ self._report_result('values_changed', level, local_tree=local_tree)
+ else:
+ # Bernhard10: I use string formatting for comparison, to be consistent with use cases where
+ # data is read from files that were previously written from Python, and
+ # to be consistent with the on-screen representation of numbers.
+ # Other options would be abs(t1-t2) < 10**-self.significant_digits
+ # or math.isclose (Python 3.5+).
+ # Note that abs(3.25-3.251) = 0.0009999999999998899 < 0.001
+ # Note also that "{:.3f}".format(1.1135) = 1.113, but "{:.3f}".format(1.11351) = 1.114
+ # For Decimals, format seems to round 2.5 to 2 and 3.5 to 4 (to the closest even number)
+ t1_s = self.number_to_string(level.t1,
+ significant_digits=self.significant_digits,
+ number_format_notation=self.number_format_notation)
+ t2_s = self.number_to_string(level.t2,
+ significant_digits=self.significant_digits,
+ number_format_notation=self.number_format_notation)
+
+ t1_s = KEY_TO_VAL_STR.format(t1_type, t1_s)
+ t2_s = KEY_TO_VAL_STR.format(t2_type, t2_s)
+ if t1_s != t2_s:
+ self._report_result('values_changed', level, local_tree=local_tree)
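+
+ # Illustrative sketch (not executed): with significant_digits, numbers are
+ # compared through their formatted string representations, so differences below
+ # the requested precision are ignored.
+ #
+ #     DeepDiff(1.0001, 1.0002)                          # reports values_changed
+ #     DeepDiff(1.0001, 1.0002, significant_digits=3)    # roughly: {} (both format to '1.000')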
+
+ def _diff_ipranges(self, level, local_tree=None):
+ """Diff IP ranges"""
+ if str(level.t1) != str(level.t2):
+ self._report_result('values_changed', level, local_tree=local_tree)
+
+ def _diff_datetime(self, level, local_tree=None):
+ """Diff DateTimes"""
+ level.t1 = datetime_normalize(self.truncate_datetime, level.t1, default_timezone=self.default_timezone)
+ level.t2 = datetime_normalize(self.truncate_datetime, level.t2, default_timezone=self.default_timezone)
+
+ if level.t1 != level.t2:
+ self._report_result('values_changed', level, local_tree=local_tree)
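+
+ # Illustrative sketch (not executed): truncate_datetime normalizes both sides
+ # before comparing, so differences below the chosen unit are ignored.
+ #
+ #     import datetime
+ #     d1 = datetime.datetime(2024, 1, 1, 10, 30, 15, tzinfo=datetime.timezone.utc)
+ #     d2 = datetime.datetime(2024, 1, 1, 10, 30, 59, tzinfo=datetime.timezone.utc)
+ #     DeepDiff(d1, d2, truncate_datetime='minute')      # roughly: {}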
+
+ def _diff_time(self, level, local_tree=None):
+ """Diff DateTimes"""
+ if self.truncate_datetime:
+ level.t1 = datetime_normalize(self.truncate_datetime, level.t1, default_timezone=self.default_timezone)
+ level.t2 = datetime_normalize(self.truncate_datetime, level.t2, default_timezone=self.default_timezone)
+
+ if level.t1 != level.t2:
+ self._report_result('values_changed', level, local_tree=local_tree)
+
+ def _diff_uuids(self, level, local_tree=None):
+ """Diff UUIDs"""
+ if level.t1.int != level.t2.int:
+ self._report_result('values_changed', level, local_tree=local_tree)
+
+ def _diff_numpy_array(self, level, parents_ids=frozenset(), local_tree=None):
+ """Diff numpy arrays"""
+ if level.path() not in self._numpy_paths:
+ self._numpy_paths[level.path()] = get_type(level.t2).__name__
+ if np is None:
+ # This line should never be run. If it is ever called, it means the type check detected a numpy array,
+ # which means the numpy module must be available. So np can't be None.
+ raise ImportError(CANT_FIND_NUMPY_MSG) # pragma: no cover
+
+ if (self.ignore_order_func and not self.ignore_order_func(level)) or not self.ignore_order:
+ # fast checks
+ if self.significant_digits is None:
+ if np.array_equal(level.t1, level.t2, equal_nan=self.ignore_nan_inequality):
+ return # all good
+ else:
+ try:
+ np.testing.assert_almost_equal(level.t1, level.t2, decimal=self.significant_digits)
+ except TypeError:
+ np.array_equal(level.t1, level.t2, equal_nan=self.ignore_nan_inequality)
+ except AssertionError:
+ pass # do detailed checking below
+ else:
+ return # all good
+
+ # compare array meta-data
+ _original_type = level.t1.dtype
+ if level.t1.shape != level.t2.shape:
+ # Arrays are converted to Python lists so that certain features of DeepDiff can be applied to them more easily.
+ # They will be converted back to Numpy at their final dimension.
+ level.t1 = level.t1.tolist()
+ level.t2 = level.t2.tolist()
+ self._diff_iterable(level, parents_ids, _original_type=_original_type, local_tree=local_tree)
+ else:
+ # metadata same -- the difference is in the content
+ shape = level.t1.shape
+ dimensions = len(shape)
+ if dimensions == 1:
+ self._diff_iterable(level, parents_ids, _original_type=_original_type, local_tree=local_tree)
+ elif (self.ignore_order_func and self.ignore_order_func(level)) or self.ignore_order:
+ # Arrays are converted to Python lists so that certain features of DeepDiff can be applied to them more easily.
+ # They will be converted back to Numpy at their final dimension.
+ level.t1 = level.t1.tolist()
+ level.t2 = level.t2.tolist()
+ self._diff_iterable_with_deephash(level, parents_ids, _original_type=_original_type, local_tree=local_tree)
+ else:
+ for (t1_path, t1_row), (t2_path, t2_row) in zip(
+ get_numpy_ndarray_rows(level.t1, shape),
+ get_numpy_ndarray_rows(level.t2, shape)):
+
+ new_level = level.branch_deeper(
+ t1_row,
+ t2_row,
+ child_relationship_class=NumpyArrayRelationship,
+ child_relationship_param=t1_path,
+ child_relationship_param2=t2_path,
+ )
+
+ self._diff_iterable_in_order(new_level, parents_ids, _original_type=_original_type, local_tree=local_tree)
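+
+ # Illustrative sketch (assumes numpy is installed, not executed): same-shape
+ # arrays are diffed element by element; differently shaped or order-ignored
+ # arrays are converted to lists first and diffed as iterables.
+ #
+ #     import numpy as np
+ #     DeepDiff(np.array([1, 2, 3]), np.array([1, 2, 4]))
+ #     # roughly: {'values_changed': {'root[2]': {'new_value': 4, 'old_value': 3}}}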
+
+ def _diff_types(self, level, local_tree=None):
+ """Diff types"""
+ level.report_type = 'type_changes'
+ self._report_result('type_changes', level, local_tree=local_tree)
+
+ def _count_diff(self):
+ if (self.max_diffs is not None and self._stats[DIFF_COUNT] > self.max_diffs):
+ if not self._stats[MAX_DIFF_LIMIT_REACHED]:
+ self._stats[MAX_DIFF_LIMIT_REACHED] = True
+ logger.warning(MAX_DIFFS_REACHED_MSG.format(self.max_diffs))
+ return StopIteration
+ self._stats[DIFF_COUNT] += 1
+ if self.cache_size and self.cache_tuning_sample_size:
+ self._auto_tune_cache()
+
+ def _auto_tune_cache(self):
+ take_sample = (self._stats[DIFF_COUNT] % self.cache_tuning_sample_size == 0)
+ if self.cache_tuning_sample_size:
+ if self._stats[DISTANCE_CACHE_ENABLED]:
+ if take_sample:
+ self._auto_off_cache()
+ # Turn on the cache once in a while
+ elif self._stats[DIFF_COUNT] % self._shared_parameters[_ENABLE_CACHE_EVERY_X_DIFF] == 0:
+ self.progress_logger('Re-enabling the distance and level caches.')
+ # decreasing the sampling frequency
+ self._shared_parameters[_ENABLE_CACHE_EVERY_X_DIFF] *= 10
+ self._stats[DISTANCE_CACHE_ENABLED] = True
+ if take_sample:
+ for key in (PREVIOUS_DIFF_COUNT, PREVIOUS_DISTANCE_CACHE_HIT_COUNT):
+ self._stats[key] = self._stats[key[9:]]
+
+ def _auto_off_cache(self):
+ """
+ Auto adjust the cache based on the usage
+ """
+ if self._stats[DISTANCE_CACHE_ENABLED]:
+ angle = (self._stats[DISTANCE_CACHE_HIT_COUNT] - self._stats['PREVIOUS {}'.format(DISTANCE_CACHE_HIT_COUNT)]) / (self._stats[DIFF_COUNT] - self._stats[PREVIOUS_DIFF_COUNT])
+ if angle < self.CACHE_AUTO_ADJUST_THRESHOLD:
+ self._stats[DISTANCE_CACHE_ENABLED] = False
+ self.progress_logger('Due to minimal cache hits, {} is disabled.'.format('distance cache'))
+
+ def _use_custom_operator(self, level):
+ """
+ For each level we check all custom operators.
+ If any of them matches the level, we run that operator's diff.
+ If the operator returns True, it has decided that these objects should not
+ be compared any further. It might have already reported its own results,
+ in which case that report will appear in the final results of this diff.
+ Otherwise the 2 objects in the level are simply omitted from the results.
+ """
+
+ for operator in self.custom_operators:
+ if operator.match(level):
+ prevent_default = operator.give_up_diffing(level=level, diff_instance=self)
+ if prevent_default:
+ return True
+
+ return False
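+
+ # Illustrative sketch (assumed usage, not part of this module): a custom operator
+ # built on deepdiff.operator.BaseOperator that gives up diffing for matched paths,
+ # effectively excluding them from the results. The class name and path are hypothetical.
+ #
+ #     from deepdiff.operator import BaseOperator
+ #
+ #     class IgnoreVersionField(BaseOperator):
+ #         def give_up_diffing(self, level, diff_instance):
+ #             return True   # matched levels are not diffed any further
+ #
+ #     DeepDiff({'version': '1.0', 'data': 1}, {'version': '2.0', 'data': 1},
+ #              custom_operators=[IgnoreVersionField(regex_paths=[r"root\['version'\]"])])
+ #     # roughly: {}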
+
+ def _diff(self, level, parents_ids=frozenset(), _original_type=None, local_tree=None):
+ """
+ The main diff method
+
+ **parameters**
+
+ level: the tree level or tree node
+ parents_ids: the ids of all the parent objects in the tree from the current node.
+ _original_type: If the objects had an original type that was different from what currently exists in level.t1 and level.t2
+ """
+ if self._count_diff() is StopIteration:
+ return
+
+ if self._use_custom_operator(level):
+ return
+
+ if level.t1 is level.t2:
+ return
+
+ if self._skip_this(level):
+ return
+
+ report_type_change = True
+ if get_type(level.t1) != get_type(level.t2):
+ for type_group in self.ignore_type_in_groups:
+ if self.type_check_func(level.t1, type_group) and self.type_check_func(level.t2, type_group):
+ report_type_change = False
+ break
+ if self.use_enum_value and isinstance(level.t1, Enum):
+ level.t1 = level.t1.value
+ report_type_change = False
+ if self.use_enum_value and isinstance(level.t2, Enum):
+ level.t2 = level.t2.value
+ report_type_change = False
+ if report_type_change:
+ self._diff_types(level, local_tree=local_tree)
+ return
+ # This is an edge case where t1=None or t2=None and None is in the ignore type group.
+ if level.t1 is None or level.t2 is None:
+ self._report_result('values_changed', level, local_tree=local_tree)
+ return
+
+ if self.ignore_nan_inequality and isinstance(level.t1, (float, np_floating)) and str(level.t1) == str(level.t2) == 'nan':
+ return
+
+ if isinstance(level.t1, booleans):
+ self._diff_booleans(level, local_tree=local_tree)
+
+ elif isinstance(level.t1, strings):
+ self._diff_str(level, local_tree=local_tree)
+
+ elif isinstance(level.t1, datetime.datetime):
+ self._diff_datetime(level, local_tree=local_tree)
+
+ elif isinstance(level.t1, ipranges):
+ self._diff_ipranges(level, local_tree=local_tree)
+
+ elif isinstance(level.t1, (datetime.date, datetime.timedelta, datetime.time)):
+ self._diff_time(level, local_tree=local_tree)
+
+ elif isinstance(level.t1, uuids):
+ self._diff_uuids(level, local_tree=local_tree)
+
+ elif isinstance(level.t1, numbers):
+ self._diff_numbers(level, local_tree=local_tree, report_type_change=report_type_change)
+
+ elif isinstance(level.t1, Mapping):
+ self._diff_dict(level, parents_ids, local_tree=local_tree)
+
+ elif isinstance(level.t1, tuple):
+ self._diff_tuple(level, parents_ids, local_tree=local_tree)
+
+ elif isinstance(level.t1, (set, frozenset, SetOrdered)):
+ self._diff_set(level, local_tree=local_tree)
+
+ elif isinstance(level.t1, np_ndarray):
+ self._diff_numpy_array(level, parents_ids, local_tree=local_tree)
+
+ elif isinstance(level.t1, PydanticBaseModel):
+ self._diff_obj(level, parents_ids, local_tree=local_tree, is_pydantic_object=True)
+
+ elif isinstance(level.t1, Iterable):
+ self._diff_iterable(level, parents_ids, _original_type=_original_type, local_tree=local_tree)
+
+ elif isinstance(level.t1, Enum):
+ self._diff_enum(level, parents_ids, local_tree=local_tree)
+
+ else:
+ self._diff_obj(level, parents_ids)
+
+ def _get_view_results(self, view):
+ """
+ Get the results based on the view
+ """
+ result = self.tree
+ if not self.report_repetition: # and self.is_root:
+ result.mutual_add_removes_to_become_value_changes()
+ if view == TREE_VIEW:
+ pass
+ elif view == TEXT_VIEW:
+ result = TextResult(tree_results=self.tree, verbose_level=self.verbose_level)
+ result.remove_empty_keys()
+ elif view == DELTA_VIEW:
+ result = self._to_delta_dict(report_repetition_required=False)
+ else:
+ raise ValueError(INVALID_VIEW_MSG.format(view))
+ return result
+
+ @staticmethod
+ def _get_key_for_group_by(row, group_by, item_name):
+ try:
+ return row.pop(group_by)
+ except KeyError:
+ logger.error("Unable to group {} by {}. The key is missing in {}".format(item_name, group_by, row))
+ raise
+
+ def _group_iterable_to_dict(self, item, group_by, item_name):
+ """
+ Convert a list of dictionaries into a dictionary of dictionaries
+ where the key is the value of the group_by key in each dictionary.
+ """
+ group_by_level2 = None
+ if isinstance(group_by, (list, tuple)):
+ group_by_level1 = group_by[0]
+ if len(group_by) > 1:
+ group_by_level2 = group_by[1]
+ else:
+ group_by_level1 = group_by
+ if isinstance(item, Iterable) and not isinstance(item, Mapping):
+ result = {}
+ item_copy = deepcopy(item)
+ for row in item_copy:
+ if isinstance(row, Mapping):
+ key1 = self._get_key_for_group_by(row, group_by_level1, item_name)
+ if group_by_level2:
+ key2 = self._get_key_for_group_by(row, group_by_level2, item_name)
+ if key1 not in result:
+ result[key1] = {}
+ if self.group_by_sort_key:
+ if key2 not in result[key1]:
+ result[key1][key2] = []
+ result_key1_key2 = result[key1][key2]
+ if row not in result_key1_key2:
+ result_key1_key2.append(row)
+ else:
+ result[key1][key2] = row
+ else:
+ if self.group_by_sort_key:
+ if key1 not in result:
+ result[key1] = []
+ if row not in result[key1]:
+ result[key1].append(row)
+ else:
+ result[key1] = row
+ else:
+ msg = "Unable to group {} by {} since the item {} is not a dictionary.".format(item_name, group_by_level1, row)
+ logger.error(msg)
+ raise ValueError(msg)
+ if self.group_by_sort_key:
+ if group_by_level2:
+ for key1, row1 in result.items():
+ for key2, row in row1.items():
+ row.sort(key=self.group_by_sort_key)
+ else:
+ for key, row in result.items():
+ row.sort(key=self.group_by_sort_key)
+ return result
+ msg = "Unable to group {} by {}".format(item_name, group_by)
+ logger.error(msg)
+ raise ValueError(msg)
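+
+ # Illustrative sketch (not executed): group_by turns two lists of dictionaries
+ # into dictionaries keyed by the given field before diffing, so rows are matched
+ # by that key instead of by list position. The 'id' field below is hypothetical.
+ #
+ #     t1 = [{'id': 'AA', 'name': 'Joe'}, {'id': 'BB', 'name': 'James'}]
+ #     t2 = [{'id': 'BB', 'name': 'Jimmy'}, {'id': 'AA', 'name': 'Joe'}]
+ #     DeepDiff(t1, t2, group_by='id')
+ #     # roughly: {'values_changed': {"root['BB']['name']": {'new_value': 'Jimmy',
+ #     #           'old_value': 'James'}}}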
+
+ def get_stats(self):
+ """
+ Get some stats on internals of the DeepDiff run.
+ """
+ return self._stats
+
+ @property
+ def affected_paths(self):
+ """
+ Get the list of paths that were affected,
+ whether a value was changed or items were added or removed.
+
+ Example
+ >>> t1 = {1: 1, 2: 2, 3: [3], 4: 4}
+ >>> t2 = {1: 1, 2: 4, 3: [3, 4], 5: 5, 6: 6}
+ >>> ddiff = DeepDiff(t1, t2)
+ >>> ddiff
+ >>> pprint(ddiff, indent=4)
+ { 'dictionary_item_added': [root[5], root[6]],
+ 'dictionary_item_removed': [root[4]],
+ 'iterable_item_added': {'root[3][1]': 4},
+ 'values_changed': {'root[2]': {'new_value': 4, 'old_value': 2}}}
+ >>> ddiff.affected_paths
+ SetOrdered(['root[3][1]', 'root[4]', 'root[5]', 'root[6]', 'root[2]'])
+ >>> ddiff.affected_root_keys
+ SetOrdered([3, 4, 5, 6, 2])
+
+ """
+ result = SetOrdered()
+ for key in REPORT_KEYS:
+ value = self.get(key)
+ if value:
+ if isinstance(value, SetOrdered):
+ result |= value
+ else:
+ result |= SetOrdered(value.keys())
+ return result
+
+ @property
+ def affected_root_keys(self):
+ """
+ Get the list of root keys that were affected,
+ whether a value was changed or items were added or removed.
+
+ Example
+ >>> t1 = {1: 1, 2: 2, 3: [3], 4: 4}
+ >>> t2 = {1: 1, 2: 4, 3: [3, 4], 5: 5, 6: 6}
+ >>> ddiff = DeepDiff(t1, t2)
+ >>> ddiff
+ >>> pprint(ddiff, indent=4)
+ { 'dictionary_item_added': [root[5], root[6]],
+ 'dictionary_item_removed': [root[4]],
+ 'iterable_item_added': {'root[3][1]': 4},
+ 'values_changed': {'root[2]': {'new_value': 4, 'old_value': 2}}}
+ >>> ddiff.affected_paths
+ SetOrdered(['root[3][1]', 'root[4]', 'root[5]', 'root[6]', 'root[2]'])
+ >>> ddiff.affected_root_keys
+ SetOrdered([3, 4, 5, 6, 2])
+ """
+ result = SetOrdered()
+ for key in REPORT_KEYS:
+ value = self.tree.get(key)
+ if value:
+ if isinstance(value, SetOrdered):
+ values_list = value
+ else:
+ values_list = value.keys()
+ for item in values_list:
+ root_key = item.get_root_key()
+ if root_key is not notpresent:
+ result.add(root_key)
+ return result
+
+
+if __name__ == "__main__": # pragma: no cover
+ import doctest
+ doctest.testmod()
diff --git a/.venv/lib/python3.12/site-packages/deepdiff/distance.py b/.venv/lib/python3.12/site-packages/deepdiff/distance.py
new file mode 100644
index 00000000..adaf5045
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/deepdiff/distance.py
@@ -0,0 +1,342 @@
+import math
+import datetime
+from typing import TYPE_CHECKING, Callable, Protocol, Any
+from deepdiff.deephash import DeepHash
+from deepdiff.helper import (
+ DELTA_VIEW, numbers, strings, add_to_frozen_set, not_found, only_numbers, np, np_float64, time_to_seconds,
+ cartesian_product_numpy, np_ndarray, np_array_factory, get_homogeneous_numpy_compatible_type_of_seq, dict_,
+ CannotCompare)
+from collections.abc import Mapping, Iterable
+
+if TYPE_CHECKING:
+ from deepdiff.diff import DeepDiffProtocol
+
+ class DistanceProtocol(DeepDiffProtocol, Protocol):
+ hashes: dict
+ deephash_parameters: dict
+ iterable_compare_func: Callable | None
+ math_epsilon: float
+ cutoff_distance_for_pairs: float
+
+ def __get_item_rough_length(self, item, parent:str="root") -> float:
+ ...
+
+ def _to_delta_dict(
+ self,
+ directed: bool = True,
+ report_repetition_required: bool = True,
+ always_include_values: bool = False,
+ ) -> dict:
+ ...
+
+ def __calculate_item_deephash(self, item: Any) -> None:
+ ...
+
+
+
+DISTANCE_CALCS_NEEDS_CACHE = "Distance calculation can not happen once the cache is purged. Try with _cache='keep'"
+
+
+class DistanceMixin:
+
+ def _get_rough_distance(self: "DistanceProtocol"):
+ """
+ Gives a numeric value for the distance of t1 and t2 based on how many operations are needed to convert
+ one to the other.
+
+ This is a similar concept to the Levenshtein edit distance but for structured data, and it is designed
+ to be between 0 and 1.
+
+ A distance of zero means the objects are equal and a distance of 1 is very far.
+
+ Note: The distance calculation formula is subject to change in the future. Use the distance results only as a
+ way of comparing the distances of pairs of items with other pairs, rather than as an absolute distance
+ such as the one provided by the Levenshtein edit distance.
+
+ Info: The current algorithm is based on the number of operations that are needed to convert t1 to t2 divided
+ by the number of items that make up t1 and t2.
+ """
+
+ _distance = get_numeric_types_distance(
+ self.t1, self.t2, max_=self.cutoff_distance_for_pairs, use_log_scale=self.use_log_scale, log_scale_similarity_threshold=self.log_scale_similarity_threshold)
+
+ if _distance is not not_found:
+ return _distance
+
+ item = self if self.view == DELTA_VIEW else self._to_delta_dict(report_repetition_required=False)
+ diff_length = _get_item_length(item)
+
+ if diff_length == 0:
+ return 0
+
+ t1_len = self.__get_item_rough_length(self.t1)
+ t2_len = self.__get_item_rough_length(self.t2)
+
+ return diff_length / (t1_len + t2_len)
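+
+ # Illustrative sketch (not executed): the user-facing way to get this rough
+ # distance is the get_deep_distance flag on DeepDiff, which adds a
+ # 'deep_distance' key (a number between 0 and 1) to the result.
+ #
+ #     from deepdiff import DeepDiff
+ #     DeepDiff([1, 2, 3], [1, 2, 4], ignore_order=True, get_deep_distance=True)
+ #     # roughly: {'values_changed': {...}, 'deep_distance': <small number between 0 and 1>}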
+
+ def __get_item_rough_length(self: "DistanceProtocol", item, parent='root'):
+ """
+ Get the rough length of an item.
+ It is used as a part of calculating the rough distance between objects.
+
+ **parameters**
+
+ item: The item to calculate the rough length for
+ parent: It is only used for DeepHash reporting purposes. Not really useful here.
+ """
+ if not hasattr(self, 'hashes'):
+ raise RuntimeError(DISTANCE_CALCS_NEEDS_CACHE)
+ length = DeepHash.get_key(self.hashes, key=item, default=None, extract_index=1)
+ if length is None:
+ self.__calculate_item_deephash(item)
+ length = DeepHash.get_key(self.hashes, key=item, default=None, extract_index=1)
+ return length
+
+ def __calculate_item_deephash(self: "DistanceProtocol", item: Any) -> None:
+ DeepHash(
+ item,
+ hashes=self.hashes,
+ parent='root',
+ apply_hash=True,
+ **self.deephash_parameters,
+ )
+
+ def _precalculate_distance_by_custom_compare_func(
+ self: "DistanceProtocol", hashes_added, hashes_removed, t1_hashtable, t2_hashtable, _original_type):
+ pre_calced_distances = dict_()
+ for added_hash in hashes_added:
+ for removed_hash in hashes_removed:
+ try:
+ is_close_distance = self.iterable_compare_func(t2_hashtable[added_hash].item, t1_hashtable[removed_hash].item)
+ except CannotCompare:
+ pass
+ else:
+ if is_close_distance:
+ # an arbitrary small distance if math_epsilon is not defined
+ distance = self.math_epsilon or 0.000001
+ else:
+ distance = 1
+ pre_calced_distances["{}--{}".format(added_hash, removed_hash)] = distance
+
+ return pre_calced_distances
+
+ def _precalculate_numpy_arrays_distance(
+ self: "DistanceProtocol", hashes_added, hashes_removed, t1_hashtable, t2_hashtable, _original_type):
+
+ # We only want to deal with 1D arrays.
+ if isinstance(t2_hashtable[next(iter(hashes_added))].item, (np_ndarray, list)):
+ return
+
+ pre_calced_distances = dict_()
+ added = [t2_hashtable[k].item for k in hashes_added]
+ removed = [t1_hashtable[k].item for k in hashes_removed]
+
+ if _original_type is None:
+ added_numpy_compatible_type = get_homogeneous_numpy_compatible_type_of_seq(added)
+ removed_numpy_compatible_type = get_homogeneous_numpy_compatible_type_of_seq(removed)
+ if added_numpy_compatible_type and added_numpy_compatible_type == removed_numpy_compatible_type:
+ _original_type = added_numpy_compatible_type
+ if _original_type is None:
+ return
+
+ added = np_array_factory(added, dtype=_original_type)
+ removed = np_array_factory(removed, dtype=_original_type)
+
+ pairs = cartesian_product_numpy(added, removed)
+
+ pairs_transposed = pairs.T
+
+ distances = _get_numpy_array_distance(
+ pairs_transposed[0], pairs_transposed[1],
+ max_=self.cutoff_distance_for_pairs,
+ use_log_scale=self.use_log_scale,
+ log_scale_similarity_threshold=self.log_scale_similarity_threshold,
+ )
+
+ i = 0
+ for added_hash in hashes_added:
+ for removed_hash in hashes_removed:
+ pre_calced_distances["{}--{}".format(added_hash, removed_hash)] = distances[i]
+ i += 1
+ return pre_calced_distances
+
+
+def _get_item_length(item, parents_ids=frozenset([])):
+ """
+ Get the number of operations in a diff object.
+ It is designed mainly for the delta view output
+ but can be used with other dictionary types of view outputs too.
+ """
+ length = 0
+ if isinstance(item, Mapping):
+ for key, subitem in item.items():
+ # dedupe the repetition report so the number of times items have shown up does not affect the distance.
+ if key in {'iterable_items_added_at_indexes', 'iterable_items_removed_at_indexes'}:
+ new_subitem = dict_()
+ for path_, indexes_to_items in subitem.items():
+ used_value_ids = set()
+ new_indexes_to_items = dict_()
+ for k, v in indexes_to_items.items():
+ v_id = id(v)
+ if v_id not in used_value_ids:
+ used_value_ids.add(v_id)
+ new_indexes_to_items[k] = v
+ new_subitem[path_] = new_indexes_to_items
+ subitem = new_subitem
+
+ # internal keys such as _numpy_paths should not count towards the distance
+ if isinstance(key, strings) and (key.startswith('_') or key == 'deep_distance' or key == 'new_path'):
+ continue
+
+ item_id = id(subitem)
+ if parents_ids and item_id in parents_ids:
+ continue
+ parents_ids_added = add_to_frozen_set(parents_ids, item_id)
+ length += _get_item_length(subitem, parents_ids_added)
+ elif isinstance(item, numbers):
+ length = 1
+ elif isinstance(item, strings):
+ length = 1
+ elif isinstance(item, Iterable):
+ for subitem in item:
+ item_id = id(subitem)
+ if parents_ids and item_id in parents_ids:
+ continue
+ parents_ids_added = add_to_frozen_set(parents_ids, item_id)
+ length += _get_item_length(subitem, parents_ids_added)
+ elif isinstance(item, type): # it is a class
+ length = 1
+ else:
+ if hasattr(item, '__dict__'):
+ for subitem in item.__dict__:
+ item_id = id(subitem)
+ parents_ids_added = add_to_frozen_set(parents_ids, item_id)
+ length += _get_item_length(subitem, parents_ids_added)
+ return length
+
+
+def _get_numbers_distance(num1, num2, max_=1, use_log_scale=False, log_scale_similarity_threshold=0.1):
+ """
+ Get the distance of 2 numbers. The output is a number between 0 and the max.
+ When the max is returned, it means the 2 numbers are really far apart, and 0 means they are equal.
+ """
+ if num1 == num2:
+ return 0
+ if use_log_scale:
+ distance = logarithmic_distance(num1, num2)
+ if distance < 0:
+ return 0
+ return distance
+ if not isinstance(num1, float):
+ num1 = float(num1)
+ if not isinstance(num2, float):
+ num2 = float(num2)
+ # Since we have a default cutoff of 0.3 distance when
+ # getting the pairs of items during the ignore_order=True
+ # calculations, we need to make the divisor of comparison very big
+ # so that any 2 numbers can be chosen as pairs.
+ divisor = (num1 + num2) / max_
+ if divisor == 0:
+ return max_
+ try:
+ return min(max_, abs((num1 - num2) / divisor))
+ except Exception: # pragma: no cover. I don't think this line will ever run but doesn't hurt to leave it.
+ return max_ # pragma: no cover
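+
+# Illustrative sketch (not executed): a worked example of the relative-distance
+# formula above with the default max_ of 1.
+#
+#     num1, num2 = 10.0, 12.0
+#     divisor = (num1 + num2) / 1          # 22.0
+#     abs((num1 - num2) / divisor)         # 2 / 22 ~= 0.0909
+#     _get_numbers_distance(10, 12)        # -> ~0.0909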
+
+
+def _numpy_div(a, b, replace_inf_with=1):
+ max_array = np.full(shape=a.shape, fill_value=replace_inf_with, dtype=np_float64)
+ result = np.divide(a, b, out=max_array, where=b != 0, dtype=np_float64)
+ # wherever 2 numbers are the same, make sure the distance is zero. This is mainly for 0 divided by zero.
+ result[a == b] = 0
+ return result
+
+# To deal with numbers close to zero
+MATH_LOG_OFFSET = 1e-10
+
+def numpy_apply_log_keep_sign(array, offset=MATH_LOG_OFFSET):
+ # Calculate the absolute value and add the offset
+ abs_plus_offset = np.abs(array) + offset
+
+ # Calculate the logarithm
+ log_values = np.log(abs_plus_offset)
+
+ # Apply the original signs to the log values
+ signed_log_values = np.copysign(log_values, array)
+
+ return signed_log_values
+
+
+def logarithmic_similarity(a: numbers, b: numbers, threshold: float=0.1) -> bool:
+ """
+ A threshold of 0.1 translates to about 10.5% difference.
+ A threshold of 0.5 translates to about 65% difference.
+ A threshold of 0.05 translates to about 5.1% difference.
+ """
+ return logarithmic_distance(a, b) < threshold
+
+
+def logarithmic_distance(a: numbers, b: numbers) -> float:
+ # Apply logarithm to the absolute values and consider the sign
+ a = float(a)
+ b = float(b)
+ log_a = math.copysign(math.log(abs(a) + MATH_LOG_OFFSET), a)
+ log_b = math.copysign(math.log(abs(b) + MATH_LOG_OFFSET), b)
+
+ return abs(log_a - log_b)
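+
+# Illustrative sketch (not executed): the log-scale distance is the absolute
+# difference of signed logarithms, so it tracks relative (percentage) change
+# rather than absolute change.
+#
+#     import math
+#     abs(math.log(110.0) - math.log(100.0))   # ~0.0953, "similar" under a 0.1 threshold
+#     abs(math.log(1.1) - math.log(1.0))       # ~0.0953, the same relative difference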
+
+
+def _get_numpy_array_distance(num1, num2, max_=1, use_log_scale=False, log_scale_similarity_threshold=0.1):
+ """
+ Get the element-wise distance of 2 arrays of numbers. The output is an array of numbers between 0 and the max.
+ When the max is returned, it means the 2 numbers are really far apart, and 0 means they are equal.
+ """
+ # Since we have a default cutoff of 0.3 distance when
+ # getting the pairs of items during the ignore_order=True
+ # calculations, we need to make the divisor of comparison very big
+ # so that any 2 numbers can be chosen as pairs.
+ if use_log_scale:
+ num1 = numpy_apply_log_keep_sign(num1)
+ num2 = numpy_apply_log_keep_sign(num2)
+
+ divisor = (num1 + num2) / max_
+ result = _numpy_div((num1 - num2), divisor, replace_inf_with=max_)
+
+ distance_array = np.clip(np.absolute(result), 0, max_)
+ if use_log_scale:
+ distance_array[distance_array < log_scale_similarity_threshold] = 0
+ return distance_array
+
+
+def _get_datetime_distance(date1, date2, max_, use_log_scale, log_scale_similarity_threshold):
+ return _get_numbers_distance(date1.timestamp(), date2.timestamp(), max_)
+
+
+def _get_date_distance(date1, date2, max_, use_log_scale, log_scale_similarity_threshold):
+ return _get_numbers_distance(date1.toordinal(), date2.toordinal(), max_)
+
+
+def _get_timedelta_distance(timedelta1, timedelta2, max_, use_log_scale, log_scale_similarity_threshold):
+ return _get_numbers_distance(timedelta1.total_seconds(), timedelta2.total_seconds(), max_)
+
+
+def _get_time_distance(time1, time2, max_, use_log_scale, log_scale_similarity_threshold):
+ return _get_numbers_distance(time_to_seconds(time1), time_to_seconds(time2), max_)
+
+
+TYPES_TO_DIST_FUNC = [
+ (only_numbers, _get_numbers_distance),
+ (datetime.datetime, _get_datetime_distance),
+ (datetime.date, _get_date_distance),
+ (datetime.timedelta, _get_timedelta_distance),
+ (datetime.time, _get_time_distance),
+]
+
+
+def get_numeric_types_distance(num1, num2, max_, use_log_scale=False, log_scale_similarity_threshold=0.1):
+ for type_, func in TYPES_TO_DIST_FUNC:
+ if isinstance(num1, type_) and isinstance(num2, type_):
+ return func(num1, num2, max_, use_log_scale, log_scale_similarity_threshold)
+ return not_found
diff --git a/.venv/lib/python3.12/site-packages/deepdiff/helper.py b/.venv/lib/python3.12/site-packages/deepdiff/helper.py
new file mode 100644
index 00000000..63a4e315
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/deepdiff/helper.py
@@ -0,0 +1,837 @@
+import sys
+import re
+import os
+import datetime
+import uuid
+import logging
+import warnings
+import string
+import time
+import enum
+import ipaddress
+from typing import NamedTuple, Any, List, Optional, Dict, Union, TYPE_CHECKING, Tuple
+from ast import literal_eval
+from decimal import Decimal, localcontext, InvalidOperation as InvalidDecimalOperation
+from itertools import repeat
+from orderly_set import StableSetEq as SetOrderedBase # median: 1.0867 s for cache test, 5.63s for all tests
+from threading import Timer
+
+if TYPE_CHECKING:
+ from pytz.tzinfo import BaseTzInfo
+
+
+class np_type:
+ pass
+
+
+class pydantic_base_model_type:
+ pass
+
+
+class SetOrdered(SetOrderedBase):
+ def __repr__(self):
+ return str(list(self))
+
+
+try:
+ import numpy as np
+except ImportError: # pragma: no cover. The case without Numpy is tested locally only.
+ np = None # pragma: no cover.
+ np_array_factory = 'numpy not available' # pragma: no cover.
+ np_ndarray = np_type # pragma: no cover.
+ np_bool_ = np_type # pragma: no cover.
+ np_int8 = np_type # pragma: no cover.
+ np_int16 = np_type # pragma: no cover.
+ np_int32 = np_type # pragma: no cover.
+ np_int64 = np_type # pragma: no cover.
+ np_uint8 = np_type # pragma: no cover.
+ np_uint16 = np_type # pragma: no cover.
+ np_uint32 = np_type # pragma: no cover.
+ np_uint64 = np_type # pragma: no cover.
+ np_intp = np_type # pragma: no cover.
+ np_uintp = np_type # pragma: no cover.
+ np_float32 = np_type # pragma: no cover.
+ np_float64 = np_type # pragma: no cover.
+ np_double = np_type # pragma: no cover.
+ np_floating = np_type # pragma: no cover.
+ np_complex64 = np_type # pragma: no cover.
+ np_complex128 = np_type # pragma: no cover.
+ np_cdouble = np_type # pragma: no cover.
+ np_complexfloating = np_type # pragma: no cover.
+else:
+ np_array_factory = np.array
+ np_ndarray = np.ndarray
+ np_bool_ = np.bool_
+ np_int8 = np.int8
+ np_int16 = np.int16
+ np_int32 = np.int32
+ np_int64 = np.int64
+ np_uint8 = np.uint8
+ np_uint16 = np.uint16
+ np_uint32 = np.uint32
+ np_uint64 = np.uint64
+ np_intp = np.intp
+ np_uintp = np.uintp
+ np_float32 = np.float32
+ np_float64 = np.float64
+ np_double = np.double # np.float_ is an alias for np.double and is being removed by NumPy 2.0
+ np_floating = np.floating
+ np_complex64 = np.complex64
+ np_complex128 = np.complex128
+ np_cdouble = np.cdouble # np.complex_ is an alias for np.cdouble and is being removed by NumPy 2.0
+ np_complexfloating = np.complexfloating
+
+numpy_numbers = (
+ np_int8, np_int16, np_int32, np_int64, np_uint8,
+ np_uint16, np_uint32, np_uint64, np_intp, np_uintp,
+ np_float32, np_float64, np_double, np_floating, np_complex64,
+ np_complex128, np_cdouble,)
+
+numpy_complex_numbers = (
+ np_complexfloating, np_complex64, np_complex128, np_cdouble,
+)
+
+numpy_dtypes = set(numpy_numbers)
+numpy_dtypes.add(np_bool_) # type: ignore
+
+numpy_dtype_str_to_type = {
+ item.__name__: item for item in numpy_dtypes
+}
+
+try:
+ from pydantic.main import BaseModel as PydanticBaseModel # type: ignore
+except ImportError:
+ PydanticBaseModel = pydantic_base_model_type
+
+
+logger = logging.getLogger(__name__)
+
+py_major_version = sys.version_info.major
+py_minor_version = sys.version_info.minor
+
+py_current_version = Decimal("{}.{}".format(py_major_version, py_minor_version))
+
+py2 = py_major_version == 2
+py3 = py_major_version == 3
+py4 = py_major_version == 4
+
+
+NUMERICS = frozenset(string.digits)
+
+
+class EnumBase(str, enum.Enum):
+ def __repr__(self):
+ """
+ We need to add single quotes so we can easily copy the value when we are in ipdb.
+ """
+ return f"'{self.name}'"
+
+ def __str__(self):
+ return self.name
+
+
+def _int_or_zero(value):
+ """
+ Tries to extract some number from a string.
+
+ 12c becomes 12
+ """
+ try:
+ return int(value)
+ except Exception:
+ result = []
+ for char in value:
+ if char in NUMERICS:
+ result.append(char)
+ if result:
+ return int(''.join(result))
+ return 0
+
+
+def get_semvar_as_integer(version):
+ """
+ Converts:
+
+ '1.23.5' to 1023005
+ """
+ version = version.split('.')
+ if len(version) > 3:
+ version = version[:3]
+ elif len(version) < 3:
+ version.extend(['0'] * (3 - len(version)))
+
+ return sum([10**(i * 3) * _int_or_zero(v) for i, v in enumerate(reversed(version))])
+
+
+# we used to use OrderedDictPlus when dictionaries in Python were not ordered.
+dict_ = dict
+
+if py4:
+ logger.warning('Python 4 is not supported yet. Switching logic to Python 3.') # pragma: no cover
+ py3 = True # pragma: no cover
+
+if py2: # pragma: no cover
+ sys.exit('Python 2 is not supported anymore. The last version of DeepDiff that supported Py2 was 3.3.0')
+
+pypy3 = py3 and hasattr(sys, "pypy_translation_info")
+
+
+if np and get_semvar_as_integer(np.__version__) < 1019000:
+ sys.exit('The minimum required Numpy version is 1.19.0. Please upgrade your Numpy package.')
+
+strings = (str, bytes) # which are both basestring
+unicode_type = str
+bytes_type = bytes
+only_complex_number = (complex,) + numpy_complex_numbers
+only_numbers = (int, float, complex, Decimal) + numpy_numbers
+datetimes = (datetime.datetime, datetime.date, datetime.timedelta, datetime.time)
+ipranges = (ipaddress.IPv4Interface, ipaddress.IPv6Interface, ipaddress.IPv4Network, ipaddress.IPv6Network)
+uuids = (uuid.UUID, )
+times = (datetime.datetime, datetime.time)
+numbers: Tuple = only_numbers + datetimes
+booleans = (bool, np_bool_)
+
+basic_types = strings + numbers + uuids + booleans + (type(None), )
+
+class IndexedHash(NamedTuple):
+ indexes: List
+ item: Any
+
+current_dir = os.path.dirname(os.path.abspath(__file__))
+
+ID_PREFIX = '!>*id'
+
+KEY_TO_VAL_STR = "{}:{}"
+
+TREE_VIEW = 'tree'
+TEXT_VIEW = 'text'
+DELTA_VIEW = '_delta'
+
+ENUM_INCLUDE_KEYS = ['__objclass__', 'name', 'value']
+
+
+def short_repr(item, max_length=15):
+ """Short representation of item if it is too long"""
+ item = repr(item)
+ if len(item) > max_length:
+ item = '{}...{}'.format(item[:max_length - 3], item[-1])
+ return item
+
+
+class ListItemRemovedOrAdded: # pragma: no cover
+ """Class of conditions to be checked"""
+ pass
+
+
+class OtherTypes:
+ def __repr__(self):
+ return "Error: {}".format(self.__class__.__name__) # pragma: no cover
+
+ __str__ = __repr__
+
+
+class Skipped(OtherTypes):
+ pass
+
+
+class Unprocessed(OtherTypes):
+ pass
+
+
+class NotHashed(OtherTypes):
+ pass
+
+
+class NotPresent: # pragma: no cover
+ """
+ In a change tree, this indicates that a previously existing object has been removed -- or will only be added
+ in the future.
+ We previously used None for this, but that caused problems when users actually added and removed None.
+ """
+
+ def __repr__(self):
+ return 'not present' # pragma: no cover
+
+ __str__ = __repr__
+
+
+class CannotCompare(Exception):
+ """
+ Exception when two items cannot be compared in the compare function.
+ """
+ pass
+
+
+unprocessed = Unprocessed()
+skipped = Skipped()
+not_hashed = NotHashed()
+notpresent = NotPresent()
+
+# Disabling remapping from old to new keys since the mapping is deprecated.
+RemapDict = dict_
+
+
+# class RemapDict(dict_):
+# """
+# DISABLED
+# Remap Dictionary.
+
+# For keys that have a new, longer name, remap the old key to the new key.
+# Other keys that don't have a new name are handled as before.
+# """
+
+# def __getitem__(self, old_key):
+# new_key = EXPANDED_KEY_MAP.get(old_key, old_key)
+# if new_key != old_key:
+# logger.warning(
+# "DeepDiff Deprecation: %s is renamed to %s. Please start using "
+# "the new unified naming convention.", old_key, new_key)
+# if new_key in self:
+# return self.get(new_key)
+# else: # pragma: no cover
+# raise KeyError(new_key)
+
+
+class indexed_set(set):
+ """
+ A set class that lets you get an item by index
+
+ >>> a = indexed_set()
+ >>> a.add(10)
+ >>> a.add(20)
+ >>> a[0]
+ 10
+ """
+
+
+def add_to_frozen_set(parents_ids, item_id):
+ return parents_ids | {item_id}
+
+
+def convert_item_or_items_into_set_else_none(items):
+ if items:
+ if isinstance(items, strings):
+ items = {items}
+ else:
+ items = set(items)
+ else:
+ items = None
+ return items
+
+
+def add_root_to_paths(paths):
+ """
+ Sometimes users want to pass just
+ [key] instead of root[key], for example.
+ Here we automatically add all the variations that might match
+ the path they intended to pass.
+ """
+ if paths is None:
+ return
+ result = SetOrdered()
+ for path in paths:
+ if path.startswith('root'):
+ result.add(path)
+ else:
+ if path.isdigit():
+ result.add(f"root['{path}']")
+ result.add(f"root[{path}]")
+ elif path[0].isdigit():
+ result.add(f"root['{path}']")
+ else:
+ result.add(f"root.{path}")
+ result.add(f"root['{path}']")
+ return result
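+
+# Illustrative usage (added comment; not part of the upstream source):
+#   add_root_to_paths(['ingredients', '0'])
+#   -> a SetOrdered containing "root.ingredients", "root['ingredients']", "root['0']" and "root[0]"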
+
+
+RE_COMPILED_TYPE = type(re.compile(''))
+
+
+def convert_item_or_items_into_compiled_regexes_else_none(items):
+ if items:
+ if isinstance(items, (strings, RE_COMPILED_TYPE)):
+ items = [items]
+ items = [i if isinstance(i, RE_COMPILED_TYPE) else re.compile(i) for i in items]
+ else:
+ items = None
+ return items
+
+
+def get_id(obj):
+ """
+ Add a prefix to the id so the result is not just an integer, to reduce the risk of collision.
+ """
+ return "{}{}".format(ID_PREFIX, id(obj))
+
+
+def get_type(obj):
+ """
+ Get the type of object or if it is a class, return the class itself.
+ """
+ if isinstance(obj, np_ndarray):
+ return obj.dtype.type # type: ignore
+ return obj if type(obj) is type else type(obj)
+
+
+def numpy_dtype_string_to_type(dtype_str):
+ return numpy_dtype_str_to_type[dtype_str]
+
+
+def type_in_type_group(item, type_group):
+ return get_type(item) in type_group
+
+
+def type_is_subclass_of_type_group(item, type_group):
+ return isinstance(item, type_group) \
+ or (isinstance(item, type) and issubclass(item, type_group)) \
+ or type_in_type_group(item, type_group)
+
+
+def get_doc(doc_filename):
+ try:
+ with open(os.path.join(current_dir, '../docs/', doc_filename), 'r') as doc_file:
+ doc = doc_file.read()
+ except Exception: # pragma: no cover
+ doc = 'Failed to load the docstrings. Please visit: https://zepworks.com/deepdiff/current/' # pragma: no cover
+ return doc
+
+
+number_formatting = {
+ "f": r'{:.%sf}',
+ "e": r'{:.%se}',
+}
+
+
+def number_to_string(number, significant_digits, number_format_notation="f"):
+ """
+ Convert numbers to string considering significant digits.
+ """
+ try:
+ using = number_formatting[number_format_notation]
+ except KeyError:
+ raise ValueError("number_format_notation got invalid value of {}. The valid values are 'f' and 'e'".format(number_format_notation)) from None
+
+ if not isinstance(number, numbers): # type: ignore
+ return number
+ elif isinstance(number, Decimal):
+ with localcontext() as ctx:
+ # Precision = number of integer digits + significant_digits
+ # Using number//1 to get the integer part of the number
+ ctx.prec = len(str(abs(number // 1))) + significant_digits
+ try:
+ number = number.quantize(Decimal('0.' + '0' * significant_digits))
+ except InvalidDecimalOperation:
+ # Sometimes rounding up causes a higher precision to be needed for the quantize operation
+ # For example '999.99999999' will become '1000.000000' after quantize
+ ctx.prec += 1
+ number = number.quantize(Decimal('0.' + '0' * significant_digits))
+ elif isinstance(number, only_complex_number): # type: ignore
+ # Case for complex numbers.
+ number = number.__class__(
+ "{real}+{imag}j".format( # type: ignore
+ real=number_to_string(
+ number=number.real, # type: ignore
+ significant_digits=significant_digits,
+ number_format_notation=number_format_notation
+ ),
+ imag=number_to_string(
+ number=number.imag, # type: ignore
+ significant_digits=significant_digits,
+ number_format_notation=number_format_notation
+ )
+ ) # type: ignore
+ )
+ else:
+ number = round(number=number, ndigits=significant_digits) # type: ignore
+
+ if significant_digits == 0:
+ number = int(number)
+
+ if number == 0.0:
+ # Special case for 0: "-0.xx" should compare equal to "0.xx"
+ number = abs(number) # type: ignore
+
+ # Cast number to string
+ result = (using % significant_digits).format(number)
+ # https://bugs.python.org/issue36622
+ if number_format_notation == 'e':
+ # Removing leading 0 for exponential part.
+ result = re.sub(
+ pattern=r'(?<=e(\+|\-))0(?=\d)+',
+ repl=r'',
+ string=result
+ )
+ return result
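+
+# Illustrative usage (added comment; not part of the upstream source):
+#   number_to_string(3.14159, significant_digits=2)                          -> '3.14'
+#   number_to_string(Decimal('999.99999999'), significant_digits=6)          -> '1000.000000'
+#   number_to_string(10, significant_digits=2, number_format_notation='e')   -> '1.00e+1'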
+
+
+class DeepDiffDeprecationWarning(DeprecationWarning):
+ """
+ Use this warning instead of DeprecationWarning
+ """
+ pass
+
+
+def cartesian_product(a, b):
+ """
+ Get the Cartesian product of two iterables
+
+ **parameters**
+
+ a: iterable of tuples
+ b: iterable whose items get appended to each tuple in a
+ """
+
+ for i in a:
+ for j in b:
+ yield i + (j,)
+
+
+def cartesian_product_of_shape(dimentions, result=None):
+ """
+ Cartesian product of a dimensions iterable.
+ This is mainly used to traverse Numpy ndarrays.
+
+ Each array has dimensions that are defined in ndarray.shape
+ """
+ if result is None:
+ result = ((),) # a tuple with an empty tuple
+ for dimension in dimentions:
+ result = cartesian_product(result, range(dimension))
+ return result
+
+
+def get_numpy_ndarray_rows(obj, shape=None):
+ """
+ Convert a multi-dimensional numpy array to a list of rows
+ """
+ if shape is None:
+ shape = obj.shape
+
+ dimentions = shape[:-1]
+ for path_tuple in cartesian_product_of_shape(dimentions):
+ result = obj
+ for index in path_tuple:
+ result = result[index]
+ yield path_tuple, result
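+
+# Illustrative usage (added comment; not part of the upstream source):
+#   list(cartesian_product_of_shape([2, 2]))  -> [(0, 0), (0, 1), (1, 0), (1, 1)]
+#   for path, row in get_numpy_ndarray_rows(np.array([[1, 2], [3, 4]])):
+#       ...   # yields ((0,), array([1, 2])) and then ((1,), array([3, 4]))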
+
+
+class _NotFound:
+
+ def __eq__(self, other):
+ return False
+
+ __req__ = __eq__
+
+ def __repr__(self):
+ return 'not found'
+
+ __str__ = __repr__
+
+
+not_found = _NotFound()
+
+warnings.simplefilter('once', DeepDiffDeprecationWarning)
+
+
+class RepeatedTimer:
+ """
+ Threaded Repeated Timer by MestreLion
+ https://stackoverflow.com/a/38317060/1497443
+ """
+
+ def __init__(self, interval, function, *args, **kwargs):
+ self._timer = None
+ self.interval = interval
+ self.function = function
+ self.args = args
+ self.start_time = time.time()
+ self.kwargs = kwargs
+ self.is_running = False
+ self.start()
+
+ def _get_duration_sec(self):
+ return int(time.time() - self.start_time)
+
+ def _run(self):
+ self.is_running = False
+ self.start()
+ self.function(*self.args, **self.kwargs)
+
+ def start(self):
+ self.kwargs.update(duration=self._get_duration_sec())
+ if not self.is_running:
+ self._timer = Timer(self.interval, self._run)
+ self._timer.start()
+ self.is_running = True
+
+ def stop(self):
+ duration = self._get_duration_sec()
+ if self._timer is not None:
+ self._timer.cancel()
+ self.is_running = False
+ return duration
+
+
+def _eval_decimal(params):
+ return Decimal(params)
+
+
+def _eval_datetime(params):
+ params = f'({params})'
+ params = literal_eval(params)
+ return datetime.datetime(*params)
+
+
+def _eval_date(params):
+ params = f'({params})'
+ params = literal_eval(params)
+ return datetime.date(*params)
+
+
+LITERAL_EVAL_PRE_PROCESS = [
+ ('Decimal(', ')', _eval_decimal),
+ ('datetime.datetime(', ')', _eval_datetime),
+ ('datetime.date(', ')', _eval_date),
+]
+
+
+def literal_eval_extended(item):
+ """
+ An extended version of literal_eval
+ """
+ try:
+ return literal_eval(item)
+ except (SyntaxError, ValueError):
+ for begin, end, func in LITERAL_EVAL_PRE_PROCESS:
+ if item.startswith(begin) and item.endswith(end):
+ # Extracting and removing extra quotes so for example "Decimal('10.1')" becomes "'10.1'" and then '10.1'
+ params = item[len(begin): -len(end)].strip('\'\"')
+ return func(params)
+ raise
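+
+# Illustrative usage (added comment; not part of the upstream source):
+#   literal_eval_extended("[1, 2]")                     -> [1, 2]
+#   literal_eval_extended("Decimal('10.1')")            -> Decimal('10.1')
+#   literal_eval_extended("datetime.date(2024, 5, 1)")  -> datetime.date(2024, 5, 1)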
+
+
+def time_to_seconds(t:datetime.time) -> int:
+ return (t.hour * 60 + t.minute) * 60 + t.second
+
+
+def datetime_normalize(
+ truncate_datetime:Union[str, None],
+ obj:Union[datetime.datetime, datetime.time],
+ default_timezone: Union[
+ datetime.timezone, "BaseTzInfo"
+ ] = datetime.timezone.utc,
+) -> Any:
+ if truncate_datetime:
+ if truncate_datetime == 'second':
+ obj = obj.replace(microsecond=0)
+ elif truncate_datetime == 'minute':
+ obj = obj.replace(second=0, microsecond=0)
+ elif truncate_datetime == 'hour':
+ obj = obj.replace(minute=0, second=0, microsecond=0)
+ elif truncate_datetime == 'day':
+ obj = obj.replace(hour=0, minute=0, second=0, microsecond=0)
+ if isinstance(obj, datetime.datetime):
+ if has_timezone(obj):
+ obj = obj.astimezone(default_timezone)
+ else:
+ obj = obj.replace(tzinfo=default_timezone)
+ elif isinstance(obj, datetime.time):
+ return time_to_seconds(obj)
+ return obj
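+
+# Illustrative usage (added comment; not part of the upstream source):
+#   datetime_normalize('hour', datetime.datetime(2024, 5, 1, 13, 45, 10))
+#   -> datetime.datetime(2024, 5, 1, 13, 0, tzinfo=datetime.timezone.utc)
+#   datetime_normalize(None, datetime.time(1, 2, 3))  -> 3723   (seconds since midnight)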
+
+
+def has_timezone(dt):
+ """
+ Function to check if a datetime object has a timezone
+
+ Checking dt.tzinfo.utcoffset(dt) ensures that the datetime object is truly timezone-aware
+ because some datetime objects may have a tzinfo attribute that is not None but still
+ doesn't provide a valid offset.
+
+ Certain tzinfo objects, such as pytz.timezone(None), can exist but do not provide meaningful UTC offset information.
+ If tzinfo is present but calling .utcoffset(dt) returns None, the datetime is not truly timezone-aware.
+ """
+ return dt.tzinfo is not None and dt.tzinfo.utcoffset(dt) is not None
+
+
+def get_truncate_datetime(truncate_datetime) -> Union[str, None]:
+ """
+ Validates truncate_datetime value
+ """
+ if truncate_datetime not in {None, 'second', 'minute', 'hour', 'day'}:
+ raise ValueError("truncate_datetime must be second, minute, hour or day")
+ return truncate_datetime
+
+
+def cartesian_product_numpy(*arrays):
+ """
+ Cartesian product of Numpy arrays by Paul Panzer
+ https://stackoverflow.com/a/49445693/1497443
+ """
+ la = len(arrays)
+ dtype = np.result_type(*arrays) # type: ignore
+ arr = np.empty((la, *map(len, arrays)), dtype=dtype) # type: ignore
+ idx = slice(None), *repeat(None, la)
+ for i, a in enumerate(arrays):
+ arr[i, ...] = a[idx[:la - i]]
+ return arr.reshape(la, -1).T
+
+
+def diff_numpy_array(A, B):
+ """
+ Numpy Array A - B
+ return items in A that are not in B
+ By Divakar
+ https://stackoverflow.com/a/52417967/1497443
+ """
+ return A[~np.isin(A, B)] # type: ignore
+
+
+PYTHON_TYPE_TO_NUMPY_TYPE = {
+ int: np_int64,
+ float: np_float64,
+ Decimal: np_float64
+}
+
+
+def get_homogeneous_numpy_compatible_type_of_seq(seq):
+ """
+ Return the numpy dtype if the array can be converted to a non-object numpy array.
+ Originally written by mgilson https://stackoverflow.com/a/13252348/1497443
+ This is the modified version.
+ """
+ iseq = iter(seq)
+ first_type = type(next(iseq))
+ if first_type in {int, float, Decimal}:
+ type_ = first_type if all((type(x) is first_type) for x in iseq) else False
+ return PYTHON_TYPE_TO_NUMPY_TYPE.get(type_, False)
+ else:
+ return False
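+
+# Illustrative usage (added comment; not part of the upstream source):
+#   get_homogeneous_numpy_compatible_type_of_seq([1, 2, 3])   -> np_int64
+#   get_homogeneous_numpy_compatible_type_of_seq([1.0, 2.5])  -> np_float64
+#   get_homogeneous_numpy_compatible_type_of_seq([1, 2.5])    -> False  (mixed types)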
+
+
+def detailed__dict__(obj, ignore_private_variables=True, ignore_keys=frozenset(), include_keys=None):
+ """
+ Get the detailed dictionary of an object.
+
+ This is used so we retrieve object properties too.
+ """
+ if include_keys:
+ result = {}
+ for key in include_keys:
+ try:
+ value = getattr(obj, key)
+ except Exception:
+ pass
+ else:
+ if not callable(value) or key == '__objclass__': # We don't want to compare functions, however for backward compatibility, __objclass__ needs to be reported.
+ result[key] = value
+ else:
+ result = obj.__dict__.copy() # A shallow copy
+ private_var_prefix = f"_{obj.__class__.__name__}__" # The semi private variables in Python get this prefix
+ for key in ignore_keys:
+ if key in result or (
+ ignore_private_variables and key.startswith('__') and not key.startswith(private_var_prefix)
+ ):
+ del result[key]
+ for key in dir(obj):
+ if key not in result and key not in ignore_keys and (
+ not ignore_private_variables or (
+ ignore_private_variables and not key.startswith('__') and not key.startswith(private_var_prefix)
+ )
+ ):
+ value = getattr(obj, key)
+ if not callable(value):
+ result[key] = value
+ return result
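+
+# Illustrative sketch of detailed__dict__ (added comment; the Ingot class below is hypothetical):
+#   class Ingot:
+#       def __init__(self):
+#           self.weight = 10
+#       @property
+#       def doubled(self):
+#           return self.weight * 2
+#   detailed__dict__(Ingot())  -> {'weight': 10, 'doubled': 20}   (properties are included, methods are not)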
+
+
+def named_tuple_repr(self):
+ fields = []
+ for field, value in self._asdict().items():
+ # Only include fields that do not have their default value
+ if field in self._field_defaults:
+ if value != self._field_defaults[field]:
+ fields.append(f"{field}={value!r}")
+ else:
+ fields.append(f"{field}={value!r}")
+
+ return f"{self.__class__.__name__}({', '.join(fields)})"
+
+
+class OpcodeTag(EnumBase):
+ insert = 'insert'
+ delete = 'delete'
+ equal = 'equal'
+ replace = 'replace' # type: ignore
+ # swapped = 'swapped' # in the future we should support reporting of items swapped with each other
+
+
+class Opcode(NamedTuple):
+ tag: str
+ t1_from_index: int
+ t1_to_index: int
+ t2_from_index: int
+ t2_to_index: int
+ old_values: Optional[List[Any]] = None
+ new_values: Optional[List[Any]] = None
+
+ __repr__ = __str__ = named_tuple_repr
+
+
+class FlatDataAction(EnumBase):
+ values_changed = 'values_changed'
+ type_changes = 'type_changes'
+ set_item_added = 'set_item_added'
+ set_item_removed = 'set_item_removed'
+ dictionary_item_added = 'dictionary_item_added'
+ dictionary_item_removed = 'dictionary_item_removed'
+ iterable_item_added = 'iterable_item_added'
+ iterable_item_removed = 'iterable_item_removed'
+ iterable_item_moved = 'iterable_item_moved'
+ iterable_items_inserted = 'iterable_items_inserted' # opcode
+ iterable_items_deleted = 'iterable_items_deleted' # opcode
+ iterable_items_replaced = 'iterable_items_replaced' # opcode
+ iterable_items_equal = 'iterable_items_equal' # opcode
+ attribute_removed = 'attribute_removed'
+ attribute_added = 'attribute_added'
+ unordered_iterable_item_added = 'unordered_iterable_item_added'
+ unordered_iterable_item_removed = 'unordered_iterable_item_removed'
+ initiated = "initiated"
+
+
+OPCODE_TAG_TO_FLAT_DATA_ACTION = {
+ OpcodeTag.insert: FlatDataAction.iterable_items_inserted,
+ OpcodeTag.delete: FlatDataAction.iterable_items_deleted,
+ OpcodeTag.replace: FlatDataAction.iterable_items_replaced,
+ OpcodeTag.equal: FlatDataAction.iterable_items_equal,
+}
+
+FLAT_DATA_ACTION_TO_OPCODE_TAG = {v: i for i, v in OPCODE_TAG_TO_FLAT_DATA_ACTION.items()}
+
+
+UnkownValueCode: str = 'unknown___'
+
+
+class FlatDeltaRow(NamedTuple):
+ path: List
+ action: FlatDataAction
+ value: Optional[Any] = UnkownValueCode
+ old_value: Optional[Any] = UnkownValueCode
+ type: Optional[Any] = UnkownValueCode
+ old_type: Optional[Any] = UnkownValueCode
+ new_path: Optional[List] = None
+ t1_from_index: Optional[int] = None
+ t1_to_index: Optional[int] = None
+ t2_from_index: Optional[int] = None
+ t2_to_index: Optional[int] = None
+
+ __repr__ = __str__ = named_tuple_repr
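+
+# Illustrative note (added comment; not part of the upstream source): named_tuple_repr only shows
+# fields that differ from their defaults, so for example
+#   repr(FlatDeltaRow(path=['a'], action=FlatDataAction.values_changed, value=1))
+#   -> "FlatDeltaRow(path=['a'], action='values_changed', value=1)"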
+
+
+JSON = Union[Dict[str, str], List[str], List[int], Dict[str, "JSON"], List["JSON"], str, int, float, bool, None]
+
+
+class SummaryNodeType(EnumBase):
+ dict = 'dict'
+ list = 'list'
+ leaf = 'leaf'
diff --git a/.venv/lib/python3.12/site-packages/deepdiff/lfucache.py b/.venv/lib/python3.12/site-packages/deepdiff/lfucache.py
new file mode 100644
index 00000000..75d1708e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/deepdiff/lfucache.py
@@ -0,0 +1,217 @@
+"""
+LFU cache Written by Shane Wang
+https://medium.com/@epicshane/a-python-implementation-of-lfu-least-frequently-used-cache-with-o-1-time-complexity-e16b34a3c49b
+https://github.com/luxigner/lfu_cache
+Modified by Sep Dehpour
+"""
+from collections import defaultdict
+from threading import Lock
+from statistics import mean
+from deepdiff.helper import not_found, dict_, SetOrdered
+
+
+class CacheNode:
+ def __init__(self, key, report_type, value, freq_node, pre, nxt):
+ self.key = key
+ if report_type:
+ self.content = defaultdict(SetOrdered)
+ self.content[report_type].add(value)
+ else:
+ self.content = value
+ self.freq_node = freq_node
+ self.pre = pre # previous CacheNode
+ self.nxt = nxt # next CacheNode
+
+ def free_myself(self):
+ if self.freq_node.cache_head == self.freq_node.cache_tail: # type: ignore
+ self.freq_node.cache_head = self.freq_node.cache_tail = None # type: ignore
+ elif self.freq_node.cache_head == self: # type: ignore
+ self.nxt.pre = None # type: ignore
+ self.freq_node.cache_head = self.nxt # type: ignore
+ elif self.freq_node.cache_tail == self: # type: ignore
+ self.pre.nxt = None # type: ignore
+ self.freq_node.cache_tail = self.pre # type: ignore
+ else:
+ self.pre.nxt = self.nxt # type: ignore
+ self.nxt.pre = self.pre # type: ignore
+
+ self.pre = None
+ self.nxt = None
+ self.freq_node = None
+
+
+class FreqNode:
+ def __init__(self, freq, pre, nxt):
+ self.freq = freq
+ self.pre = pre # previous FreqNode
+ self.nxt = nxt # next FreqNode
+ self.cache_head = None # CacheNode head under this FreqNode
+ self.cache_tail = None # CacheNode tail under this FreqNode
+
+ def count_caches(self):
+ if self.cache_head is None and self.cache_tail is None:
+ return 0
+ elif self.cache_head == self.cache_tail:
+ return 1
+ else:
+ return '2+'
+
+ def remove(self):
+ if self.pre is not None:
+ self.pre.nxt = self.nxt
+ if self.nxt is not None:
+ self.nxt.pre = self.pre
+
+ pre = self.pre
+ nxt = self.nxt
+ self.pre = self.nxt = self.cache_head = self.cache_tail = None
+
+ return (pre, nxt)
+
+ def pop_head_cache(self):
+ if self.cache_head is None and self.cache_tail is None:
+ return None
+ elif self.cache_head == self.cache_tail:
+ cache_head = self.cache_head
+ self.cache_head = self.cache_tail = None
+ return cache_head
+ else:
+ cache_head = self.cache_head
+ self.cache_head.nxt.pre = None # type: ignore
+ self.cache_head = self.cache_head.nxt # type: ignore
+ return cache_head
+
+ def append_cache_to_tail(self, cache_node):
+ cache_node.freq_node = self
+
+ if self.cache_head is None and self.cache_tail is None:
+ self.cache_head = self.cache_tail = cache_node
+ else:
+ cache_node.pre = self.cache_tail
+ cache_node.nxt = None
+ self.cache_tail.nxt = cache_node # type: ignore
+ self.cache_tail = cache_node
+
+ def insert_after_me(self, freq_node):
+ freq_node.pre = self
+ freq_node.nxt = self.nxt
+
+ if self.nxt is not None:
+ self.nxt.pre = freq_node
+
+ self.nxt = freq_node
+
+ def insert_before_me(self, freq_node):
+ if self.pre is not None:
+ self.pre.nxt = freq_node
+
+ freq_node.pre = self.pre
+ freq_node.nxt = self
+ self.pre = freq_node
+
+
+class LFUCache:
+
+ def __init__(self, capacity):
+ self.cache = dict_() # {key: cache_node}
+ if capacity <= 0:
+ raise ValueError('Capacity of LFUCache needs to be positive.') # pragma: no cover.
+ self.capacity = capacity
+ self.freq_link_head = None
+ self.lock = Lock()
+
+ def get(self, key):
+ with self.lock:
+ if key in self.cache:
+ cache_node = self.cache[key]
+ freq_node = cache_node.freq_node
+ content = cache_node.content
+
+ self.move_forward(cache_node, freq_node)
+
+ return content
+ else:
+ return not_found
+
+ def set(self, key, report_type=None, value=None):
+ with self.lock:
+ if key in self.cache:
+ cache_node = self.cache[key]
+ if report_type:
+ cache_node.content[report_type].add(value)
+ else:
+ cache_node.content = value
+ else:
+ if len(self.cache) >= self.capacity:
+ self.dump_cache()
+
+ self.create_cache_node(key, report_type, value)
+
+ def __contains__(self, key):
+ return key in self.cache
+
+ def move_forward(self, cache_node, freq_node):
+ if freq_node.nxt is None or freq_node.nxt.freq != freq_node.freq + 1:
+ target_freq_node = FreqNode(freq_node.freq + 1, None, None)
+ target_empty = True
+ else:
+ target_freq_node = freq_node.nxt
+ target_empty = False
+
+ cache_node.free_myself()
+ target_freq_node.append_cache_to_tail(cache_node)
+
+ if target_empty:
+ freq_node.insert_after_me(target_freq_node)
+
+ if freq_node.count_caches() == 0:
+ if self.freq_link_head == freq_node:
+ self.freq_link_head = target_freq_node
+
+ freq_node.remove()
+
+ def dump_cache(self):
+ head_freq_node = self.freq_link_head
+ self.cache.pop(head_freq_node.cache_head.key) # type: ignore
+ head_freq_node.pop_head_cache() # type: ignore
+
+ if head_freq_node.count_caches() == 0: # type: ignore
+ self.freq_link_head = head_freq_node.nxt # type: ignore
+ head_freq_node.remove() # type: ignore
+
+ def create_cache_node(self, key, report_type, value):
+ cache_node = CacheNode(
+ key=key, report_type=report_type,
+ value=value, freq_node=None, pre=None, nxt=None)
+ self.cache[key] = cache_node
+
+ if self.freq_link_head is None or self.freq_link_head.freq != 0:
+ new_freq_node = FreqNode(0, None, None)
+ new_freq_node.append_cache_to_tail(cache_node)
+
+ if self.freq_link_head is not None:
+ self.freq_link_head.insert_before_me(new_freq_node)
+
+ self.freq_link_head = new_freq_node
+ else:
+ self.freq_link_head.append_cache_to_tail(cache_node)
+
+ def get_sorted_cache_keys(self):
+ result = [(i, freq.freq_node.freq) for i, freq in self.cache.items()]
+ result.sort(key=lambda x: -x[1])
+ return result
+
+ def get_average_frequency(self):
+ return mean(freq.freq_node.freq for freq in self.cache.values())
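+
+# Illustrative usage of LFUCache (added comment; not part of the upstream source):
+#   cache = LFUCache(capacity=2)
+#   cache.set('a', value=1)
+#   cache.get('a')        -> 1
+#   cache.get('missing')  -> not_found   (the sentinel imported from deepdiff.helper)
+#   Adding a third key evicts the least frequently used entry via dump_cache().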
+
+
+class DummyLFU:
+
+ def __init__(self, *args, **kwargs):
+ pass
+
+ set = __init__
+ get = __init__
+
+ def __contains__(self, key):
+ return False
diff --git a/.venv/lib/python3.12/site-packages/deepdiff/model.py b/.venv/lib/python3.12/site-packages/deepdiff/model.py
new file mode 100644
index 00000000..41dd7517
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/deepdiff/model.py
@@ -0,0 +1,974 @@
+import logging
+from collections.abc import Mapping
+from copy import copy
+from deepdiff.helper import (
+ RemapDict, strings, notpresent, get_type, numpy_numbers, np, literal_eval_extended,
+ dict_, SetOrdered)
+from deepdiff.path import stringify_element
+
+logger = logging.getLogger(__name__)
+
+FORCE_DEFAULT = 'fake'
+UP_DOWN = {'up': 'down', 'down': 'up'}
+
+REPORT_KEYS = {
+ "type_changes",
+ "dictionary_item_added",
+ "dictionary_item_removed",
+ "values_changed",
+ "unprocessed",
+ "iterable_item_added",
+ "iterable_item_removed",
+ "iterable_item_moved",
+ "attribute_added",
+ "attribute_removed",
+ "set_item_removed",
+ "set_item_added",
+ "repetition_change",
+}
+
+CUSTOM_FIELD = "__internal:custom:extra_info"
+
+
+class DoesNotExist(Exception):
+ pass
+
+
+class ResultDict(RemapDict):
+
+ def remove_empty_keys(self):
+ """
+ Remove empty keys from this object. Should always be called after the result is final.
+ :return:
+ """
+ empty_keys = [k for k, v in self.items() if not isinstance(v, (int)) and not v]
+
+ for k in empty_keys:
+ del self[k]
+
+
+class TreeResult(ResultDict):
+ def __init__(self):
+ for key in REPORT_KEYS:
+ self[key] = SetOrdered()
+
+ def mutual_add_removes_to_become_value_changes(self):
+ """
+ The same path might be reported in the results as both removed and added.
+ In such cases it should be reported as values_changed.
+
+ Note that this function mutates the tree in ways that cause issues when report_repetition=True
+ and should be avoided in that case.
+
+ This function should only be run on the Tree Result.
+ """
+ iterable_item_added = self.get('iterable_item_added')
+ iterable_item_removed = self.get('iterable_item_removed')
+ if iterable_item_added is not None and iterable_item_removed is not None:
+ added_paths = {i.path(): i for i in iterable_item_added}
+ removed_paths = {i.path(): i for i in iterable_item_removed}
+ mutual_paths = set(added_paths) & set(removed_paths)
+
+ if mutual_paths and 'values_changed' not in self or self['values_changed'] is None:
+ self['values_changed'] = SetOrdered()
+ for path in mutual_paths:
+ level_before = removed_paths[path]
+ iterable_item_removed.remove(level_before)
+ level_after = added_paths[path]
+ iterable_item_added.remove(level_after)
+ level_before.t2 = level_after.t2
+ self['values_changed'].add(level_before) # type: ignore
+ level_before.report_type = 'values_changed'
+ if 'iterable_item_removed' in self and not iterable_item_removed:
+ del self['iterable_item_removed']
+ if 'iterable_item_added' in self and not iterable_item_added:
+ del self['iterable_item_added']
+
+ def __getitem__(self, item):
+ if item not in self:
+ self[item] = SetOrdered()
+ return self.get(item)
+
+ def __len__(self):
+ length = 0
+ for value in self.values():
+ if isinstance(value, SetOrdered):
+ length += len(value)
+ elif isinstance(value, int):
+ length += 1
+ return length
+
+
+class TextResult(ResultDict):
+ ADD_QUOTES_TO_STRINGS = True
+
+ def __init__(self, tree_results=None, verbose_level=1):
+ self.verbose_level = verbose_level
+ # TODO: centralize keys
+ self.update({
+ "type_changes": dict_(),
+ "dictionary_item_added": self.__set_or_dict(),
+ "dictionary_item_removed": self.__set_or_dict(),
+ "values_changed": dict_(),
+ "unprocessed": [],
+ "iterable_item_added": dict_(),
+ "iterable_item_removed": dict_(),
+ "iterable_item_moved": dict_(),
+ "attribute_added": self.__set_or_dict(),
+ "attribute_removed": self.__set_or_dict(),
+ "set_item_removed": SetOrdered(),
+ "set_item_added": SetOrdered(),
+ "repetition_change": dict_()
+ })
+
+ if tree_results:
+ self._from_tree_results(tree_results)
+
+ def __set_or_dict(self):
+ return {} if self.verbose_level >= 2 else SetOrdered()
+
+ def _from_tree_results(self, tree):
+ """
+ Populate this object by parsing an existing reference-style result dictionary.
+ :param tree: A TreeResult
+ :return:
+ """
+ self._from_tree_type_changes(tree)
+ self._from_tree_default(tree, 'dictionary_item_added')
+ self._from_tree_default(tree, 'dictionary_item_removed')
+ self._from_tree_value_changed(tree)
+ self._from_tree_unprocessed(tree)
+ self._from_tree_default(tree, 'iterable_item_added')
+ self._from_tree_default(tree, 'iterable_item_removed')
+ self._from_tree_iterable_item_moved(tree)
+ self._from_tree_default(tree, 'attribute_added')
+ self._from_tree_default(tree, 'attribute_removed')
+ self._from_tree_set_item_removed(tree)
+ self._from_tree_set_item_added(tree)
+ self._from_tree_repetition_change(tree)
+ self._from_tree_deep_distance(tree)
+ self._from_tree_custom_results(tree)
+
+ def _from_tree_default(self, tree, report_type, ignore_if_in_iterable_opcodes=False):
+ if report_type in tree:
+
+ for change in tree[report_type]: # report each change
+ # When we convert from diff to delta result, we care more about opcodes than iterable_item_added or removed
+ if (
+ ignore_if_in_iterable_opcodes
+ and report_type in {"iterable_item_added", "iterable_item_removed"}
+ and change.up.path(force=FORCE_DEFAULT) in self["_iterable_opcodes"]
+ ):
+ continue
+ # determine change direction (added or removed)
+ # Report t2 (the new one) whenever possible.
+ # In cases where t2 doesn't exist (i.e. stuff removed), report t1.
+ if change.t2 is not notpresent:
+ item = change.t2
+ else:
+ item = change.t1
+
+ # do the reporting
+ report = self[report_type]
+ if isinstance(report, SetOrdered):
+ report.add(change.path(force=FORCE_DEFAULT))
+ elif isinstance(report, dict):
+ report[change.path(force=FORCE_DEFAULT)] = item
+ elif isinstance(report, list): # pragma: no cover
+ # we don't actually have any of those right now, but just in case
+ report.append(change.path(force=FORCE_DEFAULT))
+ else: # pragma: no cover
+ # should never happen
+ raise TypeError("Cannot handle {} report container type.".
+ format(report))
+
+ def _from_tree_type_changes(self, tree):
+ if 'type_changes' in tree:
+ for change in tree['type_changes']:
+ path = change.path(force=FORCE_DEFAULT)
+ if type(change.t1) is type:
+ include_values = False
+ old_type = change.t1
+ new_type = change.t2
+ else:
+ include_values = True
+ old_type = get_type(change.t1)
+ new_type = get_type(change.t2)
+ remap_dict = RemapDict({
+ 'old_type': old_type,
+ 'new_type': new_type,
+ })
+ if self.verbose_level > 1:
+ new_path = change.path(use_t2=True, force=FORCE_DEFAULT)
+ if path != new_path:
+ remap_dict['new_path'] = new_path
+ self['type_changes'][path] = remap_dict
+ if self.verbose_level and include_values:
+ remap_dict.update(old_value=change.t1, new_value=change.t2)
+
+ def _from_tree_value_changed(self, tree):
+ if 'values_changed' in tree and self.verbose_level > 0:
+ for change in tree['values_changed']:
+ path = change.path(force=FORCE_DEFAULT)
+ the_changed = {'new_value': change.t2, 'old_value': change.t1}
+ if self.verbose_level > 1:
+ new_path = change.path(use_t2=True, force=FORCE_DEFAULT)
+ if path != new_path:
+ the_changed['new_path'] = new_path
+ self['values_changed'][path] = the_changed
+ if 'diff' in change.additional:
+ the_changed.update({'diff': change.additional['diff']})
+
+ def _from_tree_iterable_item_moved(self, tree):
+ if 'iterable_item_moved' in tree and self.verbose_level > 1:
+ for change in tree['iterable_item_moved']:
+ the_changed = {'new_path': change.path(use_t2=True), 'value': change.t2}
+ self['iterable_item_moved'][change.path(
+ force=FORCE_DEFAULT)] = the_changed
+
+ def _from_tree_unprocessed(self, tree):
+ if 'unprocessed' in tree:
+ for change in tree['unprocessed']:
+ self['unprocessed'].append("{}: {} and {}".format(change.path(
+ force=FORCE_DEFAULT), change.t1, change.t2))
+
+ def _from_tree_set_item_added_or_removed(self, tree, key):
+ if key in tree:
+ set_item_info = self[key]
+ is_dict = isinstance(set_item_info, Mapping)
+ for change in tree[key]:
+ path = change.up.path(
+ ) # we want the set's path; the added item is not directly accessible
+ item = change.t2 if key == 'set_item_added' else change.t1
+ if self.ADD_QUOTES_TO_STRINGS and isinstance(item, strings):
+ item = "'%s'" % item
+ if is_dict:
+ if path not in set_item_info:
+ set_item_info[path] = set() # type: ignore
+ set_item_info[path].add(item)
+ else:
+ set_item_info.add("{}[{}]".format(path, str(item)))
+ # this syntax is rather peculiar, but it's DeepDiff 2.x compatible
+
+ def _from_tree_set_item_added(self, tree):
+ self._from_tree_set_item_added_or_removed(tree, key='set_item_added')
+
+ def _from_tree_set_item_removed(self, tree):
+ self._from_tree_set_item_added_or_removed(tree, key='set_item_removed')
+
+ def _from_tree_repetition_change(self, tree):
+ if 'repetition_change' in tree:
+ for change in tree['repetition_change']:
+ path = change.path(force=FORCE_DEFAULT)
+ self['repetition_change'][path] = RemapDict(
+ change.additional['repetition']
+ )
+ self['repetition_change'][path]['value'] = change.t1
+
+ def _from_tree_deep_distance(self, tree):
+ if 'deep_distance' in tree:
+ self['deep_distance'] = tree['deep_distance']
+
+ def _from_tree_custom_results(self, tree):
+ for k, _level_list in tree.items():
+ if k not in REPORT_KEYS:
+ if not isinstance(_level_list, SetOrdered):
+ continue
+
+ # if len(_level_list) == 0:
+ # continue
+ #
+ # if not isinstance(_level_list[0], DiffLevel):
+ # continue
+
+ # _level_list is a list of DiffLevel
+ _custom_dict = {}
+ for _level in _level_list:
+ _custom_dict[_level.path(
+ force=FORCE_DEFAULT)] = _level.additional.get(CUSTOM_FIELD, {})
+ self[k] = _custom_dict
+
+
+class DeltaResult(TextResult):
+ ADD_QUOTES_TO_STRINGS = False
+
+ def __init__(self, tree_results=None, ignore_order=None, always_include_values=False, _iterable_opcodes=None):
+ self.ignore_order = ignore_order
+ self.always_include_values = always_include_values
+
+ self.update({
+ "type_changes": dict_(),
+ "dictionary_item_added": dict_(),
+ "dictionary_item_removed": dict_(),
+ "values_changed": dict_(),
+ "iterable_item_added": dict_(),
+ "iterable_item_removed": dict_(),
+ "iterable_item_moved": dict_(),
+ "attribute_added": dict_(),
+ "attribute_removed": dict_(),
+ "set_item_removed": dict_(),
+ "set_item_added": dict_(),
+ "iterable_items_added_at_indexes": dict_(),
+ "iterable_items_removed_at_indexes": dict_(),
+ "_iterable_opcodes": _iterable_opcodes or {},
+ })
+
+ if tree_results:
+ self._from_tree_results(tree_results)
+
+ def _from_tree_results(self, tree):
+ """
+ Populate this object by parsing an existing reference-style result dictionary.
+ :param tree: A TreeResult
+ :return:
+ """
+ self._from_tree_type_changes(tree)
+ self._from_tree_default(tree, 'dictionary_item_added')
+ self._from_tree_default(tree, 'dictionary_item_removed')
+ self._from_tree_value_changed(tree)
+ if self.ignore_order:
+ self._from_tree_iterable_item_added_or_removed(
+ tree, 'iterable_item_added', delta_report_key='iterable_items_added_at_indexes')
+ self._from_tree_iterable_item_added_or_removed(
+ tree, 'iterable_item_removed', delta_report_key='iterable_items_removed_at_indexes')
+ else:
+ self._from_tree_default(tree, 'iterable_item_added', ignore_if_in_iterable_opcodes=True)
+ self._from_tree_default(tree, 'iterable_item_removed', ignore_if_in_iterable_opcodes=True)
+ self._from_tree_iterable_item_moved(tree)
+ self._from_tree_default(tree, 'attribute_added')
+ self._from_tree_default(tree, 'attribute_removed')
+ self._from_tree_set_item_removed(tree)
+ self._from_tree_set_item_added(tree)
+ self._from_tree_repetition_change(tree)
+
+ def _from_tree_iterable_item_added_or_removed(self, tree, report_type, delta_report_key):
+ if report_type in tree:
+ for change in tree[report_type]: # report each change
+ # determine change direction (added or removed)
+ # Report t2 (the new one) whenever possible.
+ # In cases where t2 doesn't exist (i.e. stuff removed), report t1.
+ if change.t2 is not notpresent:
+ item = change.t2
+ else:
+ item = change.t1
+
+ # do the reporting
+ path, param, _ = change.path(force=FORCE_DEFAULT, get_parent_too=True)
+ try:
+ iterable_items_added_at_indexes = self[delta_report_key][path]
+ except KeyError:
+ iterable_items_added_at_indexes = self[delta_report_key][path] = dict_()
+ iterable_items_added_at_indexes[param] = item
+
+ def _from_tree_type_changes(self, tree):
+ if 'type_changes' in tree:
+ for change in tree['type_changes']:
+ include_values = None
+ if type(change.t1) is type:
+ include_values = False
+ old_type = change.t1
+ new_type = change.t2
+ else:
+ old_type = get_type(change.t1)
+ new_type = get_type(change.t2)
+ include_values = True
+ try:
+ if new_type in numpy_numbers:
+ new_t1 = change.t1.astype(new_type)
+ include_values = not np.array_equal(new_t1, change.t2)
+ else:
+ new_t1 = new_type(change.t1)
+ # If simply applying the type from one value converts it to the other value,
+ # there is no need to include the actual values in the delta.
+ include_values = new_t1 != change.t2
+ except Exception:
+ pass
+
+ path = change.path(force=FORCE_DEFAULT)
+ new_path = change.path(use_t2=True, force=FORCE_DEFAULT)
+ remap_dict = RemapDict({
+ 'old_type': old_type,
+ 'new_type': new_type,
+ })
+ if path != new_path:
+ remap_dict['new_path'] = new_path
+ self['type_changes'][path] = remap_dict
+ if include_values or self.always_include_values:
+ remap_dict.update(old_value=change.t1, new_value=change.t2)
+
+ def _from_tree_value_changed(self, tree):
+ if 'values_changed' in tree:
+ for change in tree['values_changed']:
+ path = change.path(force=FORCE_DEFAULT)
+ new_path = change.path(use_t2=True, force=FORCE_DEFAULT)
+ the_changed = {'new_value': change.t2, 'old_value': change.t1}
+ if path != new_path:
+ the_changed['new_path'] = new_path
+ self['values_changed'][path] = the_changed
+ # If we ever want to store the difflib results instead of the new_value
+ # these lines need to be uncommented and the Delta object needs to be able
+ # to use them.
+ # if 'diff' in change.additional:
+ # the_changed.update({'diff': change.additional['diff']})
+
+ def _from_tree_repetition_change(self, tree):
+ if 'repetition_change' in tree:
+ for change in tree['repetition_change']:
+ path, _, _ = change.path(get_parent_too=True)
+ repetition = RemapDict(change.additional['repetition'])
+ value = change.t1
+ try:
+ iterable_items_added_at_indexes = self['iterable_items_added_at_indexes'][path]
+ except KeyError:
+ iterable_items_added_at_indexes = self['iterable_items_added_at_indexes'][path] = dict_()
+ for index in repetition['new_indexes']:
+ iterable_items_added_at_indexes[index] = value
+
+ def _from_tree_iterable_item_moved(self, tree):
+ if 'iterable_item_moved' in tree:
+ for change in tree['iterable_item_moved']:
+ if (
+ change.up.path(force=FORCE_DEFAULT) not in self["_iterable_opcodes"]
+ ):
+ the_changed = {'new_path': change.path(use_t2=True), 'value': change.t2}
+ self['iterable_item_moved'][change.path(
+ force=FORCE_DEFAULT)] = the_changed
+
+
+class DiffLevel:
+ """
+ An object of this class represents a single object-tree-level in a reported change.
+ A double-linked list of these object describes a single change on all of its levels.
+ Looking at the tree of all changes, a list of those objects represents a single path through the tree
+ (which is just fancy for "a change").
+ This is the result object class for object reference style reports.
+
+ Example:
+
+ >>> t1 = {2: 2, 4: 44}
+ >>> t2 = {2: "b", 5: 55}
+ >>> ddiff = DeepDiff(t1, t2, view='tree')
+ >>> ddiff
+ {'dictionary_item_added': {<DiffLevel id:4560126096, t1:None, t2:55>},
+ 'dictionary_item_removed': {<DiffLevel id:4560126416, t1:44, t2:None>},
+ 'type_changes': {<DiffLevel id:4560126608, t1:2, t2:b>}}
+
+ Graph:
+
+ <DiffLevel id:123, original t1,t2> <DiffLevel id:200, original t1,t2>
+ ↑up ↑up
+ | |
+ | ChildRelationship | ChildRelationship
+ | |
+ ↓down ↓down
+ <DiffLevel id:13, t1:None, t2:55> <DiffLevel id:421, t1:44, t2:None>
+ .path() = 'root[5]' .path() = 'root[4]'
+
+ Note that the 2 top level DiffLevel objects are 2 different objects even though
+ they are essentially talking about the same diff operation.
+
+
+ A ChildRelationship object describing the relationship between t1 and its child object,
+ where t1's child object equals down.t1.
+
+ Think about it like a graph:
+
+ +---------------------------------------------------------------+
+ | |
+ | parent difflevel parent |
+ | + ^ + |
+ +------|--------------------------|---------------------|-------+
+ | | | up |
+ | Child | | | ChildRelationship
+ | Relationship | | |
+ | down | | |
+ +------|----------------------|-------------------------|-------+
+ | v v v |
+ | child difflevel child |
+ | |
+ +---------------------------------------------------------------+
+
+
+ The child_rel example:
+
+ # dictionary_item_removed is a set so in order to get an item from it:
+ >>> (difflevel,) = ddiff['dictionary_item_removed']
+ >>> difflevel.up.t1_child_rel
+ <DictRelationship id:456, parent:{2: 2, 4: 44}, child:44, param:4>
+
+ >>> (difflevel,) = ddiff['dictionary_item_added']
+ >>> difflevel
+ <DiffLevel id:4560126096, t1:None, t2:55>
+
+ >>> difflevel.up
+ <DiffLevel id:4560154512, t1:{2: 2, 4: 44}, t2:{2: 'b', 5: 55}>
+
+ # t1 didn't exist
+ >>> difflevel.up.t1_child_rel
+
+ # t2 is added
+ >>> difflevel.up.t2_child_rel
+ <DictRelationship id:4560154384, parent:{2: 'b', 5: 55}, child:55, param:5>
+
+ """
+
+ def __init__(self,
+ t1,
+ t2,
+ down=None,
+ up=None,
+ report_type=None,
+ child_rel1=None,
+ child_rel2=None,
+ additional=None,
+ verbose_level=1):
+ """
+ :param child_rel1: Either:
+ - An existing ChildRelationship object describing the "down" relationship for t1; or
+ - A ChildRelationship subclass. In this case, we will create the ChildRelationship objects
+ for both t1 and t2.
+ Alternatives for child_rel1 and child_rel2 must be used consistently.
+ :param child_rel2: Either:
+ - An existing ChildRelationship object describing the "down" relationship for t2; or
+ - The param argument for a ChildRelationship class we shall create.
+ Alternatives for child_rel1 and child_rel2 must be used consistently.
+ """
+
+ # The current-level object in the left hand tree
+ self.t1 = t1
+
+ # The current-level object in the right hand tree
+ self.t2 = t2
+
+ # Another DiffLevel object describing this change one level deeper down the object tree
+ self.down = down
+
+ # Another DiffLevel object describing this change one level further up the object tree
+ self.up = up
+
+ self.report_type = report_type
+
+ # If this object is this change's deepest level, this contains a string describing the type of change.
+ # Examples: "set_item_added", "values_changed"
+
+ # Note: don't use {} as additional's default value - this would turn out to be always the same dict object
+ self.additional = dict_() if additional is None else additional
+
+ # For some types of changes we store some additional information.
+ # This is a dict containing this information.
+ # Currently, this is used for:
+ # - values_changed: In case the changes data is a multi-line string,
+ # we include a textual diff as additional['diff'].
+ # - repetition_change: additional['repetition']:
+ # e.g. {'old_repeat': 2, 'new_repeat': 1, 'old_indexes': [0, 2], 'new_indexes': [2]}
+ # the user supplied ChildRelationship objects for t1 and t2
+
+ # A ChildRelationship object describing the relationship between t1 and its child object,
+ # where t1's child object equals down.t1.
+ # If this relationship is representable as a string, str(self.t1_child_rel) returns a formatted param parsable python string,
+ # e.g. "[2]", ".my_attribute"
+ self.t1_child_rel = child_rel1
+
+ # Another ChildRelationship object describing the relationship between t2 and its child object.
+ self.t2_child_rel = child_rel2
+
+ # Will cache result of .path() per 'force' as key for performance
+ self._path = dict_()
+
+ self.verbose_level = verbose_level
+
+ def __repr__(self):
+ if self.verbose_level:
+ from deepdiff.summarize import summarize
+
+ if self.additional:
+ additional_repr = summarize(self.additional, max_length=35)
+ result = "<{} {}>".format(self.path(), additional_repr)
+ else:
+ t1_repr = summarize(self.t1, max_length=35)
+ t2_repr = summarize(self.t2, max_length=35)
+ result = "<{} t1:{}, t2:{}>".format(self.path(), t1_repr, t2_repr)
+ else:
+ result = "<{}>".format(self.path())
+ return result
+
+ def __setattr__(self, key, value):
+ # Setting up or down, will set the opposite link in this linked list.
+ if key in UP_DOWN and value is not None:
+ self.__dict__[key] = value
+ opposite_key = UP_DOWN[key]
+ value.__dict__[opposite_key] = self
+ else:
+ self.__dict__[key] = value
+
+ def __iter__(self):
+ yield self.t1
+ yield self.t2
+
+ @property
+ def repetition(self):
+ return self.additional['repetition']
+
+ def auto_generate_child_rel(self, klass, param, param2=None):
+ """
+ Auto-populate self.child_rel1 and self.child_rel2.
+ This requires self.down to be another valid DiffLevel object.
+ :param klass: A ChildRelationship subclass describing the kind of parent-child relationship,
+ e.g. DictRelationship.
+ :param param: A ChildRelationship subclass-dependent parameter describing how to get from parent to child,
+ e.g. the key in a dict
+ """
+ if self.down.t1 is not notpresent: # type: ignore
+ self.t1_child_rel = ChildRelationship.create(
+ klass=klass, parent=self.t1, child=self.down.t1, param=param) # type: ignore
+ if self.down.t2 is not notpresent: # type: ignore
+ self.t2_child_rel = ChildRelationship.create(
+ klass=klass, parent=self.t2, child=self.down.t2, param=param if param2 is None else param2) # type: ignore
+
+ @property
+ def all_up(self):
+ """
+ Get the root object of this comparison.
+ (This is a convenient wrapper for following the up attribute as often as you can.)
+ :rtype: DiffLevel
+ """
+ level = self
+ while level.up:
+ level = level.up
+ return level
+
+ @property
+ def all_down(self):
+ """
+ Get the leaf object of this comparison.
+ (This is a convenient wrapper for following the down attribute as often as you can.)
+ :rtype: DiffLevel
+ """
+ level = self
+ while level.down:
+ level = level.down
+ return level
+
+ @staticmethod
+ def _format_result(root, result):
+ return None if result is None else "{}{}".format(root, result)
+
+ def get_root_key(self, use_t2=False):
+ """
+ Get the path's root key value for this change
+
+ For example if the path to the element that is reported to have a change in value is root['X'][0]
+ then get_root_key should return 'X'
+ """
+ root_level = self.all_up
+ if(use_t2):
+ next_rel = root_level.t2_child_rel
+ else:
+ next_rel = root_level.t1_child_rel or root_level.t2_child_rel # next relationship object to get a formatted param from
+
+ if next_rel:
+ return next_rel.param
+ return notpresent
+
+ def path(self, root="root", force=None, get_parent_too=False, use_t2=False, output_format='str'):
+ """
+ A python syntax string describing how to descend to this level, assuming the top level object is called root.
+ Returns None if the path is not representable as a string.
+ This might be the case for example if there are sets involved (because then there's no path at all) or because
+ custom objects are used as dictionary keys (then there is a path but it's not representable).
+ Example: root['ingredients'][0]
+ Note: We will follow the left side of the comparison branch, i.e. using the t1's to build the path.
+ Using t1 or t2 should make no difference at all, except for the last step of a child-added/removed relationship.
+ If it does in any other case, your comparison path is corrupt.
+
+ **Parameters**
+
+ :param root: The result string shall start with this var name
+ :param force: Bends the meaning of "no string representation".
+ If None:
+ Will strictly return Python-parsable expressions. The result those yield will compare
+ equal to the objects in question.
+ If 'yes':
+ Will return a path including '(unrepresentable)' in place of non string-representable parts.
+ If 'fake':
+ Will try to produce an output optimized for readability.
+ This will pretend all iterables are subscriptable, for example.
+ :param output_format: The format of the output. The options are 'str' which is the default and produces a
+ string representation of the path or 'list' to produce a list of keys and attributes
+ that produce the path.
+ """
+ # TODO: We could optimize this by building on top of self.up's path if it is cached there
+ cache_key = "{}{}{}{}".format(force, get_parent_too, use_t2, output_format)
+ if cache_key in self._path:
+ cached = self._path[cache_key]
+ if get_parent_too:
+ parent, param, result = cached
+ return (self._format_result(root, parent), param, self._format_result(root, result))
+ else:
+ return self._format_result(root, cached)
+
+ if output_format == 'str':
+ result = parent = param = ""
+ else:
+ result = []
+
+ level = self.all_up # start at the root
+
+ # traverse all levels of this relationship
+ while level and level is not self:
+ # get this level's relationship object
+ if use_t2:
+ next_rel = level.t2_child_rel or level.t1_child_rel
+ else:
+ next_rel = level.t1_child_rel or level.t2_child_rel # next relationship object to get a formatted param from
+
+ # t1 and t2 both are empty
+ if next_rel is None:
+ break
+
+ # Build path for this level
+ if output_format == 'str':
+ item = next_rel.get_param_repr(force)
+ if item:
+ parent = result
+ param = next_rel.param
+ result += item
+ else:
+ # it seems this path is not representable as a string
+ result = None
+ break
+ elif output_format == 'list':
+ result.append(next_rel.param) # type: ignore
+
+ # Prepare processing next level
+ level = level.down
+
+ if output_format == 'str':
+ if get_parent_too:
+ self._path[cache_key] = (parent, param, result) # type: ignore
+ output = (self._format_result(root, parent), param, self._format_result(root, result)) # type: ignore
+ else:
+ self._path[cache_key] = result
+ output = self._format_result(root, result)
+ else:
+ output = result
+ return output
+
+ def create_deeper(self,
+ new_t1,
+ new_t2,
+ child_relationship_class,
+ child_relationship_param=None,
+ child_relationship_param2=None,
+ report_type=None):
+ """
+ Start a new comparison level and correctly link it to this one.
+ :rtype: DiffLevel
+ :return: New level
+ """
+ level = self.all_down
+ result = DiffLevel(
+ new_t1, new_t2, down=None, up=level, report_type=report_type, verbose_level=self.verbose_level)
+ level.down = result
+ level.auto_generate_child_rel(
+ klass=child_relationship_class, param=child_relationship_param, param2=child_relationship_param2)
+ return result
+
+ def branch_deeper(self,
+ new_t1,
+ new_t2,
+ child_relationship_class,
+ child_relationship_param=None,
+ child_relationship_param2=None,
+ report_type=None):
+ """
+ Branch this comparison: Do not touch this comparison line, but create a new one with exactly the same content,
+ just one level deeper.
+ :rtype: DiffLevel
+ :return: New level in new comparison line
+ """
+ branch = self.copy()
+ return branch.create_deeper(new_t1, new_t2, child_relationship_class,
+ child_relationship_param, child_relationship_param2, report_type)
+
+ def copy(self):
+ """
+ Get a deep copy of this comparison line.
+ :return: The leaf ("downmost") object of the copy.
+ """
+ orig = self.all_up
+ result = copy(orig) # copy top level
+
+ while orig is not None:
+ result.additional = copy(orig.additional)
+
+ if orig.down is not None: # copy and create references to the following level
+ # copy following level
+ result.down = copy(orig.down)
+
+ if orig.t1_child_rel is not None:
+ result.t1_child_rel = ChildRelationship.create(
+ klass=orig.t1_child_rel.__class__,
+ parent=result.t1,
+ child=result.down.t1,
+ param=orig.t1_child_rel.param)
+ if orig.t2_child_rel is not None:
+ result.t2_child_rel = ChildRelationship.create(
+ klass=orig.t2_child_rel.__class__,
+ parent=result.t2,
+ child=result.down.t2,
+ param=orig.t2_child_rel.param)
+
+ # descend to next level
+ orig = orig.down
+ if result.down is not None:
+ result = result.down
+ return result
+
+
+class ChildRelationship:
+ """
+ Describes the relationship between a container object (the "parent") and the contained
+ "child" object.
+ """
+
+ # Format to a be used for representing param.
+ # E.g. for a dict, this turns a formatted param "42" into "[42]".
+ param_repr_format = None
+
+ # This is a hook allowing subclasses to manipulate param strings.
+ # :param string: Input string
+ # :return: Manipulated string, as appropriate in this context.
+ quote_str = None
+
+ @staticmethod
+ def create(klass, parent, child, param=None):
+ if not issubclass(klass, ChildRelationship):
+ raise TypeError
+ return klass(parent, child, param)
+
+ def __init__(self, parent, child, param=None):
+ # The parent object of this relationship, e.g. a dict
+ self.parent = parent
+
+ # The child object of this relationship, e.g. a value in a dict
+ self.child = child
+
+ # A subclass-dependent parameter describing how to get from parent to child, e.g. the key in a dict
+ self.param = param
+
+ def __repr__(self):
+ from deepdiff.summarize import summarize
+
+ name = "<{} parent:{}, child:{}, param:{}>"
+ parent = summarize(self.parent, max_length=35)
+ child = summarize(self.child, max_length=35)
+ param = summarize(self.param, max_length=15)
+ return name.format(self.__class__.__name__, parent, child, param)
+
+ def get_param_repr(self, force=None):
+ """
+ Returns a formatted param python parsable string describing this relationship,
+ or None if the relationship is not representable as a string.
+ This string can be appended to the parent Name.
+ Subclasses representing a relationship that cannot be expressed as a string override this method to return None.
+ Examples: "[2]", ".attribute", "['mykey']"
+ :param force: Bends the meaning of "no string representation".
+ If None:
+ Will strictly return partials of Python-parsable expressions. The result those yield will compare
+ equal to the objects in question.
+ If 'yes':
+ Will return a formatted param including '(unrepresentable)' instead of the non string-representable part.
+
+ """
+ return self.stringify_param(force)
+
+ def stringify_param(self, force=None):
+ """
+ Convert param to a string. Return None if there is no string representation.
+ This is called by get_param_repr()
+ :param force: Bends the meaning of "no string representation".
+ If None:
+ Will strictly return Python-parsable expressions. The result those yield will compare
+ equal to the objects in question.
+ If 'yes':
+ Will return '(unrepresentable)' instead of None if there is no string representation
+
+ TODO: stringify_param has issues with params that, when converted to a string via repr,
+ are not straightforward to turn back into the original object.
+ Although repr is meant to allow reconstructing the original object, for complex objects
+ it often does not recreate the original object.
+ Perhaps we should log that the repr reconstruction failed so the user is aware.
+ """
+ param = self.param
+ if isinstance(param, strings):
+ result = stringify_element(param, quote_str=self.quote_str)
+ elif isinstance(param, tuple): # Currently only for numpy ndarrays
+ result = ']['.join(map(repr, param))
+ elif hasattr(param, '__dataclass_fields__'):
+ attrs_to_values = [f"{key}={value}" for key, value in [(i, getattr(param, i)) for i in param.__dataclass_fields__]] # type: ignore
+ result = f"{param.__class__.__name__}({','.join(attrs_to_values)})"
+ else:
+ candidate = repr(param)
+ try:
+ resurrected = literal_eval_extended(candidate)
+ # Note: This will miss string-representable custom objects.
+ # However, the only alternative I can currently think of is using eval() which is inherently dangerous.
+ except (SyntaxError, ValueError) as err:
+ logger.error(
+ f'stringify_param was not able to get a proper repr for "{param}". '
+ "This object will be reported as None. Add instructions for this object to DeepDiff's "
+ f"helper.literal_eval_extended to make it work properly: {err}")
+ result = None
+ else:
+ result = candidate if resurrected == param else None
+
+ if result:
+ result = ':' if self.param_repr_format is None else self.param_repr_format.format(result)
+
+ return result
+
+
+class DictRelationship(ChildRelationship):
+ param_repr_format = "[{}]"
+ quote_str = "'{}'"
+
+
+class NumpyArrayRelationship(ChildRelationship):
+ param_repr_format = "[{}]"
+ quote_str = None
+
+
+class SubscriptableIterableRelationship(DictRelationship):
+ pass
+
+
+class InaccessibleRelationship(ChildRelationship):
+ pass
+
+
+# there is no random access to set elements
+class SetRelationship(InaccessibleRelationship):
+ pass
+
+
+class NonSubscriptableIterableRelationship(InaccessibleRelationship):
+
+ param_repr_format = "[{}]"
+
+ def get_param_repr(self, force=None):
+ if force == 'yes':
+ result = "(unrepresentable)"
+ elif force == 'fake' and self.param:
+ result = self.stringify_param()
+ else:
+ result = None
+
+ return result
+
+
+class AttributeRelationship(ChildRelationship):
+ param_repr_format = ".{}"
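The param_repr_format strings defined by the relationship classes above are the fragments that DeepDiff concatenates into report paths. A minimal sketch (illustrative only, not part of the vendored file; the example objects are assumptions):

    from deepdiff import DeepDiff

    class Person:
        def __init__(self, age):
            self.age = age

    diff = DeepDiff({'joe': Person(30)}, {'joe': Person(31)})
    # DictRelationship contributes "['joe']" and AttributeRelationship contributes ".age",
    # so the reported path is built as "root['joe'].age":
    print(diff)  # e.g. {'values_changed': {"root['joe'].age": {'new_value': 31, 'old_value': 30}}}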
diff --git a/.venv/lib/python3.12/site-packages/deepdiff/operator.py b/.venv/lib/python3.12/site-packages/deepdiff/operator.py
new file mode 100644
index 00000000..018fa3c6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/deepdiff/operator.py
@@ -0,0 +1,69 @@
+import re
+from typing import Any, Optional, List
+from abc import ABCMeta, abstractmethod
+from deepdiff.helper import convert_item_or_items_into_compiled_regexes_else_none
+
+
+
+class BaseOperatorPlus(metaclass=ABCMeta):
+
+ @abstractmethod
+ def match(self, level) -> bool:
+ """
+ Given a level which includes t1 and t2 in the tree view, is this operator a good match to compare t1 and t2?
+ If yes, we will run give_up_diffing to compare t1 and t2 for this level.
+ """
+ pass
+
+ @abstractmethod
+ def give_up_diffing(self, level, diff_instance: Any) -> bool:
+ """
+ Given a level which includes t1 and t2 in the tree view, and the "distance" between t1 and t2,
+ decide whether t1 and t2 should be considered equal. The distance is a number between zero and one, calculated by DeepDiff to measure how similar the objects are.
+ """
+
+ @abstractmethod
+ def normalize_value_for_hashing(self, parent: Any, obj: Any) -> Any:
+ """
+ You can use this function to normalize values when ignore_order=True.
+
+ For example, you may want to lowercase all the words; in that case, return obj.lower().
+ """
+ pass
+
+
+
+class BaseOperator:
+
+ def __init__(self, regex_paths:Optional[List[str]]=None, types:Optional[List[type]]=None):
+ if regex_paths:
+ self.regex_paths = convert_item_or_items_into_compiled_regexes_else_none(regex_paths)
+ else:
+ self.regex_paths = None
+ self.types = types
+
+ def match(self, level) -> bool:
+ if self.regex_paths:
+ for pattern in self.regex_paths:
+ matched = re.search(pattern, level.path()) is not None
+ if matched:
+ return True
+ if self.types:
+ for type_ in self.types:
+ if isinstance(level.t1, type_) and isinstance(level.t2, type_):
+ return True
+ return False
+
+ def give_up_diffing(self, level, diff_instance) -> bool:
+ raise NotImplementedError('Please implement the give_up_diffing method.')
+
+
+class PrefixOrSuffixOperator:
+
+ def match(self, level) -> bool:
+ return level.t1 and level.t2 and isinstance(level.t1, str) and isinstance(level.t2, str)
+
+ def give_up_diffing(self, level, diff_instance) -> bool:
+ t1 = level.t1
+ t2 = level.t2
+ return t1.startswith(t2) or t2.startswith(t1)
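BaseOperator above matches levels by regex path or by type and delegates the actual decision to give_up_diffing. A minimal usage sketch (the operator class and the tolerance are assumptions, not taken from this file):

    from deepdiff import DeepDiff
    from deepdiff.operator import BaseOperator

    class CloseEnoughOperator(BaseOperator):
        # Treat floats that differ by less than 0.01 as equal.
        def give_up_diffing(self, level, diff_instance) -> bool:
            return abs(level.t1 - level.t2) < 0.01

    diff = DeepDiff(
        {'price': 10.001}, {'price': 10.002},
        custom_operators=[CloseEnoughOperator(types=[float])],
    )
    print(diff)  # expected to be {} because the operator suppressed the difference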
diff --git a/.venv/lib/python3.12/site-packages/deepdiff/path.py b/.venv/lib/python3.12/site-packages/deepdiff/path.py
new file mode 100644
index 00000000..ee63b5b9
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/deepdiff/path.py
@@ -0,0 +1,316 @@
+import logging
+from ast import literal_eval
+from functools import lru_cache
+
+logger = logging.getLogger(__name__)
+
+GETATTR = 'GETATTR'
+GET = 'GET'
+
+
+class PathExtractionError(ValueError):
+ pass
+
+
+class RootCanNotBeModified(ValueError):
+ pass
+
+
+def _add_to_elements(elements, elem, inside):
+ # Ignore private items
+ if not elem:
+ return
+ if not elem.startswith('__'):
+ remove_quotes = False
+ if '𝆺𝅥𝅯' in elem or '\\' in elem:
+ remove_quotes = True
+ else:
+ try:
+ elem = literal_eval(elem)
+ remove_quotes = False
+ except (ValueError, SyntaxError):
+ remove_quotes = True
+ if remove_quotes and elem[0] == elem[-1] and elem[0] in {'"', "'"}:
+ elem = elem[1: -1]
+ action = GETATTR if inside == '.' else GET
+ elements.append((elem, action))
+
+
+DEFAULT_FIRST_ELEMENT = ('root', GETATTR)
+
+
+@lru_cache(maxsize=1024 * 128)
+def _path_to_elements(path, root_element=DEFAULT_FIRST_ELEMENT):
+ """
+ Given a path, it extracts the elements that form the path along with the most likely retrieval action for each.
+
+ >>> from deepdiff.path import _path_to_elements
+ >>> path = "root[4.3].b['a3']"
+ >>> _path_to_elements(path, root_element=None)
+ ((4.3, 'GET'), ('b', 'GETATTR'), ('a3', 'GET'))
+ """
+ if isinstance(path, (tuple, list)):
+ return path
+ elements = []
+ if root_element:
+ elements.append(root_element)
+ elem = ''
+ inside = False
+ prev_char = None
+ path = path[4:] # removing "root" from the beginning
+ brackets = []
+ inside_quotes = False
+ quote_used = ''
+ for char in path:
+ if prev_char == '𝆺𝅥𝅯':
+ elem += char
+ elif char in {'"', "'"}:
+ elem += char
+ # If we are inside and the quote is not what we expected, the quote is not closing
+ if not(inside_quotes and quote_used != char):
+ inside_quotes = not inside_quotes
+ if inside_quotes:
+ quote_used = char
+ else:
+ _add_to_elements(elements, elem, inside)
+ elem = ''
+ quote_used = ''
+ elif inside_quotes:
+ elem += char
+ elif char == '[':
+ if inside == '.':
+ _add_to_elements(elements, elem, inside)
+ inside = '['
+ elem = ''
+ # we are already inside. The bracket is a part of the word.
+ elif inside == '[':
+ elem += char
+ else:
+ inside = '['
+ brackets.append('[')
+ elem = ''
+ elif char == '.':
+ if inside == '[':
+ elem += char
+ elif inside == '.':
+ _add_to_elements(elements, elem, inside)
+ elem = ''
+ else:
+ inside = '.'
+ elem = ''
+ elif char == ']':
+ if brackets and brackets[-1] == '[':
+ brackets.pop()
+ if brackets:
+ elem += char
+ else:
+ _add_to_elements(elements, elem, inside)
+ elem = ''
+ inside = False
+ else:
+ elem += char
+ prev_char = char
+ if elem:
+ _add_to_elements(elements, elem, inside)
+ return tuple(elements)
+
+
+def _get_nested_obj(obj, elements, next_element=None):
+ for (elem, action) in elements:
+ if action == GET:
+ obj = obj[elem]
+ elif action == GETATTR:
+ obj = getattr(obj, elem)
+ return obj
+
+
+def _guess_type(elements, elem, index, next_element):
+ # If we are not at the last element
+ if index < len(elements) - 1:
+ # We assume it is a nested dictionary not a nested list
+ return {}
+ if isinstance(next_element, int):
+ return []
+ return {}
+
+
+def _get_nested_obj_and_force(obj, elements, next_element=None):
+ prev_elem = None
+ prev_action = None
+ prev_obj = obj
+ for index, (elem, action) in enumerate(elements):
+ _prev_obj = obj
+ if action == GET:
+ try:
+ obj = obj[elem]
+ prev_obj = _prev_obj
+ except KeyError:
+ obj[elem] = _guess_type(elements, elem, index, next_element)
+ obj = obj[elem]
+ prev_obj = _prev_obj
+ except IndexError:
+ if isinstance(obj, list) and isinstance(elem, int) and elem >= len(obj):
+ obj.extend([None] * (elem - len(obj)))
+ obj.append(_guess_type(elements, elem, index, next_element))
+ obj = obj[-1]
+ prev_obj = _prev_obj
+ elif isinstance(obj, list) and len(obj) == 0 and prev_elem:
+ # We ran into an empty list that should have been a dictionary
+ # We need to change it from an empty list to a dictionary
+ obj = {elem: _guess_type(elements, elem, index, next_element)}
+ if prev_action == GET:
+ prev_obj[prev_elem] = obj
+ else:
+ setattr(prev_obj, prev_elem, obj)
+ obj = obj[elem]
+ elif action == GETATTR:
+ obj = getattr(obj, elem)
+ prev_obj = _prev_obj
+ prev_elem = elem
+ prev_action = action
+ return obj
+
+
+def extract(obj, path):
+ """
+ Get the item from obj based on path.
+
+ Example:
+
+ >>> from deepdiff import extract
+ >>> obj = {1: [{'2': 'b'}, 3], 2: [4, 5]}
+ >>> path = "root[1][0]['2']"
+ >>> extract(obj, path)
+ 'b'
+
+ Note that you can use extract in conjunction with DeepDiff results
+ or even with the search and :ref:`deepsearch_label` modules. For example:
+
+ >>> from deepdiff import grep
+ >>> obj = {1: [{'2': 'b'}, 3], 2: [4, 5]}
+ >>> result = obj | grep(5)
+ >>> result
+ {'matched_values': ['root[2][1]']}
+ >>> result['matched_values'][0]
+ 'root[2][1]'
+ >>> path = result['matched_values'][0]
+ >>> extract(obj, path)
+ 5
+
+
+ .. note::
+ Note that even if DeepDiff gives you a path to an item in a set,
+ there is no such index in Python and hence you will get an error trying
+ to extract that item from a set.
+ If you want to be able to get items from sets, use the SetOrdered module
+ to generate the sets.
+ In fact DeepDiff uses SetOrdered as a dependency.
+
+ >>> from deepdiff import grep, extract
+ >>> obj = {"a", "b"}
+ >>> obj | grep("b")
+ Set item detected in the path. 'set' objects do NOT support indexing. But DeepSearch will still report a path.
+ {'matched_values': SetOrdered(['root[0]'])}
+ >>> extract(obj, 'root[0]')
+ Traceback (most recent call last):
+ File "<stdin>", line 1, in <module>
+ File "deepdiff/deepdiff/path.py", line 126, in extract
+ return _get_nested_obj(obj, elements)
+ File "deepdiff/deepdiff/path.py", line 84, in _get_nested_obj
+ obj = obj[elem]
+ TypeError: 'set' object is not subscriptable
+ >>> from orderly_set import SetOrdered
+ >>> obj = SetOrdered(["a", "b"])
+ >>> extract(obj, 'root[0]')
+ 'a'
+
+ """
+ elements = _path_to_elements(path, root_element=None)
+ return _get_nested_obj(obj, elements)
+
+
+def parse_path(path, root_element=DEFAULT_FIRST_ELEMENT, include_actions=False):
+ """
+ Parse a path to a format that is machine readable
+
+ **Parameters**
+
+ path : A string
+ The path string such as "root[1][2]['age']"
+
+ root_element: string, default='root'
+ What the root is called in the path.
+
+ include_actions: boolean, default=False
+ If True, we return the action required to retrieve the item at each element of the path.
+
+ **Examples**
+
+ >>> from deepdiff import parse_path
+ >>> parse_path("root[1][2]['age']")
+ [1, 2, 'age']
+ >>> parse_path("root[1][2]['age']", include_actions=True)
+ [{'element': 1, 'action': 'GET'}, {'element': 2, 'action': 'GET'}, {'element': 'age', 'action': 'GET'}]
+ >>>
+ >>> parse_path("root['joe'].age")
+ ['joe', 'age']
+ >>> parse_path("root['joe'].age", include_actions=True)
+ [{'element': 'joe', 'action': 'GET'}, {'element': 'age', 'action': 'GETATTR'}]
+
+ """
+
+ result = _path_to_elements(path, root_element=root_element)
+ result = iter(result)
+ if root_element:
+ next(result) # We don't want the root item
+ if include_actions is False:
+ return [i[0] for i in result]
+ return [{'element': i[0], 'action': i[1]} for i in result]
+
+
+def stringify_element(param, quote_str=None):
+ has_quote = "'" in param
+ has_double_quote = '"' in param
+ if has_quote and has_double_quote and not quote_str:
+ new_param = []
+ for char in param:
+ if char in {'"', "'"}:
+ new_param.append('𝆺𝅥𝅯')
+ new_param.append(char)
+ result = '"' + ''.join(new_param) + '"'
+ elif has_quote:
+ result = f'"{param}"'
+ elif has_double_quote:
+ result = f"'{param}'"
+ else:
+ result = param if quote_str is None else quote_str.format(param)
+ return result
+
+
+def stringify_path(path, root_element=DEFAULT_FIRST_ELEMENT, quote_str="'{}'"):
+ """
+ Gets the path as a string.
+
+ For example [1, 2, 'age'] should become
+ root[1][2]['age']
+ """
+ if not path:
+ return root_element[0]
+ result = [root_element[0]]
+ has_actions = False
+ try:
+ if path[0][1] in {GET, GETATTR}:
+ has_actions = True
+ except (KeyError, IndexError, TypeError):
+ pass
+ if not has_actions:
+ path = [(i, GET) for i in path]
+ path[0] = (path[0][0], root_element[1]) # The action for the first element might be a GET or GETATTR. We update the action based on the root_element.
+ for element, action in path:
+ if isinstance(element, str) and action == GET:
+ element = stringify_element(element, quote_str)
+ if action == GET:
+ result.append(f"[{element}]")
+ else:
+ result.append(f".{element}")
+ return ''.join(result)
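parse_path and extract above are two halves of the same round trip: one turns a report path into elements, the other follows those elements into an object. A short sketch whose expected values come from the doctests embedded in this file:

    from deepdiff import extract, parse_path

    obj = {1: [{'2': 'b'}, 3], 2: [4, 5]}
    path = "root[1][0]['2']"
    assert parse_path(path) == [1, 0, '2']
    assert extract(obj, path) == 'b'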
diff --git a/.venv/lib/python3.12/site-packages/deepdiff/py.typed b/.venv/lib/python3.12/site-packages/deepdiff/py.typed
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/deepdiff/py.typed
diff --git a/.venv/lib/python3.12/site-packages/deepdiff/search.py b/.venv/lib/python3.12/site-packages/deepdiff/search.py
new file mode 100644
index 00000000..007c566c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/deepdiff/search.py
@@ -0,0 +1,358 @@
+#!/usr/bin/env python
+import re
+from collections.abc import MutableMapping, Iterable
+from deepdiff.helper import SetOrdered
+import logging
+
+from deepdiff.helper import (
+ strings, numbers, add_to_frozen_set, get_doc, dict_, RE_COMPILED_TYPE, ipranges
+)
+
+logger = logging.getLogger(__name__)
+
+
+doc = get_doc('search_doc.rst')
+
+
+class DeepSearch(dict):
+ r"""
+ **DeepSearch**
+
+ Deep Search inside objects to find the item matching your criteria.
+
+ **Parameters**
+
+ obj : The object to search within
+
+ item : The item to search for
+
+ verbose_level : int >= 0, default = 1.
+ Verbose level 1 shows the paths of found items.
+ Verbose level 2 shows the paths and values of the found items.
+
+ exclude_paths: list, default = None.
+ List of paths to exclude from the report.
+
+ exclude_types: list, default = None.
+ List of object types to exclude from the report.
+
+ case_sensitive: Boolean, default = False
+
+ match_string: Boolean, default = False
+ If True, the value of the object or its children has to exactly match the item.
+ If False, the value of the item can be a part of the value of the object or its children.
+
+ use_regexp: Boolean, default = False
+
+ strict_checking: Boolean, default = True
+ If True, it will check the type of the object to match, so when searching for '1234',
+ it will NOT match the int 1234. Currently this only affects searching for numeric values.
+
+ **Returns**
+
+ A DeepSearch object that has the matched paths and matched values.
+
+ **Supported data types**
+
+ int, string, unicode, dictionary, list, tuple, set, frozenset, OrderedDict, NamedTuple and custom objects!
+
+ **Examples**
+
+ Importing
+ >>> from deepdiff import DeepSearch
+ >>> from pprint import pprint
+
+ Search in list for string
+ >>> obj = ["long somewhere", "string", 0, "somewhere great!"]
+ >>> item = "somewhere"
+ >>> ds = DeepSearch(obj, item, verbose_level=2)
+ >>> print(ds)
+ {'matched_values': {'root[3]': 'somewhere great!', 'root[0]': 'long somewhere'}}
+
+ Search in nested data for string
+ >>> obj = ["something somewhere", {"long": "somewhere", "string": 2, 0: 0, "somewhere": "around"}]
+ >>> item = "somewhere"
+ >>> ds = DeepSearch(obj, item, verbose_level=2)
+ >>> pprint(ds, indent=2)
+ { 'matched_paths': {"root[1]['somewhere']": 'around'},
+ 'matched_values': { 'root[0]': 'something somewhere',
+ "root[1]['long']": 'somewhere'}}
+
+ """
+
+ warning_num = 0
+
+ def __init__(self,
+ obj,
+ item,
+ exclude_paths=SetOrdered(),
+ exclude_regex_paths=SetOrdered(),
+ exclude_types=SetOrdered(),
+ verbose_level=1,
+ case_sensitive=False,
+ match_string=False,
+ use_regexp=False,
+ strict_checking=True,
+ **kwargs):
+ if kwargs:
+ raise ValueError((
+ "The following parameter(s) are not valid: %s\n"
+ "The valid parameters are obj, item, exclude_paths, exclude_types,\n"
+ "case_sensitive, match_string and verbose_level."
+ ) % ', '.join(kwargs.keys()))
+
+ self.obj = obj
+ self.case_sensitive = case_sensitive if isinstance(item, strings) else True
+ item = item if self.case_sensitive else item.lower()
+ self.exclude_paths = SetOrdered(exclude_paths)
+ self.exclude_regex_paths = [re.compile(exclude_regex_path) for exclude_regex_path in exclude_regex_paths]
+ self.exclude_types = SetOrdered(exclude_types)
+ self.exclude_types_tuple = tuple(
+ exclude_types) # we need tuple for checking isinstance
+ self.verbose_level = verbose_level
+ self.update(
+ matched_paths=self.__set_or_dict(),
+ matched_values=self.__set_or_dict(),
+ unprocessed=[])
+ self.use_regexp = use_regexp
+ if not strict_checking and (isinstance(item, numbers) or isinstance(item, ipranges)):
+ item = str(item)
+ if self.use_regexp:
+ try:
+ item = re.compile(item)
+ except TypeError as e:
+ raise TypeError(f"The passed item of {item} is not usable for regex: {e}") from None
+ self.strict_checking = strict_checking
+
+ # Cases where user wants to match exact string item
+ self.match_string = match_string
+
+ self.__search(obj, item, parents_ids=frozenset({id(obj)}))
+
+ empty_keys = [k for k, v in self.items() if not v]
+
+ for k in empty_keys:
+ del self[k]
+
+ def __set_or_dict(self):
+ return dict_() if self.verbose_level >= 2 else SetOrdered()
+
+ def __report(self, report_key, key, value):
+ if self.verbose_level >= 2:
+ self[report_key][key] = value
+ else:
+ self[report_key].add(key)
+
+ def __search_obj(self,
+ obj,
+ item,
+ parent,
+ parents_ids=frozenset(),
+ is_namedtuple=False):
+ """Search objects"""
+ found = False
+ if obj == item:
+ found = True
+ # We report the match but also continue inside the match to see if there are
+ # further matches inside the `looped` object.
+ self.__report(report_key='matched_values', key=parent, value=obj)
+
+ try:
+ if is_namedtuple:
+ obj = obj._asdict()
+ else:
+ # Skip magic methods. Slightly hacky, but unless people are defining
+ # new magic methods they want to search, it should work fine.
+ obj = {i: getattr(obj, i) for i in dir(obj)
+ if not (i.startswith('__') and i.endswith('__'))}
+ except AttributeError:
+ try:
+ obj = {i: getattr(obj, i) for i in obj.__slots__}
+ except AttributeError:
+ if not found:
+ self['unprocessed'].append("%s" % parent)
+
+ return
+
+ self.__search_dict(
+ obj, item, parent, parents_ids, print_as_attribute=True)
+
+ def __skip_this(self, item, parent):
+ skip = False
+ if parent in self.exclude_paths:
+ skip = True
+ elif self.exclude_regex_paths and any(
+ [exclude_regex_path.search(parent) for exclude_regex_path in self.exclude_regex_paths]):
+ skip = True
+ else:
+ if isinstance(item, self.exclude_types_tuple):
+ skip = True
+
+ return skip
+
+ def __search_dict(self,
+ obj,
+ item,
+ parent,
+ parents_ids=frozenset(),
+ print_as_attribute=False):
+ """Search dictionaries"""
+ if print_as_attribute:
+ parent_text = "%s.%s"
+ else:
+ parent_text = "%s[%s]"
+
+ obj_keys = SetOrdered(obj.keys())
+
+ for item_key in obj_keys:
+ if not print_as_attribute and isinstance(item_key, strings):
+ item_key_str = "'%s'" % item_key
+ else:
+ item_key_str = item_key
+
+ obj_child = obj[item_key]
+
+ item_id = id(obj_child)
+
+ if parents_ids and item_id in parents_ids:
+ continue
+
+ parents_ids_added = add_to_frozen_set(parents_ids, item_id)
+
+ new_parent = parent_text % (parent, item_key_str)
+ new_parent_cased = new_parent if self.case_sensitive else new_parent.lower()
+
+ str_item = str(item)
+ if (self.match_string and str_item == new_parent_cased) or\
+ (not self.match_string and str_item in new_parent_cased) or\
+ (self.use_regexp and item.search(new_parent_cased)):
+ self.__report(
+ report_key='matched_paths',
+ key=new_parent,
+ value=obj_child)
+
+ self.__search(
+ obj_child,
+ item,
+ parent=new_parent,
+ parents_ids=parents_ids_added)
+
+ def __search_iterable(self,
+ obj,
+ item,
+ parent="root",
+ parents_ids=frozenset()):
+ """Search iterables except dictionaries, sets and strings."""
+ for i, thing in enumerate(obj):
+ new_parent = "{}[{}]".format(parent, i)
+ if self.__skip_this(thing, parent=new_parent):
+ continue
+
+ if self.case_sensitive or not isinstance(thing, strings):
+ thing_cased = thing
+ else:
+ thing_cased = thing.lower()
+
+ if not self.use_regexp and thing_cased == item:
+ self.__report(
+ report_key='matched_values', key=new_parent, value=thing)
+ else:
+ item_id = id(thing)
+ if parents_ids and item_id in parents_ids:
+ continue
+ parents_ids_added = add_to_frozen_set(parents_ids, item_id)
+ self.__search(thing, item, "%s[%s]" %
+ (parent, i), parents_ids_added)
+
+ def __search_str(self, obj, item, parent):
+ """Compare strings"""
+ obj_text = obj if self.case_sensitive else obj.lower()
+
+ is_matched = False
+ if self.use_regexp:
+ is_matched = item.search(obj_text)
+ elif (self.match_string and item == obj_text) or (not self.match_string and item in obj_text):
+ is_matched = True
+ if is_matched:
+ self.__report(report_key='matched_values', key=parent, value=obj)
+
+ def __search_numbers(self, obj, item, parent):
+ if (
+ item == obj or (
+ not self.strict_checking and (
+ item == str(obj) or (
+ self.use_regexp and item.search(str(obj))
+ )
+ )
+ )
+ ):
+ self.__report(report_key='matched_values', key=parent, value=obj)
+
+ def __search_tuple(self, obj, item, parent, parents_ids):
+ # Checking to see if it has _asdict, which probably means it is a named
+ # tuple.
+ try:
+ obj._asdict
+ # It must be a normal tuple
+ except AttributeError:
+ self.__search_iterable(obj, item, parent, parents_ids)
+ # We assume it is a namedtuple then
+ else:
+ self.__search_obj(
+ obj, item, parent, parents_ids, is_namedtuple=True)
+
+ def __search(self, obj, item, parent="root", parents_ids=frozenset()):
+ """The main search method"""
+ if self.__skip_this(item, parent):
+ return
+
+ elif isinstance(obj, strings) and isinstance(item, (strings, RE_COMPILED_TYPE)):
+ self.__search_str(obj, item, parent)
+
+ elif isinstance(obj, strings) and isinstance(item, numbers):
+ return
+
+ elif isinstance(obj, ipranges):
+ self.__search_str(str(obj), item, parent)
+
+ elif isinstance(obj, numbers):
+ self.__search_numbers(obj, item, parent)
+
+ elif isinstance(obj, MutableMapping):
+ self.__search_dict(obj, item, parent, parents_ids)
+
+ elif isinstance(obj, tuple):
+ self.__search_tuple(obj, item, parent, parents_ids)
+
+ elif isinstance(obj, (set, frozenset)):
+ if self.warning_num < 10:
+ logger.warning(
+ "Set item detected in the path."
+ "'set' objects do NOT support indexing. But DeepSearch will still report a path."
+ )
+ self.warning_num += 1
+ self.__search_iterable(obj, item, parent, parents_ids)
+
+ elif isinstance(obj, Iterable) and not isinstance(obj, strings):
+ self.__search_iterable(obj, item, parent, parents_ids)
+
+ else:
+ self.__search_obj(obj, item, parent, parents_ids)
+
+
+class grep:
+ __doc__ = doc
+
+ def __init__(self,
+ item,
+ **kwargs):
+ self.item = item
+ self.kwargs = kwargs
+
+ def __ror__(self, other):
+ return DeepSearch(obj=other, item=self.item, **self.kwargs)
+
+
+if __name__ == "__main__": # pragma: no cover
+ import doctest
+ doctest.testmod()
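The strict_checking flag described in the DeepSearch docstring above controls whether a string item can match numeric values. A minimal sketch (the sample object and the shown results are illustrative assumptions):

    from deepdiff import DeepSearch

    obj = {'id': 1234, 'note': 'id is 1234'}
    print(DeepSearch(obj, '1234'))
    # strict (default): only the string value matches, something like
    # {'matched_values': ["root['note']"]}
    print(DeepSearch(obj, '1234', strict_checking=False))
    # relaxed: the int 1234 is reported as well, something like
    # {'matched_values': ["root['id']", "root['note']"]}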
diff --git a/.venv/lib/python3.12/site-packages/deepdiff/serialization.py b/.venv/lib/python3.12/site-packages/deepdiff/serialization.py
new file mode 100644
index 00000000..c148aadf
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/deepdiff/serialization.py
@@ -0,0 +1,730 @@
+import pickle
+import sys
+import io
+import os
+import json
+import uuid
+import logging
+import re # NOQA
+import builtins # NOQA
+import datetime # NOQA
+import decimal # NOQA
+import orderly_set # NOQA
+import collections # NOQA
+from copy import deepcopy, copy
+from functools import partial
+from collections.abc import Mapping
+from typing import (
+ Callable, Optional, Union,
+ overload, Literal, Any,
+)
+from deepdiff.helper import (
+ strings,
+ get_type,
+ TEXT_VIEW,
+ np_float32,
+ np_float64,
+ np_int32,
+ np_int64,
+ np_ndarray,
+ Opcode,
+ SetOrdered,
+ pydantic_base_model_type,
+ PydanticBaseModel,
+ NotPresent,
+ ipranges,
+)
+from deepdiff.model import DeltaResult
+
+try:
+ import orjson
+except ImportError: # pragma: no cover.
+ orjson = None
+
+logger = logging.getLogger(__name__)
+
+class UnsupportedFormatErr(TypeError):
+ pass
+
+
+NONE_TYPE = type(None)
+
+CSV_HEADER_MAX_CHUNK_SIZE = 2048 # The chunk needs to be big enough that covers a couple of rows of data.
+
+
+MODULE_NOT_FOUND_MSG = 'DeepDiff Delta did not find {} in your modules. Please make sure it is already imported.'
+FORBIDDEN_MODULE_MSG = "Module '{}' is forbidden. You need to explicitly pass it by passing a safe_to_import parameter"
+DELTA_IGNORE_ORDER_NEEDS_REPETITION_REPORT = 'report_repetition must be set to True when ignore_order is True to create the delta object.'
+DELTA_ERROR_WHEN_GROUP_BY = 'Delta can not be made when group_by is used since the structure of data is modified from the original form.'
+
+SAFE_TO_IMPORT = {
+ 'builtins.range',
+ 'builtins.complex',
+ 'builtins.set',
+ 'builtins.frozenset',
+ 'builtins.slice',
+ 'builtins.str',
+ 'builtins.bytes',
+ 'builtins.list',
+ 'builtins.tuple',
+ 'builtins.int',
+ 'builtins.float',
+ 'builtins.dict',
+ 'builtins.bool',
+ 'builtins.bin',
+ 'builtins.None',
+ 'datetime.datetime',
+ 'datetime.time',
+ 'datetime.timedelta',
+ 'decimal.Decimal',
+ 'uuid.UUID',
+ 'orderly_set.sets.OrderedSet',
+ 'orderly_set.sets.OrderlySet',
+ 'orderly_set.sets.StableSetEq',
+ 'deepdiff.helper.SetOrdered',
+ 'collections.namedtuple',
+ 'collections.OrderedDict',
+ 're.Pattern',
+ 'deepdiff.helper.Opcode',
+}
+
+
+TYPE_STR_TO_TYPE = {
+ 'range': range,
+ 'complex': complex,
+ 'set': set,
+ 'frozenset': frozenset,
+ 'slice': slice,
+ 'str': str,
+ 'bytes': bytes,
+ 'list': list,
+ 'tuple': tuple,
+ 'int': int,
+ 'float': float,
+ 'dict': dict,
+ 'bool': bool,
+ 'bin': bin,
+ 'None': None,
+ 'NoneType': None,
+ 'datetime': datetime.datetime,
+ 'time': datetime.time,
+ 'timedelta': datetime.timedelta,
+ 'Decimal': decimal.Decimal,
+ 'SetOrdered': SetOrdered,
+ 'namedtuple': collections.namedtuple,
+ 'OrderedDict': collections.OrderedDict,
+ 'Pattern': re.Pattern,
+ 'iprange': str,
+}
+
+
+class ModuleNotFoundError(ImportError):
+ """
+ Raised when the module is not found in sys.modules
+ """
+ pass
+
+
+class ForbiddenModule(ImportError):
+ """
+ Raised when a module is not explicitly allowed to be imported
+ """
+ pass
+
+
+class SerializationMixin:
+
+ def to_json_pickle(self):
+ """
+ :ref:`to_json_pickle_label`
+ Get the json pickle of the diff object. Unless you need all the attributes and functionality of DeepDiff, running to_json() is the safer option than json pickle.
+ """
+ try:
+ import jsonpickle
+ copied = self.copy() # type: ignore
+ return jsonpickle.encode(copied)
+ except ImportError: # pragma: no cover. Json pickle is getting deprecated.
+ logger.error('jsonpickle library needs to be installed in order to run to_json_pickle') # pragma: no cover. Json pickle is getting deprecated.
+
+ @classmethod
+ def from_json_pickle(cls, value):
+ """
+ :ref:`from_json_pickle_label`
+ Load DeepDiff object with all the bells and whistles from the json pickle dump.
+ Note that json pickle dump comes from to_json_pickle
+ """
+ try:
+ import jsonpickle
+ return jsonpickle.decode(value)
+ except ImportError: # pragma: no cover. Json pickle is getting deprecated.
+ logger.error('jsonpickle library needs to be installed in order to run from_json_pickle') # pragma: no cover. Json pickle is getting deprecated.
+
+ def to_json(self, default_mapping: Optional[dict]=None, force_use_builtin_json=False, **kwargs):
+ """
+ Dump json of the text view.
+ **Parameters**
+
+ default_mapping : dictionary (optional), a dictionary mapping types to json-serializable conversions.
+
+ By default DeepDiff converts certain data types, for example Decimals into floats, so they can be exported into json.
+ If you have a certain object type that the json serializer can not serialize, please pass the appropriate type
+ conversion through this dictionary.
+
+ force_use_builtin_json: Boolean, default = False
+ When True, we use Python's builtin Json library for serialization,
+ even if Orjson is installed.
+
+
+ kwargs: Any other kwargs you pass will be passed on to Python's json.dumps()
+
+ **Example**
+
+ Serialize custom objects
+ >>> class A:
+ ... pass
+ ...
+ >>> class B:
+ ... pass
+ ...
+ >>> t1 = A()
+ >>> t2 = B()
+ >>> ddiff = DeepDiff(t1, t2)
+ >>> ddiff.to_json()
+ TypeError: We do not know how to convert <__main__.A object at 0x10648> of type <class '__main__.A'> for json serialization. Please pass the default_mapping parameter with proper mapping of the object to a basic python type.
+
+ >>> default_mapping = {A: lambda x: 'obj A', B: lambda x: 'obj B'}
+ >>> ddiff.to_json(default_mapping=default_mapping)
+ '{"type_changes": {"root": {"old_type": "A", "new_type": "B", "old_value": "obj A", "new_value": "obj B"}}}'
+ """
+ dic = self.to_dict(view_override=TEXT_VIEW)
+ return json_dumps(
+ dic,
+ default_mapping=default_mapping,
+ force_use_builtin_json=force_use_builtin_json,
+ **kwargs,
+ )
+
+ def to_dict(self, view_override: Optional[str]=None) -> dict:
+ """
+ Convert the result to a Python dictionary. You can override the view type by passing view_override.
+
+ **Parameters**
+
+ view_override: view type, default=None,
+ override the view that was used to generate the diff when converting to the dictionary.
+ The options are 'text' or 'tree'.
+ """
+
+ view = view_override if view_override else self.view # type: ignore
+ return dict(self._get_view_results(view)) # type: ignore
+
+ def _to_delta_dict(
+ self,
+ directed: bool = True,
+ report_repetition_required: bool = True,
+ always_include_values: bool = False,
+ ) -> dict:
+ """
+ Dump to a dictionary suitable for delta usage.
+ Unlike to_dict, this is not dependent on the original view that the user chose to create the diff.
+
+ **Parameters**
+
+ directed : Boolean, default=True, whether to create a directional delta dictionary or a symmetrical one
+
+ Note that in the current implementation the symmetrical delta (non-directional) is ONLY used for verifying that
+ the delta is being applied to the exact same values as what was used to generate the delta and has
+ no other usages.
+
+ If this option is set as True, then the dictionary will not have the "old_value" in the output.
+ Otherwise it will have the "old_value". "old_value" is the value of the item in t1.
+
+ If delta = Delta(DeepDiff(t1, t2)) then
+ t1 + delta == t2
+
+ Note that the items in t1 + delta might have a slightly different order than t2 if ignore_order
+ was set to True in the diff object.
+
+ """
+ if self.group_by is not None: # type: ignore
+ raise ValueError(DELTA_ERROR_WHEN_GROUP_BY)
+
+ if directed and not always_include_values:
+ _iterable_opcodes = {} # type: ignore
+ for path, op_codes in self._iterable_opcodes.items(): # type: ignore
+ _iterable_opcodes[path] = []
+ for op_code in op_codes:
+ new_op_code = Opcode(
+ tag=op_code.tag,
+ t1_from_index=op_code.t1_from_index,
+ t1_to_index=op_code.t1_to_index,
+ t2_from_index=op_code.t2_from_index,
+ t2_to_index=op_code.t2_to_index,
+ new_values=op_code.new_values,
+ )
+ _iterable_opcodes[path].append(new_op_code)
+ else:
+ _iterable_opcodes = self._iterable_opcodes # type: ignore
+
+ result = DeltaResult(
+ tree_results=self.tree, # type: ignore
+ ignore_order=self.ignore_order, # type: ignore
+ always_include_values=always_include_values,
+ _iterable_opcodes=_iterable_opcodes,
+ )
+ result.remove_empty_keys()
+ if report_repetition_required and self.ignore_order and not self.report_repetition: # type: ignore
+ raise ValueError(DELTA_IGNORE_ORDER_NEEDS_REPETITION_REPORT)
+ if directed:
+ for report_key, report_value in result.items():
+ if isinstance(report_value, Mapping):
+ for path, value in report_value.items():
+ if isinstance(value, Mapping) and 'old_value' in value:
+ del value['old_value'] # type: ignore
+ if self._numpy_paths: # type: ignore
+ # Note that keys that start with '_' are considered internal to DeepDiff
+ # and will be omitted when counting distance. (Look inside the distance module.)
+ result['_numpy_paths'] = self._numpy_paths # type: ignore
+
+ if self.iterable_compare_func: # type: ignore
+ result['_iterable_compare_func_was_used'] = True
+
+ return deepcopy(dict(result))
+
+ def pretty(self, prefix: Optional[Union[str, Callable]]=None):
+ """
+ The pretty human readable string output for the diff object
+ regardless of what view was used to generate the diff.
+
+ prefix can be a callable or a string or None.
+
+ Example:
+ >>> t1={1,2,4}
+ >>> t2={2,3}
+ >>> print(DeepDiff(t1, t2).pretty())
+ Item root[3] added to set.
+ Item root[4] removed from set.
+ Item root[1] removed from set.
+ """
+ result = []
+ if prefix is None:
+ prefix = ''
+ keys = sorted(self.tree.keys()) # type: ignore # sorting keys to guarantee constant order across python versions.
+ for key in keys:
+ for item_key in self.tree[key]: # type: ignore
+ result += [pretty_print_diff(item_key)]
+
+ if callable(prefix):
+ return "\n".join(f"{prefix(diff=self)}{r}" for r in result)
+ return "\n".join(f"{prefix}{r}" for r in result)
+
+
+class _RestrictedUnpickler(pickle.Unpickler):
+
+ def __init__(self, *args, **kwargs):
+ self.safe_to_import = kwargs.pop('safe_to_import', None)
+ if self.safe_to_import:
+ if isinstance(self.safe_to_import, strings):
+ self.safe_to_import = set([self.safe_to_import])
+ elif isinstance(self.safe_to_import, (set, frozenset)):
+ pass
+ else:
+ self.safe_to_import = set(self.safe_to_import)
+ self.safe_to_import = self.safe_to_import | SAFE_TO_IMPORT
+ else:
+ self.safe_to_import = SAFE_TO_IMPORT
+ super().__init__(*args, **kwargs)
+
+ def find_class(self, module, name):
+ # Only allow safe classes from self.safe_to_import.
+ module_dot_class = '{}.{}'.format(module, name)
+ if module_dot_class in self.safe_to_import:
+ try:
+ module_obj = sys.modules[module]
+ except KeyError:
+ raise ModuleNotFoundError(MODULE_NOT_FOUND_MSG.format(module_dot_class)) from None
+ return getattr(module_obj, name)
+ # Forbid everything else.
+ raise ForbiddenModule(FORBIDDEN_MODULE_MSG.format(module_dot_class)) from None
+
+ def persistent_load(self, pid):
+ if pid == "<<NoneType>>":
+ return type(None)
+
+
+class _RestrictedPickler(pickle.Pickler):
+ def persistent_id(self, obj):
+ if obj is NONE_TYPE: # NOQA
+ return "<<NoneType>>"
+ return None
+
+
+def pickle_dump(obj, file_obj=None, protocol=4):
+ """
+ **pickle_dump**
+ Dumps the obj into pickled content.
+
+ **Parameters**
+
+ obj : Any python object
+
+ file_obj : (Optional) A file object to dump the contents into
+
+ **Returns**
+
+ If file_obj is passed the return value will be None. It will write the object's pickle contents into the file.
+ However if no file_obj is passed, then it will return the pickle serialization of the obj in the form of bytes.
+ """
+ file_obj_passed = bool(file_obj)
+ file_obj = file_obj or io.BytesIO()
+ _RestrictedPickler(file_obj, protocol=protocol, fix_imports=False).dump(obj)
+ if not file_obj_passed:
+ return file_obj.getvalue()
+
+
+def pickle_load(content=None, file_obj=None, safe_to_import=None):
+ """
+ **pickle_load**
+ Load the pickled content. content should be a bytes object.
+
+ **Parameters**
+
+ content : Bytes of pickled object.
+
+ file_obj : A file object to load the content from
+
+ safe_to_import : A set of modules that needs to be explicitly allowed to be loaded.
+ Example: {'mymodule.MyClass', 'decimal.Decimal'}
+ Note that this set will be added to the basic set of modules that are already allowed.
+ The set of what is already allowed can be found in deepdiff.serialization.SAFE_TO_IMPORT
+
+ **Returns**
+
+ The deserialized object. For a DeepDiff delta dump, that is a delta object that can be added to t1 to recreate t2.
+
+ **Examples**
+
+ Importing
+ >>> from deepdiff import DeepDiff, Delta
+ >>> from pprint import pprint
+
+
+ """
+ if not content and not file_obj:
+ raise ValueError('Please either pass the content or the file_obj to pickle_load.')
+ if isinstance(content, str):
+ content = content.encode('utf-8')
+ if content:
+ file_obj = io.BytesIO(content)
+ return _RestrictedUnpickler(file_obj, safe_to_import=safe_to_import).load()
+
+
+def _get_pretty_form_text(verbose_level):
+ pretty_form_texts = {
+ "type_changes": "Type of {diff_path} changed from {type_t1} to {type_t2} and value changed from {val_t1} to {val_t2}.",
+ "values_changed": "Value of {diff_path} changed from {val_t1} to {val_t2}.",
+ "dictionary_item_added": "Item {diff_path} added to dictionary.",
+ "dictionary_item_removed": "Item {diff_path} removed from dictionary.",
+ "iterable_item_added": "Item {diff_path} added to iterable.",
+ "iterable_item_removed": "Item {diff_path} removed from iterable.",
+ "attribute_added": "Attribute {diff_path} added.",
+ "attribute_removed": "Attribute {diff_path} removed.",
+ "set_item_added": "Item root[{val_t2}] added to set.",
+ "set_item_removed": "Item root[{val_t1}] removed from set.",
+ "repetition_change": "Repetition change for item {diff_path}.",
+ }
+ if verbose_level == 2:
+ pretty_form_texts.update(
+ {
+ "dictionary_item_added": "Item {diff_path} ({val_t2}) added to dictionary.",
+ "dictionary_item_removed": "Item {diff_path} ({val_t1}) removed from dictionary.",
+ "iterable_item_added": "Item {diff_path} ({val_t2}) added to iterable.",
+ "iterable_item_removed": "Item {diff_path} ({val_t1}) removed from iterable.",
+ "attribute_added": "Attribute {diff_path} ({val_t2}) added.",
+ "attribute_removed": "Attribute {diff_path} ({val_t1}) removed.",
+ }
+ )
+ return pretty_form_texts
+
+
+def pretty_print_diff(diff):
+ type_t1 = get_type(diff.t1).__name__
+ type_t2 = get_type(diff.t2).__name__
+
+ val_t1 = '"{}"'.format(str(diff.t1)) if type_t1 == "str" else str(diff.t1)
+ val_t2 = '"{}"'.format(str(diff.t2)) if type_t2 == "str" else str(diff.t2)
+
+ diff_path = diff.path(root='root')
+ return _get_pretty_form_text(diff.verbose_level).get(diff.report_type, "").format(
+ diff_path=diff_path,
+ type_t1=type_t1,
+ type_t2=type_t2,
+ val_t1=val_t1,
+ val_t2=val_t2)
+
+
+def load_path_content(path, file_type=None):
+ """
+ Loads and deserializes the content of the path.
+ """
+
+ if file_type is None:
+ file_type = path.split('.')[-1]
+ if file_type == 'json':
+ with open(path, 'r') as the_file:
+ content = json_loads(the_file.read())
+ elif file_type in {'yaml', 'yml'}:
+ try:
+ import yaml
+ except ImportError: # pragma: no cover.
+ raise ImportError('Pyyaml needs to be installed.') from None # pragma: no cover.
+ with open(path, 'r') as the_file:
+ content = yaml.safe_load(the_file)
+ elif file_type == 'toml':
+ try:
+ if sys.version_info >= (3, 11):
+ import tomllib as tomli
+ else:
+ import tomli
+ except ImportError: # pragma: no cover.
+ raise ImportError('On python<=3.10 tomli needs to be installed.') from None # pragma: no cover.
+ with open(path, 'rb') as the_file:
+ content = tomli.load(the_file)
+ elif file_type == 'pickle':
+ with open(path, 'rb') as the_file:
+ content = the_file.read()
+ content = pickle_load(content)
+ elif file_type in {'csv', 'tsv'}:
+ try:
+ import clevercsv # type: ignore
+ content = clevercsv.read_dicts(path)
+ except ImportError: # pragma: no cover.
+ import csv
+ with open(path, 'r') as the_file:
+ content = list(csv.DictReader(the_file))
+
+ logger.info(f"NOTE: CSV content was empty in {path}")
+
+ # Everything in csv is string but we try to automatically convert any numbers we find
+ for row in content:
+ for key, value in row.items():
+ value = value.strip()
+ for type_ in [int, float, complex]:
+ try:
+ value = type_(value)
+ except Exception:
+ pass
+ else:
+ row[key] = value
+ break
+ else:
+ raise UnsupportedFormatErr(f'Only json, yaml, toml, csv, tsv and pickle are supported.\n'
+ f' The {file_type} extension is not known.')
+ return content
+
+
+def save_content_to_path(content, path, file_type=None, keep_backup=True):
+ """
+ Serializes the content and saves it to the path.
+ """
+
+ backup_path = f"{path}.bak"
+ os.rename(path, backup_path)
+
+ try:
+ _save_content(
+ content=content, path=path,
+ file_type=file_type, keep_backup=keep_backup)
+ except Exception:
+ os.rename(backup_path, path)
+ raise
+ else:
+ if not keep_backup:
+ os.remove(backup_path)
+
+
+def _save_content(content, path, file_type, keep_backup=True):
+ if file_type == 'json':
+ with open(path, 'w') as the_file:
+ content = json_dumps(content)
+ the_file.write(content) # type: ignore
+ elif file_type in {'yaml', 'yml'}:
+ try:
+ import yaml
+ except ImportError: # pragma: no cover.
+ raise ImportError('Pyyaml needs to be installed.') from None # pragma: no cover.
+ with open(path, 'w') as the_file:
+ content = yaml.safe_dump(content, stream=the_file)
+ elif file_type == 'toml':
+ try:
+ import tomli_w
+ except ImportError: # pragma: no cover.
+ raise ImportError('Tomli-w needs to be installed.') from None # pragma: no cover.
+ with open(path, 'wb') as the_file:
+ content = tomli_w.dump(content, the_file)
+ elif file_type == 'pickle':
+ with open(path, 'wb') as the_file:
+ content = pickle_dump(content, file_obj=the_file)
+ elif file_type in {'csv', 'tsv'}:
+ try:
+ import clevercsv # type: ignore
+ dict_writer = clevercsv.DictWriter
+ except ImportError: # pragma: no cover.
+ import csv
+ dict_writer = csv.DictWriter
+ with open(path, 'w', newline='') as csvfile:
+ fieldnames = list(content[0].keys())
+ writer = dict_writer(csvfile, fieldnames=fieldnames)
+ writer.writeheader()
+ writer.writerows(content)
+ else:
+ raise UnsupportedFormatErr('Only json, yaml, toml, csv, tsv and pickle are supported.\n'
+ f' The {file_type} extension is not known.')
+ return content
+
+
+def _serialize_decimal(value):
+ if value.as_tuple().exponent == 0:
+ return int(value)
+ else:
+ return float(value)
+
+
+def _serialize_tuple(value):
+ if hasattr(value, '_asdict'): # namedtuple
+ return value._asdict()
+ return value
+
+
+JSON_CONVERTOR = {
+ decimal.Decimal: _serialize_decimal,
+ SetOrdered: list,
+ orderly_set.StableSetEq: list,
+ set: list,
+ type: lambda x: x.__name__,
+ bytes: lambda x: x.decode('utf-8'),
+ datetime.datetime: lambda x: x.isoformat(),
+ uuid.UUID: lambda x: str(x),
+ np_float32: float,
+ np_float64: float,
+ np_int32: int,
+ np_int64: int,
+ np_ndarray: lambda x: x.tolist(),
+ tuple: _serialize_tuple,
+ Mapping: dict,
+ NotPresent: str,
+}
+
+if PydanticBaseModel is not pydantic_base_model_type:
+ JSON_CONVERTOR[PydanticBaseModel] = lambda x: x.dict()
+
+
+def json_convertor_default(default_mapping=None):
+ if default_mapping:
+ _convertor_mapping = JSON_CONVERTOR.copy()
+ _convertor_mapping.update(default_mapping)
+ else:
+ _convertor_mapping = JSON_CONVERTOR
+
+ def _convertor(obj):
+ for original_type, convert_to in _convertor_mapping.items():
+ if isinstance(obj, original_type):
+ return convert_to(obj)
+ # This is to handle reversed() which creates an iterator of type list_reverseiterator
+ if obj.__class__.__name__ == 'list_reverseiterator':
+ return list(copy(obj))
+ raise TypeError('We do not know how to convert {} of type {} for json serialization. Please pass the default_mapping parameter with proper mapping of the object to a basic python type.'.format(obj, type(obj)))
+
+ return _convertor
+
+
+class JSONDecoder(json.JSONDecoder):
+
+ def __init__(self, *args, **kwargs):
+ json.JSONDecoder.__init__(self, object_hook=self.object_hook, *args, **kwargs)
+
+ def object_hook(self, obj): # type: ignore
+ if 'old_type' in obj and 'new_type' in obj:
+ for type_key in ('old_type', 'new_type'):
+ type_str = obj[type_key]
+ obj[type_key] = TYPE_STR_TO_TYPE.get(type_str, type_str)
+
+ return obj
+
+
+
+@overload
+def json_dumps(
+ item: Any,
+ **kwargs,
+) -> str:
+ ...
+
+
+@overload
+def json_dumps(
+ item: Any,
+ default_mapping:Optional[dict],
+ force_use_builtin_json: bool,
+ return_bytes:Literal[True],
+ **kwargs,
+) -> bytes:
+ ...
+
+
+@overload
+def json_dumps(
+ item: Any,
+ default_mapping:Optional[dict],
+ force_use_builtin_json: bool,
+ return_bytes:Literal[False],
+ **kwargs,
+) -> str:
+ ...
+
+
+def json_dumps(
+ item: Any,
+ default_mapping:Optional[dict]=None,
+ force_use_builtin_json: bool = False,
+ return_bytes: bool = False,
+ **kwargs,
+) -> Union[str, bytes]:
+ """
+ Dump json with extra details that are not normally json serializable
+
+ parameters
+ ----------
+
+ force_use_builtin_json: Boolean, default = False
+ When True, we use Python's builtin Json library for serialization,
+ even if Orjson is installed.
+ """
+ if orjson and not force_use_builtin_json:
+ indent = kwargs.pop('indent', None)
+ kwargs['option'] = orjson.OPT_NON_STR_KEYS | orjson.OPT_SERIALIZE_NUMPY
+ if indent:
+ kwargs['option'] |= orjson.OPT_INDENT_2
+ if 'sort_keys' in kwargs:
+ raise TypeError(
+ "orjson does not accept the sort_keys parameter. "
+ "If you need to pass sort_keys, set force_use_builtin_json=True "
+ "to use Python's built-in json library instead of orjson.")
+ result = orjson.dumps(
+ item,
+ default=json_convertor_default(default_mapping=default_mapping),
+ **kwargs)
+ if return_bytes:
+ return result
+ return result.decode(encoding='utf-8')
+ else:
+ result = json.dumps(
+ item,
+ default=json_convertor_default(default_mapping=default_mapping),
+ **kwargs)
+ if return_bytes:
+ return result.encode(encoding='utf-8')
+ return result
+
+
+json_loads = partial(json.loads, cls=JSONDecoder)
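pickle_dump and pickle_load above form a restricted round trip: only modules listed in SAFE_TO_IMPORT (plus anything passed via safe_to_import) can be reconstructed. A minimal sketch (the sample data is an assumption):

    from deepdiff.serialization import pickle_dump, pickle_load

    data = {'version': 1, 'tags': {'a', 'b'}}
    blob = pickle_dump(data)            # bytes produced by _RestrictedPickler
    assert pickle_load(blob) == data    # builtins like dict/set/str are in SAFE_TO_IMPORT
    # Loading a custom class would need it allowed explicitly, e.g.
    # pickle_load(blob, safe_to_import={'mymodule.MyClass'})  # 'mymodule.MyClass' is hypothetical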
diff --git a/.venv/lib/python3.12/site-packages/deepdiff/summarize.py b/.venv/lib/python3.12/site-packages/deepdiff/summarize.py
new file mode 100644
index 00000000..f911b84c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/deepdiff/summarize.py
@@ -0,0 +1,144 @@
+from typing import Tuple
+from deepdiff.helper import JSON, SummaryNodeType
+from deepdiff.serialization import json_dumps
+
+
+def _truncate(s: str, max_len: int) -> str:
+ """
+ Truncate string s to max_len characters.
+ If possible, keep the first (max_len-5) characters, then '...' then the last 2 characters.
+ """
+ if len(s) <= max_len:
+ return s
+ if max_len <= 5:
+ return s[:max_len]
+ return s[:max_len - 5] + "..." + s[-2:]
+# Re-defining the functions due to environment reset
+
+
+# Function to calculate node weights recursively
+def calculate_weights(node):
+ if isinstance(node, dict):
+ weight = 0
+ children_weights = {}
+ for k, v in node.items():
+ try:
+ edge_weight = len(k)
+ except TypeError:
+ edge_weight = 1
+ child_weight, child_structure = calculate_weights(v)
+ total_weight = edge_weight + child_weight
+ weight += total_weight
+ children_weights[k] = (edge_weight, child_weight, child_structure)
+ return weight, (SummaryNodeType.dict, children_weights)
+
+ elif isinstance(node, list):
+ weight = 0
+ children_weights = []
+ for v in node:
+ edge_weight = 0 # Index weights are zero
+ child_weight, child_structure = calculate_weights(v)
+ total_weight = edge_weight + child_weight
+ weight += total_weight
+ children_weights.append((edge_weight, child_weight, child_structure))
+ return weight, (SummaryNodeType.list, children_weights)
+
+ else:
+ if isinstance(node, str):
+ node_weight = len(node)
+ elif isinstance(node, int):
+ node_weight = len(str(node))
+ elif isinstance(node, float):
+ node_weight = len(str(round(node, 2)))
+ elif node is None:
+ node_weight = 1
+ else:
+ node_weight = 0
+ return node_weight, (SummaryNodeType.leaf, node)
+
+# Include previously defined functions for shrinking with threshold
+# (Implementing directly the balanced summarization algorithm as above)
+
+# Balanced algorithm (simplified version):
+def shrink_tree_balanced(node_structure, max_weight: int, balance_threshold: float) -> Tuple[JSON, float]:
+ node_type, node_info = node_structure
+
+ if node_type is SummaryNodeType.leaf:
+ leaf_value = node_info
+ leaf_weight, _ = calculate_weights(leaf_value)
+ if leaf_weight <= max_weight:
+ return leaf_value, leaf_weight
+ else:
+ if isinstance(leaf_value, str):
+ truncated_value = _truncate(leaf_value, max_weight)
+ return truncated_value, len(truncated_value)
+ elif isinstance(leaf_value, (int, float)):
+ leaf_str = str(leaf_value)
+ truncated_str = leaf_str[:max_weight]
+ try:
+ return int(truncated_str), len(truncated_str)
+ except Exception:
+ try:
+ return float(truncated_str), len(truncated_str)
+ except Exception:
+ return truncated_str, len(truncated_str)
+ elif leaf_value is None:
+ return None, 1 if max_weight >= 1 else 0
+
+ elif node_type is SummaryNodeType.dict:
+ shrunk_dict = {}
+ total_weight = 0
+ sorted_children = sorted(node_info.items(), key=lambda x: x[1][0] + x[1][1], reverse=True)
+
+ for k, (edge_w, _, child_struct) in sorted_children:
+ allowed_branch_weight = min(max_weight * balance_threshold, max_weight - total_weight)
+ if allowed_branch_weight <= edge_w:
+ continue
+
+ remaining_weight = int(allowed_branch_weight - edge_w)
+ shrunk_child, shrunk_weight = shrink_tree_balanced(child_struct, remaining_weight, balance_threshold)
+ if shrunk_child is not None:
+ shrunk_dict[k[:edge_w]] = shrunk_child
+ total_weight += edge_w + shrunk_weight
+
+ if total_weight >= max_weight:
+ break
+ if not shrunk_dict:
+ return None, 0
+
+ return shrunk_dict, total_weight
+
+ elif node_type is SummaryNodeType.list:
+ shrunk_list = []
+ total_weight = 0
+ sorted_children = sorted(node_info, key=lambda x: x[0] + x[1], reverse=True)
+ for edge_w, _, child_struct in sorted_children:
+ allowed_branch_weight = int(min(max_weight * balance_threshold, max_weight - total_weight))
+ shrunk_child, shrunk_weight = shrink_tree_balanced(child_struct, allowed_branch_weight, balance_threshold)
+ if shrunk_child is not None:
+ shrunk_list.append(shrunk_child)
+ total_weight += shrunk_weight
+ if total_weight >= max_weight - 1:
+ shrunk_list.append("...")
+ break
+ if not shrunk_list:
+ return None, 0
+ return shrunk_list, total_weight
+ return None, 0
+
+
+def greedy_tree_summarization_balanced(json_data: JSON, max_weight: int, balance_threshold=0.6) -> JSON:
+ total_weight, tree_structure = calculate_weights(json_data)
+ if total_weight <= max_weight:
+ return json_data
+ shrunk_tree, _ = shrink_tree_balanced(tree_structure, max_weight, balance_threshold)
+ return shrunk_tree
+
+
+def summarize(data: JSON, max_length:int=200, balance_threshold:float=0.6) -> str:
+ try:
+ return json_dumps(
+ greedy_tree_summarization_balanced(data, max_length, balance_threshold)
+ )
+ except Exception:
+ return str(data)
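summarize above trims a JSON-like structure to roughly max_length characters of "weight" before serializing it. A minimal sketch (the sample data and the printed shape are illustrative assumptions):

    from deepdiff.summarize import summarize

    data = {'title': 'x' * 500, 'tags': list(range(50))}
    short = summarize(data, max_length=80)
    print(short)       # a JSON string; long strings end up truncated with '...'
    print(len(short))  # on the order of max_length rather than the full input size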