aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/deepdiff/distance.py
blob: adaf5045e5a759806656cff0e5d1923184663e88 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
import math
import datetime
from typing import TYPE_CHECKING, Callable, Protocol, Any
from deepdiff.deephash import DeepHash
from deepdiff.helper import (
    DELTA_VIEW, numbers, strings, add_to_frozen_set, not_found, only_numbers, np, np_float64, time_to_seconds,
    cartesian_product_numpy, np_ndarray, np_array_factory, get_homogeneous_numpy_compatible_type_of_seq, dict_,
    CannotCompare)
from collections.abc import Mapping, Iterable

if TYPE_CHECKING:
    from deepdiff.diff import DeepDiffProtocol

    class DistanceProtocol(DeepDiffProtocol, Protocol):
        hashes: dict
        deephash_parameters: dict
        iterable_compare_func: Callable | None
        math_epsilon: float
        cutoff_distance_for_pairs: float

        def __get_item_rough_length(self, item, parent:str="root") -> float:
            ...

        def _to_delta_dict(
            self,
            directed: bool = True,
            report_repetition_required: bool = True,
            always_include_values: bool = False,
        ) -> dict:
            ...

        def __calculate_item_deephash(self, item: Any) -> None:
            ...



DISTANCE_CALCS_NEEDS_CACHE = "Distance calculation can not happen once the cache is purged. Try with _cache='keep'"


class DistanceMixin:

    def _get_rough_distance(self: "DistanceProtocol"):
        """
        Gives a numeric value for the distance of t1 and t2 based on how many operations are needed to convert
        one to the other.

        This is a similar concept to the Levenshtein Edit Distance but for the structured data and is it is designed
        to be between 0 and 1.

        A distance of zero means the objects are equal and a distance of 1 is very far.

        Note: The distance calculation formula is subject to change in future. Use the distance results only as a
        way of comparing the distances of pairs of items with other pairs rather than an absolute distance
        such as the one provided by Levenshtein edit distance.

        Info: The current algorithm is based on the number of operations that are needed to convert t1 to t2 divided
        by the number of items that make up t1 and t2.
        """

        _distance = get_numeric_types_distance(
            self.t1, self.t2, max_=self.cutoff_distance_for_pairs, use_log_scale=self.use_log_scale, log_scale_similarity_threshold=self.log_scale_similarity_threshold)

        if _distance is not not_found:
            return _distance

        item = self if self.view == DELTA_VIEW else self._to_delta_dict(report_repetition_required=False)
        diff_length = _get_item_length(item)

        if diff_length == 0:
            return 0

        t1_len = self.__get_item_rough_length(self.t1)
        t2_len = self.__get_item_rough_length(self.t2)

        return diff_length / (t1_len + t2_len)

    def __get_item_rough_length(self: "DistanceProtocol", item, parent='root'):
        """
        Get the rough length of an item.
        It is used as a part of calculating the rough distance between objects.

        **parameters**

        item: The item to calculate the rough length for
        parent: It is only used for DeepHash reporting purposes. Not really useful here.
        """
        if not hasattr(self, 'hashes'):
            raise RuntimeError(DISTANCE_CALCS_NEEDS_CACHE)
        length = DeepHash.get_key(self.hashes, key=item, default=None, extract_index=1)
        if length is None:
            self.__calculate_item_deephash(item)
            length = DeepHash.get_key(self.hashes, key=item, default=None, extract_index=1)
        return length

    def __calculate_item_deephash(self: "DistanceProtocol", item: Any) -> None:
        DeepHash(
            item,
            hashes=self.hashes,
            parent='root',
            apply_hash=True,
            **self.deephash_parameters,
        )

    def _precalculate_distance_by_custom_compare_func(
            self: "DistanceProtocol", hashes_added, hashes_removed, t1_hashtable, t2_hashtable, _original_type):
        pre_calced_distances = dict_()
        for added_hash in hashes_added:
            for removed_hash in hashes_removed:
                try:
                    is_close_distance = self.iterable_compare_func(t2_hashtable[added_hash].item, t1_hashtable[removed_hash].item)
                except CannotCompare:
                    pass
                else:
                    if is_close_distance:
                        # an arbitrary small distance if math_epsilon is not defined
                        distance = self.math_epsilon or 0.000001
                    else:
                        distance = 1
                    pre_calced_distances["{}--{}".format(added_hash, removed_hash)] = distance

        return pre_calced_distances

    def _precalculate_numpy_arrays_distance(
            self: "DistanceProtocol", hashes_added, hashes_removed, t1_hashtable, t2_hashtable, _original_type):

        # We only want to deal with 1D arrays.
        if isinstance(t2_hashtable[next(iter(hashes_added))].item, (np_ndarray, list)):
            return

        pre_calced_distances = dict_()
        added = [t2_hashtable[k].item for k in hashes_added]
        removed = [t1_hashtable[k].item for k in hashes_removed]

        if _original_type is None:
            added_numpy_compatible_type = get_homogeneous_numpy_compatible_type_of_seq(added)
            removed_numpy_compatible_type = get_homogeneous_numpy_compatible_type_of_seq(removed)
            if added_numpy_compatible_type and added_numpy_compatible_type == removed_numpy_compatible_type:
                _original_type = added_numpy_compatible_type
        if _original_type is None:
            return

        added = np_array_factory(added, dtype=_original_type)
        removed = np_array_factory(removed, dtype=_original_type)

        pairs = cartesian_product_numpy(added, removed)

        pairs_transposed = pairs.T

        distances = _get_numpy_array_distance(
            pairs_transposed[0], pairs_transposed[1],
            max_=self.cutoff_distance_for_pairs,
            use_log_scale=self.use_log_scale,
            log_scale_similarity_threshold=self.log_scale_similarity_threshold,
        )

        i = 0
        for added_hash in hashes_added:
            for removed_hash in hashes_removed:
                pre_calced_distances["{}--{}".format(added_hash, removed_hash)] = distances[i]
                i += 1
        return pre_calced_distances


def _get_item_length(item, parents_ids=frozenset([])):
    """
    Get the number of operations in a diff object.
    It is designed mainly for the delta view output
    but can be used with other dictionary types of view outputs too.
    """
    length = 0
    if isinstance(item, Mapping):
        for key, subitem in item.items():
            # dedupe the repetition report so the number of times items have shown up does not affect the distance.
            if key in {'iterable_items_added_at_indexes', 'iterable_items_removed_at_indexes'}:
                new_subitem = dict_()
                for path_, indexes_to_items in subitem.items():
                    used_value_ids = set()
                    new_indexes_to_items = dict_()
                    for k, v in indexes_to_items.items():
                        v_id = id(v)
                        if v_id not in used_value_ids:
                            used_value_ids.add(v_id)
                            new_indexes_to_items[k] = v
                    new_subitem[path_] = new_indexes_to_items
                subitem = new_subitem

            # internal keys such as _numpy_paths should not count towards the distance
            if isinstance(key, strings) and (key.startswith('_') or key == 'deep_distance' or key == 'new_path'):
                continue

            item_id = id(subitem)
            if parents_ids and item_id in parents_ids:
                continue
            parents_ids_added = add_to_frozen_set(parents_ids, item_id)
            length += _get_item_length(subitem, parents_ids_added)
    elif isinstance(item, numbers):
        length = 1
    elif isinstance(item, strings):
        length = 1
    elif isinstance(item, Iterable):
        for subitem in item:
            item_id = id(subitem)
            if parents_ids and item_id in parents_ids:
                continue
            parents_ids_added = add_to_frozen_set(parents_ids, item_id)
            length += _get_item_length(subitem, parents_ids_added)
    elif isinstance(item, type):  # it is a class
        length = 1
    else:
        if hasattr(item, '__dict__'):
            for subitem in item.__dict__:
                item_id = id(subitem)
                parents_ids_added = add_to_frozen_set(parents_ids, item_id)
                length += _get_item_length(subitem, parents_ids_added)
    return length


def _get_numbers_distance(num1, num2, max_=1, use_log_scale=False, log_scale_similarity_threshold=0.1):
    """
    Get the distance of 2 numbers. The output is a number between 0 to the max.
    The reason is the
    When max is returned means the 2 numbers are really far, and 0 means they are equal.
    """
    if num1 == num2:
        return 0
    if use_log_scale:
        distance = logarithmic_distance(num1, num2)
        if distance < 0:
            return 0
        return distance
    if not isinstance(num1, float):
        num1 = float(num1)
    if not isinstance(num2, float):
        num2 = float(num2)
    # Since we have a default cutoff of 0.3 distance when
    # getting the pairs of items during the ingore_order=True
    # calculations, we need to make the divisor of comparison very big
    # so that any 2 numbers can be chosen as pairs.
    divisor = (num1 + num2) / max_
    if divisor == 0:
        return max_
    try:
        return min(max_, abs((num1 - num2) / divisor))
    except Exception:  # pragma: no cover. I don't think this line will ever run but doesn't hurt to leave it.
        return max_  # pragma: no cover


def _numpy_div(a, b, replace_inf_with=1):
    max_array = np.full(shape=a.shape, fill_value=replace_inf_with, dtype=np_float64)
    result = np.divide(a, b, out=max_array, where=b != 0, dtype=np_float64)
    # wherever 2 numbers are the same, make sure the distance is zero. This is mainly for 0 divided by zero.
    result[a == b] = 0
    return result

# To deal with numbers close to zero
MATH_LOG_OFFSET = 1e-10

def numpy_apply_log_keep_sign(array, offset=MATH_LOG_OFFSET):
    # Calculate the absolute value and add the offset
    abs_plus_offset = np.abs(array) + offset
    
    # Calculate the logarithm
    log_values = np.log(abs_plus_offset)
    
    # Apply the original signs to the log values
    signed_log_values = np.copysign(log_values, array)
    
    return signed_log_values


def logarithmic_similarity(a: numbers, b: numbers, threshold: float=0.1) -> float:
    """
    A threshold of 0.1 translates to about 10.5% difference.
    A threshold of 0.5 translates to about 65% difference.
    A threshold of 0.05 translates to about 5.1% difference.
    """
    return logarithmic_distance(a, b) < threshold


def logarithmic_distance(a: numbers, b: numbers) -> float:
    # Apply logarithm to the absolute values and consider the sign
    a = float(a)
    b = float(b)
    log_a = math.copysign(math.log(abs(a) + MATH_LOG_OFFSET), a)
    log_b = math.copysign(math.log(abs(b) + MATH_LOG_OFFSET), b)

    return abs(log_a - log_b)


def _get_numpy_array_distance(num1, num2, max_=1, use_log_scale=False, log_scale_similarity_threshold=0.1):
    """
    Get the distance of 2 numbers. The output is a number between 0 to the max.
    The reason is the
    When max is returned means the 2 numbers are really far, and 0 means they are equal.
    """
    # Since we have a default cutoff of 0.3 distance when
    # getting the pairs of items during the ingore_order=True
    # calculations, we need to make the divisor of comparison very big
    # so that any 2 numbers can be chosen as pairs.
    if use_log_scale:
        num1 = numpy_apply_log_keep_sign(num1)
        num2 = numpy_apply_log_keep_sign(num2)

    divisor = (num1 + num2) / max_
    result = _numpy_div((num1 - num2), divisor, replace_inf_with=max_)

    distance_array = np.clip(np.absolute(result), 0, max_)
    if use_log_scale:
        distance_array[distance_array < log_scale_similarity_threshold] = 0
    return distance_array


def _get_datetime_distance(date1, date2, max_, use_log_scale, log_scale_similarity_threshold):
    return _get_numbers_distance(date1.timestamp(), date2.timestamp(), max_)


def _get_date_distance(date1, date2, max_, use_log_scale, log_scale_similarity_threshold):
    return _get_numbers_distance(date1.toordinal(), date2.toordinal(), max_)


def _get_timedelta_distance(timedelta1, timedelta2, max_, use_log_scale, log_scale_similarity_threshold):
    return _get_numbers_distance(timedelta1.total_seconds(), timedelta2.total_seconds(), max_)


def _get_time_distance(time1, time2, max_, use_log_scale, log_scale_similarity_threshold):
    return _get_numbers_distance(time_to_seconds(time1), time_to_seconds(time2), max_)


TYPES_TO_DIST_FUNC = [
    (only_numbers, _get_numbers_distance),
    (datetime.datetime, _get_datetime_distance),
    (datetime.date, _get_date_distance),
    (datetime.timedelta, _get_timedelta_distance),
    (datetime.time, _get_time_distance),
]


def get_numeric_types_distance(num1, num2, max_, use_log_scale=False, log_scale_similarity_threshold=0.1):
    for type_, func in TYPES_TO_DIST_FUNC:
        if isinstance(num1, type_) and isinstance(num2, type_):
            return func(num1, num2, max_, use_log_scale, log_scale_similarity_threshold)
    return not_found