import math import datetime from typing import TYPE_CHECKING, Callable, Protocol, Any from deepdiff.deephash import DeepHash from deepdiff.helper import ( DELTA_VIEW, numbers, strings, add_to_frozen_set, not_found, only_numbers, np, np_float64, time_to_seconds, cartesian_product_numpy, np_ndarray, np_array_factory, get_homogeneous_numpy_compatible_type_of_seq, dict_, CannotCompare) from collections.abc import Mapping, Iterable if TYPE_CHECKING: from deepdiff.diff import DeepDiffProtocol class DistanceProtocol(DeepDiffProtocol, Protocol): hashes: dict deephash_parameters: dict iterable_compare_func: Callable | None math_epsilon: float cutoff_distance_for_pairs: float def __get_item_rough_length(self, item, parent:str="root") -> float: ... def _to_delta_dict( self, directed: bool = True, report_repetition_required: bool = True, always_include_values: bool = False, ) -> dict: ... def __calculate_item_deephash(self, item: Any) -> None: ... DISTANCE_CALCS_NEEDS_CACHE = "Distance calculation can not happen once the cache is purged. Try with _cache='keep'" class DistanceMixin: def _get_rough_distance(self: "DistanceProtocol"): """ Gives a numeric value for the distance of t1 and t2 based on how many operations are needed to convert one to the other. This is a similar concept to the Levenshtein Edit Distance but for the structured data and is it is designed to be between 0 and 1. A distance of zero means the objects are equal and a distance of 1 is very far. Note: The distance calculation formula is subject to change in future. Use the distance results only as a way of comparing the distances of pairs of items with other pairs rather than an absolute distance such as the one provided by Levenshtein edit distance. Info: The current algorithm is based on the number of operations that are needed to convert t1 to t2 divided by the number of items that make up t1 and t2. """ _distance = get_numeric_types_distance( self.t1, self.t2, max_=self.cutoff_distance_for_pairs, use_log_scale=self.use_log_scale, log_scale_similarity_threshold=self.log_scale_similarity_threshold) if _distance is not not_found: return _distance item = self if self.view == DELTA_VIEW else self._to_delta_dict(report_repetition_required=False) diff_length = _get_item_length(item) if diff_length == 0: return 0 t1_len = self.__get_item_rough_length(self.t1) t2_len = self.__get_item_rough_length(self.t2) return diff_length / (t1_len + t2_len) def __get_item_rough_length(self: "DistanceProtocol", item, parent='root'): """ Get the rough length of an item. It is used as a part of calculating the rough distance between objects. **parameters** item: The item to calculate the rough length for parent: It is only used for DeepHash reporting purposes. Not really useful here. """ if not hasattr(self, 'hashes'): raise RuntimeError(DISTANCE_CALCS_NEEDS_CACHE) length = DeepHash.get_key(self.hashes, key=item, default=None, extract_index=1) if length is None: self.__calculate_item_deephash(item) length = DeepHash.get_key(self.hashes, key=item, default=None, extract_index=1) return length def __calculate_item_deephash(self: "DistanceProtocol", item: Any) -> None: DeepHash( item, hashes=self.hashes, parent='root', apply_hash=True, **self.deephash_parameters, ) def _precalculate_distance_by_custom_compare_func( self: "DistanceProtocol", hashes_added, hashes_removed, t1_hashtable, t2_hashtable, _original_type): pre_calced_distances = dict_() for added_hash in hashes_added: for removed_hash in hashes_removed: try: is_close_distance = self.iterable_compare_func(t2_hashtable[added_hash].item, t1_hashtable[removed_hash].item) except CannotCompare: pass else: if is_close_distance: # an arbitrary small distance if math_epsilon is not defined distance = self.math_epsilon or 0.000001 else: distance = 1 pre_calced_distances["{}--{}".format(added_hash, removed_hash)] = distance return pre_calced_distances def _precalculate_numpy_arrays_distance( self: "DistanceProtocol", hashes_added, hashes_removed, t1_hashtable, t2_hashtable, _original_type): # We only want to deal with 1D arrays. if isinstance(t2_hashtable[next(iter(hashes_added))].item, (np_ndarray, list)): return pre_calced_distances = dict_() added = [t2_hashtable[k].item for k in hashes_added] removed = [t1_hashtable[k].item for k in hashes_removed] if _original_type is None: added_numpy_compatible_type = get_homogeneous_numpy_compatible_type_of_seq(added) removed_numpy_compatible_type = get_homogeneous_numpy_compatible_type_of_seq(removed) if added_numpy_compatible_type and added_numpy_compatible_type == removed_numpy_compatible_type: _original_type = added_numpy_compatible_type if _original_type is None: return added = np_array_factory(added, dtype=_original_type) removed = np_array_factory(removed, dtype=_original_type) pairs = cartesian_product_numpy(added, removed) pairs_transposed = pairs.T distances = _get_numpy_array_distance( pairs_transposed[0], pairs_transposed[1], max_=self.cutoff_distance_for_pairs, use_log_scale=self.use_log_scale, log_scale_similarity_threshold=self.log_scale_similarity_threshold, ) i = 0 for added_hash in hashes_added: for removed_hash in hashes_removed: pre_calced_distances["{}--{}".format(added_hash, removed_hash)] = distances[i] i += 1 return pre_calced_distances def _get_item_length(item, parents_ids=frozenset([])): """ Get the number of operations in a diff object. It is designed mainly for the delta view output but can be used with other dictionary types of view outputs too. """ length = 0 if isinstance(item, Mapping): for key, subitem in item.items(): # dedupe the repetition report so the number of times items have shown up does not affect the distance. if key in {'iterable_items_added_at_indexes', 'iterable_items_removed_at_indexes'}: new_subitem = dict_() for path_, indexes_to_items in subitem.items(): used_value_ids = set() new_indexes_to_items = dict_() for k, v in indexes_to_items.items(): v_id = id(v) if v_id not in used_value_ids: used_value_ids.add(v_id) new_indexes_to_items[k] = v new_subitem[path_] = new_indexes_to_items subitem = new_subitem # internal keys such as _numpy_paths should not count towards the distance if isinstance(key, strings) and (key.startswith('_') or key == 'deep_distance' or key == 'new_path'): continue item_id = id(subitem) if parents_ids and item_id in parents_ids: continue parents_ids_added = add_to_frozen_set(parents_ids, item_id) length += _get_item_length(subitem, parents_ids_added) elif isinstance(item, numbers): length = 1 elif isinstance(item, strings): length = 1 elif isinstance(item, Iterable): for subitem in item: item_id = id(subitem) if parents_ids and item_id in parents_ids: continue parents_ids_added = add_to_frozen_set(parents_ids, item_id) length += _get_item_length(subitem, parents_ids_added) elif isinstance(item, type): # it is a class length = 1 else: if hasattr(item, '__dict__'): for subitem in item.__dict__: item_id = id(subitem) parents_ids_added = add_to_frozen_set(parents_ids, item_id) length += _get_item_length(subitem, parents_ids_added) return length def _get_numbers_distance(num1, num2, max_=1, use_log_scale=False, log_scale_similarity_threshold=0.1): """ Get the distance of 2 numbers. The output is a number between 0 to the max. The reason is the When max is returned means the 2 numbers are really far, and 0 means they are equal. """ if num1 == num2: return 0 if use_log_scale: distance = logarithmic_distance(num1, num2) if distance < 0: return 0 return distance if not isinstance(num1, float): num1 = float(num1) if not isinstance(num2, float): num2 = float(num2) # Since we have a default cutoff of 0.3 distance when # getting the pairs of items during the ingore_order=True # calculations, we need to make the divisor of comparison very big # so that any 2 numbers can be chosen as pairs. divisor = (num1 + num2) / max_ if divisor == 0: return max_ try: return min(max_, abs((num1 - num2) / divisor)) except Exception: # pragma: no cover. I don't think this line will ever run but doesn't hurt to leave it. return max_ # pragma: no cover def _numpy_div(a, b, replace_inf_with=1): max_array = np.full(shape=a.shape, fill_value=replace_inf_with, dtype=np_float64) result = np.divide(a, b, out=max_array, where=b != 0, dtype=np_float64) # wherever 2 numbers are the same, make sure the distance is zero. This is mainly for 0 divided by zero. result[a == b] = 0 return result # To deal with numbers close to zero MATH_LOG_OFFSET = 1e-10 def numpy_apply_log_keep_sign(array, offset=MATH_LOG_OFFSET): # Calculate the absolute value and add the offset abs_plus_offset = np.abs(array) + offset # Calculate the logarithm log_values = np.log(abs_plus_offset) # Apply the original signs to the log values signed_log_values = np.copysign(log_values, array) return signed_log_values def logarithmic_similarity(a: numbers, b: numbers, threshold: float=0.1) -> float: """ A threshold of 0.1 translates to about 10.5% difference. A threshold of 0.5 translates to about 65% difference. A threshold of 0.05 translates to about 5.1% difference. """ return logarithmic_distance(a, b) < threshold def logarithmic_distance(a: numbers, b: numbers) -> float: # Apply logarithm to the absolute values and consider the sign a = float(a) b = float(b) log_a = math.copysign(math.log(abs(a) + MATH_LOG_OFFSET), a) log_b = math.copysign(math.log(abs(b) + MATH_LOG_OFFSET), b) return abs(log_a - log_b) def _get_numpy_array_distance(num1, num2, max_=1, use_log_scale=False, log_scale_similarity_threshold=0.1): """ Get the distance of 2 numbers. The output is a number between 0 to the max. The reason is the When max is returned means the 2 numbers are really far, and 0 means they are equal. """ # Since we have a default cutoff of 0.3 distance when # getting the pairs of items during the ingore_order=True # calculations, we need to make the divisor of comparison very big # so that any 2 numbers can be chosen as pairs. if use_log_scale: num1 = numpy_apply_log_keep_sign(num1) num2 = numpy_apply_log_keep_sign(num2) divisor = (num1 + num2) / max_ result = _numpy_div((num1 - num2), divisor, replace_inf_with=max_) distance_array = np.clip(np.absolute(result), 0, max_) if use_log_scale: distance_array[distance_array < log_scale_similarity_threshold] = 0 return distance_array def _get_datetime_distance(date1, date2, max_, use_log_scale, log_scale_similarity_threshold): return _get_numbers_distance(date1.timestamp(), date2.timestamp(), max_) def _get_date_distance(date1, date2, max_, use_log_scale, log_scale_similarity_threshold): return _get_numbers_distance(date1.toordinal(), date2.toordinal(), max_) def _get_timedelta_distance(timedelta1, timedelta2, max_, use_log_scale, log_scale_similarity_threshold): return _get_numbers_distance(timedelta1.total_seconds(), timedelta2.total_seconds(), max_) def _get_time_distance(time1, time2, max_, use_log_scale, log_scale_similarity_threshold): return _get_numbers_distance(time_to_seconds(time1), time_to_seconds(time2), max_) TYPES_TO_DIST_FUNC = [ (only_numbers, _get_numbers_distance), (datetime.datetime, _get_datetime_distance), (datetime.date, _get_date_distance), (datetime.timedelta, _get_timedelta_distance), (datetime.time, _get_time_distance), ] def get_numeric_types_distance(num1, num2, max_, use_log_scale=False, log_scale_similarity_threshold=0.1): for type_, func in TYPES_TO_DIST_FUNC: if isinstance(num1, type_) and isinstance(num2, type_): return func(num1, num2, max_, use_log_scale, log_scale_similarity_threshold) return not_found