author    | S. Solomon Darnell | 2025-03-28 21:52:21 -0500
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500
commit    | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree      | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/deepdiff/distance.py
parent    | cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
download  | gn-ai-master.tar.gz
Diffstat (limited to '.venv/lib/python3.12/site-packages/deepdiff/distance.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/deepdiff/distance.py | 342
1 file changed, 342 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/deepdiff/distance.py b/.venv/lib/python3.12/site-packages/deepdiff/distance.py
new file mode 100644
index 00000000..adaf5045
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/deepdiff/distance.py
@@ -0,0 +1,342 @@
+import math
+import datetime
+from typing import TYPE_CHECKING, Callable, Protocol, Any
+from deepdiff.deephash import DeepHash
+from deepdiff.helper import (
+    DELTA_VIEW, numbers, strings, add_to_frozen_set, not_found, only_numbers, np, np_float64, time_to_seconds,
+    cartesian_product_numpy, np_ndarray, np_array_factory, get_homogeneous_numpy_compatible_type_of_seq, dict_,
+    CannotCompare)
+from collections.abc import Mapping, Iterable
+
+if TYPE_CHECKING:
+    from deepdiff.diff import DeepDiffProtocol
+
+    class DistanceProtocol(DeepDiffProtocol, Protocol):
+        hashes: dict
+        deephash_parameters: dict
+        iterable_compare_func: Callable | None
+        math_epsilon: float
+        cutoff_distance_for_pairs: float
+
+        def __get_item_rough_length(self, item, parent: str = "root") -> float:
+            ...
+
+        def _to_delta_dict(
+            self,
+            directed: bool = True,
+            report_repetition_required: bool = True,
+            always_include_values: bool = False,
+        ) -> dict:
+            ...
+
+        def __calculate_item_deephash(self, item: Any) -> None:
+            ...
+
+
+
+DISTANCE_CALCS_NEEDS_CACHE = "Distance calculation can not happen once the cache is purged. Try with _cache='keep'"
+
+
+class DistanceMixin:
+
+    def _get_rough_distance(self: "DistanceProtocol"):
+        """
+        Gives a numeric value for the distance of t1 and t2 based on how many operations are needed to convert
+        one to the other.
+
+        This is a similar concept to the Levenshtein edit distance but for structured data, and it is designed
+        to be between 0 and 1.
+
+        A distance of zero means the objects are equal and a distance of 1 is very far.
+
+        Note: The distance calculation formula is subject to change in the future. Use the distance results only as a
+        way of comparing the distances of pairs of items with other pairs rather than as an absolute distance
+        such as the one provided by the Levenshtein edit distance.
+
+        Info: The current algorithm is based on the number of operations that are needed to convert t1 to t2 divided
+        by the number of items that make up t1 and t2.
+        """
+
+        _distance = get_numeric_types_distance(
+            self.t1, self.t2, max_=self.cutoff_distance_for_pairs, use_log_scale=self.use_log_scale, log_scale_similarity_threshold=self.log_scale_similarity_threshold)
+
+        if _distance is not not_found:
+            return _distance
+
+        item = self if self.view == DELTA_VIEW else self._to_delta_dict(report_repetition_required=False)
+        diff_length = _get_item_length(item)
+
+        if diff_length == 0:
+            return 0
+
+        t1_len = self.__get_item_rough_length(self.t1)
+        t2_len = self.__get_item_rough_length(self.t2)
+
+        return diff_length / (t1_len + t2_len)
+
+    def __get_item_rough_length(self: "DistanceProtocol", item, parent='root'):
+        """
+        Get the rough length of an item.
+        It is used as a part of calculating the rough distance between objects.
+
+        **parameters**
+
+        item: The item to calculate the rough length for
+        parent: It is only used for DeepHash reporting purposes. Not really useful here.
+ """ + if not hasattr(self, 'hashes'): + raise RuntimeError(DISTANCE_CALCS_NEEDS_CACHE) + length = DeepHash.get_key(self.hashes, key=item, default=None, extract_index=1) + if length is None: + self.__calculate_item_deephash(item) + length = DeepHash.get_key(self.hashes, key=item, default=None, extract_index=1) + return length + + def __calculate_item_deephash(self: "DistanceProtocol", item: Any) -> None: + DeepHash( + item, + hashes=self.hashes, + parent='root', + apply_hash=True, + **self.deephash_parameters, + ) + + def _precalculate_distance_by_custom_compare_func( + self: "DistanceProtocol", hashes_added, hashes_removed, t1_hashtable, t2_hashtable, _original_type): + pre_calced_distances = dict_() + for added_hash in hashes_added: + for removed_hash in hashes_removed: + try: + is_close_distance = self.iterable_compare_func(t2_hashtable[added_hash].item, t1_hashtable[removed_hash].item) + except CannotCompare: + pass + else: + if is_close_distance: + # an arbitrary small distance if math_epsilon is not defined + distance = self.math_epsilon or 0.000001 + else: + distance = 1 + pre_calced_distances["{}--{}".format(added_hash, removed_hash)] = distance + + return pre_calced_distances + + def _precalculate_numpy_arrays_distance( + self: "DistanceProtocol", hashes_added, hashes_removed, t1_hashtable, t2_hashtable, _original_type): + + # We only want to deal with 1D arrays. + if isinstance(t2_hashtable[next(iter(hashes_added))].item, (np_ndarray, list)): + return + + pre_calced_distances = dict_() + added = [t2_hashtable[k].item for k in hashes_added] + removed = [t1_hashtable[k].item for k in hashes_removed] + + if _original_type is None: + added_numpy_compatible_type = get_homogeneous_numpy_compatible_type_of_seq(added) + removed_numpy_compatible_type = get_homogeneous_numpy_compatible_type_of_seq(removed) + if added_numpy_compatible_type and added_numpy_compatible_type == removed_numpy_compatible_type: + _original_type = added_numpy_compatible_type + if _original_type is None: + return + + added = np_array_factory(added, dtype=_original_type) + removed = np_array_factory(removed, dtype=_original_type) + + pairs = cartesian_product_numpy(added, removed) + + pairs_transposed = pairs.T + + distances = _get_numpy_array_distance( + pairs_transposed[0], pairs_transposed[1], + max_=self.cutoff_distance_for_pairs, + use_log_scale=self.use_log_scale, + log_scale_similarity_threshold=self.log_scale_similarity_threshold, + ) + + i = 0 + for added_hash in hashes_added: + for removed_hash in hashes_removed: + pre_calced_distances["{}--{}".format(added_hash, removed_hash)] = distances[i] + i += 1 + return pre_calced_distances + + +def _get_item_length(item, parents_ids=frozenset([])): + """ + Get the number of operations in a diff object. + It is designed mainly for the delta view output + but can be used with other dictionary types of view outputs too. + """ + length = 0 + if isinstance(item, Mapping): + for key, subitem in item.items(): + # dedupe the repetition report so the number of times items have shown up does not affect the distance. 
+            if key in {'iterable_items_added_at_indexes', 'iterable_items_removed_at_indexes'}:
+                new_subitem = dict_()
+                for path_, indexes_to_items in subitem.items():
+                    used_value_ids = set()
+                    new_indexes_to_items = dict_()
+                    for k, v in indexes_to_items.items():
+                        v_id = id(v)
+                        if v_id not in used_value_ids:
+                            used_value_ids.add(v_id)
+                            new_indexes_to_items[k] = v
+                    new_subitem[path_] = new_indexes_to_items
+                subitem = new_subitem
+
+            # internal keys such as _numpy_paths should not count towards the distance
+            if isinstance(key, strings) and (key.startswith('_') or key == 'deep_distance' or key == 'new_path'):
+                continue
+
+            item_id = id(subitem)
+            if parents_ids and item_id in parents_ids:
+                continue
+            parents_ids_added = add_to_frozen_set(parents_ids, item_id)
+            length += _get_item_length(subitem, parents_ids_added)
+    elif isinstance(item, numbers):
+        length = 1
+    elif isinstance(item, strings):
+        length = 1
+    elif isinstance(item, Iterable):
+        for subitem in item:
+            item_id = id(subitem)
+            if parents_ids and item_id in parents_ids:
+                continue
+            parents_ids_added = add_to_frozen_set(parents_ids, item_id)
+            length += _get_item_length(subitem, parents_ids_added)
+    elif isinstance(item, type):  # it is a class
+        length = 1
+    else:
+        if hasattr(item, '__dict__'):
+            for subitem in item.__dict__:
+                item_id = id(subitem)
+                parents_ids_added = add_to_frozen_set(parents_ids, item_id)
+                length += _get_item_length(subitem, parents_ids_added)
+    return length
+
+
+def _get_numbers_distance(num1, num2, max_=1, use_log_scale=False, log_scale_similarity_threshold=0.1):
+    """
+    Get the distance of 2 numbers. The output is a number between 0 and the max.
+    When max is returned, it means the 2 numbers are really far apart, and 0 means they are equal.
+    """
+    if num1 == num2:
+        return 0
+    if use_log_scale:
+        distance = logarithmic_distance(num1, num2)
+        if distance < 0:
+            return 0
+        return distance
+    if not isinstance(num1, float):
+        num1 = float(num1)
+    if not isinstance(num2, float):
+        num2 = float(num2)
+    # Since we have a default cutoff of 0.3 distance when
+    # getting the pairs of items during the ignore_order=True
+    # calculations, we need to make the divisor of comparison very big
+    # so that any 2 numbers can be chosen as pairs.
+    divisor = (num1 + num2) / max_
+    if divisor == 0:
+        return max_
+    try:
+        return min(max_, abs((num1 - num2) / divisor))
+    except Exception:  # pragma: no cover. I don't think this line will ever run but doesn't hurt to leave it.
+        return max_  # pragma: no cover
+
+
+def _numpy_div(a, b, replace_inf_with=1):
+    max_array = np.full(shape=a.shape, fill_value=replace_inf_with, dtype=np_float64)
+    result = np.divide(a, b, out=max_array, where=b != 0, dtype=np_float64)
+    # wherever 2 numbers are the same, make sure the distance is zero. This is mainly for 0 divided by zero.
+    result[a == b] = 0
+    return result
+
+# To deal with numbers close to zero
+MATH_LOG_OFFSET = 1e-10
+
+def numpy_apply_log_keep_sign(array, offset=MATH_LOG_OFFSET):
+    # Calculate the absolute value and add the offset
+    abs_plus_offset = np.abs(array) + offset
+
+    # Calculate the logarithm
+    log_values = np.log(abs_plus_offset)
+
+    # Apply the original signs to the log values
+    signed_log_values = np.copysign(log_values, array)
+
+    return signed_log_values
+
+
+def logarithmic_similarity(a: numbers, b: numbers, threshold: float = 0.1) -> bool:
+    """
+    A threshold of 0.1 translates to about 10.5% difference.
+    A threshold of 0.5 translates to about 65% difference.
+    A threshold of 0.05 translates to about 5.1% difference.
+    """
+    return logarithmic_distance(a, b) < threshold
+
+
+def logarithmic_distance(a: numbers, b: numbers) -> float:
+    # Apply logarithm to the absolute values and consider the sign
+    a = float(a)
+    b = float(b)
+    log_a = math.copysign(math.log(abs(a) + MATH_LOG_OFFSET), a)
+    log_b = math.copysign(math.log(abs(b) + MATH_LOG_OFFSET), b)
+
+    return abs(log_a - log_b)
+
+
+def _get_numpy_array_distance(num1, num2, max_=1, use_log_scale=False, log_scale_similarity_threshold=0.1):
+    """
+    Get the distance of 2 numbers. The output is a number between 0 and the max.
+    When max is returned, it means the 2 numbers are really far apart, and 0 means they are equal.
+    """
+    # Since we have a default cutoff of 0.3 distance when
+    # getting the pairs of items during the ignore_order=True
+    # calculations, we need to make the divisor of comparison very big
+    # so that any 2 numbers can be chosen as pairs.
+    if use_log_scale:
+        num1 = numpy_apply_log_keep_sign(num1)
+        num2 = numpy_apply_log_keep_sign(num2)
+
+    divisor = (num1 + num2) / max_
+    result = _numpy_div((num1 - num2), divisor, replace_inf_with=max_)
+
+    distance_array = np.clip(np.absolute(result), 0, max_)
+    if use_log_scale:
+        distance_array[distance_array < log_scale_similarity_threshold] = 0
+    return distance_array
+
+
+def _get_datetime_distance(date1, date2, max_, use_log_scale, log_scale_similarity_threshold):
+    return _get_numbers_distance(date1.timestamp(), date2.timestamp(), max_)
+
+
+def _get_date_distance(date1, date2, max_, use_log_scale, log_scale_similarity_threshold):
+    return _get_numbers_distance(date1.toordinal(), date2.toordinal(), max_)
+
+
+def _get_timedelta_distance(timedelta1, timedelta2, max_, use_log_scale, log_scale_similarity_threshold):
+    return _get_numbers_distance(timedelta1.total_seconds(), timedelta2.total_seconds(), max_)
+
+
+def _get_time_distance(time1, time2, max_, use_log_scale, log_scale_similarity_threshold):
+    return _get_numbers_distance(time_to_seconds(time1), time_to_seconds(time2), max_)
+
+
+TYPES_TO_DIST_FUNC = [
+    (only_numbers, _get_numbers_distance),
+    (datetime.datetime, _get_datetime_distance),
+    (datetime.date, _get_date_distance),
+    (datetime.timedelta, _get_timedelta_distance),
+    (datetime.time, _get_time_distance),
+]
+
+
+def get_numeric_types_distance(num1, num2, max_, use_log_scale=False, log_scale_similarity_threshold=0.1):
+    for type_, func in TYPES_TO_DIST_FUNC:
+        if isinstance(num1, type_) and isinstance(num2, type_):
+            return func(num1, num2, max_, use_log_scale, log_scale_similarity_threshold)
+    return not_found
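
The numeric helpers added above can be exercised directly. A minimal sketch, assuming this virtualenv's deepdiff package is importable so the module lands at `deepdiff.distance` (the file added by this commit); printed values are approximate.

```python
# Minimal sketch: exercises the module-level helpers defined in distance.py above.
# Assumes deepdiff is importable from this environment; values shown are approximate.
import datetime

from deepdiff.distance import (
    _get_numbers_distance,
    logarithmic_distance,
    logarithmic_similarity,
    get_numeric_types_distance,
)

# Plain numeric distance: min(max_, |num1 - num2| / ((num1 + num2) / max_)).
print(_get_numbers_distance(10, 12, max_=1))   # ~0.0909
print(_get_numbers_distance(5, 5, max_=1))     # 0, equal values

# Log-scale distance compares orders of magnitude rather than absolute gaps.
print(logarithmic_distance(100, 110))          # ~0.095
print(logarithmic_distance(100, 1000))         # ~2.30, one order of magnitude apart
print(logarithmic_similarity(100, 110))        # True (0.095 < default threshold 0.1)

# Type dispatch: datetimes are reduced to timestamps before the numeric distance.
d1 = datetime.datetime(2025, 1, 1)
d2 = datetime.datetime(2025, 1, 2)
print(get_numeric_types_distance(d1, d2, max_=1))  # tiny float; one day is small relative to the timestamps
```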
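For the end-to-end view, the rough distance computed by `DistanceMixin._get_rough_distance` (diff_length / (t1_len + t2_len)) is surfaced through DeepDiff's `get_deep_distance` option as a `deep_distance` entry, the same key that `_get_item_length` explicitly excludes from the operation count. A hedged sketch; the exact value depends on the deepdiff version and the inputs.

```python
# Hedged sketch of the public API that builds on the rough distance above.
# get_deep_distance=True asks DeepDiff to report the rough distance as a
# 'deep_distance' entry in the result; treat the exact number as illustrative.
from deepdiff import DeepDiff

t1 = {"a": 1, "b": [1, 2, 3]}
t2 = {"a": 2, "b": [1, 2, 3, 4]}

diff = DeepDiff(t1, t2, get_deep_distance=True)
print(diff.get("deep_distance"))  # a float in [0, 1]; 0 would mean identical objects
```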