aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/deepdiff/lfucache.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/deepdiff/lfucache.py')
-rw-r--r--.venv/lib/python3.12/site-packages/deepdiff/lfucache.py217
1 files changed, 217 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/deepdiff/lfucache.py b/.venv/lib/python3.12/site-packages/deepdiff/lfucache.py
new file mode 100644
index 00000000..75d1708e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/deepdiff/lfucache.py
@@ -0,0 +1,217 @@
+"""
+LFU cache Written by Shane Wang
+https://medium.com/@epicshane/a-python-implementation-of-lfu-least-frequently-used-cache-with-o-1-time-complexity-e16b34a3c49b
+https://github.com/luxigner/lfu_cache
+Modified by Sep Dehpour
+"""
+from collections import defaultdict
+from threading import Lock
+from statistics import mean
+from deepdiff.helper import not_found, dict_, SetOrdered
+
+
+class CacheNode:
+ def __init__(self, key, report_type, value, freq_node, pre, nxt):
+ self.key = key
+ if report_type:
+ self.content = defaultdict(SetOrdered)
+ self.content[report_type].add(value)
+ else:
+ self.content = value
+ self.freq_node = freq_node
+ self.pre = pre # previous CacheNode
+ self.nxt = nxt # next CacheNode
+
+ def free_myself(self):
+ if self.freq_node.cache_head == self.freq_node.cache_tail: # type: ignore
+ self.freq_node.cache_head = self.freq_node.cache_tail = None # type: ignore
+ elif self.freq_node.cache_head == self: # type: ignore
+ self.nxt.pre = None # type: ignore
+ self.freq_node.cache_head = self.nxt # type: ignore
+ elif self.freq_node.cache_tail == self: # type: ignore
+ self.pre.nxt = None # type: ignore
+ self.freq_node.cache_tail = self.pre # type: ignore
+ else:
+ self.pre.nxt = self.nxt # type: ignore
+ self.nxt.pre = self.pre # type: ignore
+
+ self.pre = None
+ self.nxt = None
+ self.freq_node = None
+
+
+class FreqNode:
+ def __init__(self, freq, pre, nxt):
+ self.freq = freq
+ self.pre = pre # previous FreqNode
+ self.nxt = nxt # next FreqNode
+ self.cache_head = None # CacheNode head under this FreqNode
+ self.cache_tail = None # CacheNode tail under this FreqNode
+
+ def count_caches(self):
+ if self.cache_head is None and self.cache_tail is None:
+ return 0
+ elif self.cache_head == self.cache_tail:
+ return 1
+ else:
+ return '2+'
+
+ def remove(self):
+ if self.pre is not None:
+ self.pre.nxt = self.nxt
+ if self.nxt is not None:
+ self.nxt.pre = self.pre
+
+ pre = self.pre
+ nxt = self.nxt
+ self.pre = self.nxt = self.cache_head = self.cache_tail = None
+
+ return (pre, nxt)
+
+ def pop_head_cache(self):
+ if self.cache_head is None and self.cache_tail is None:
+ return None
+ elif self.cache_head == self.cache_tail:
+ cache_head = self.cache_head
+ self.cache_head = self.cache_tail = None
+ return cache_head
+ else:
+ cache_head = self.cache_head
+ self.cache_head.nxt.pre = None # type: ignore
+ self.cache_head = self.cache_head.nxt # type: ignore
+ return cache_head
+
+ def append_cache_to_tail(self, cache_node):
+ cache_node.freq_node = self
+
+ if self.cache_head is None and self.cache_tail is None:
+ self.cache_head = self.cache_tail = cache_node
+ else:
+ cache_node.pre = self.cache_tail
+ cache_node.nxt = None
+ self.cache_tail.nxt = cache_node # type: ignore
+ self.cache_tail = cache_node
+
+ def insert_after_me(self, freq_node):
+ freq_node.pre = self
+ freq_node.nxt = self.nxt
+
+ if self.nxt is not None:
+ self.nxt.pre = freq_node
+
+ self.nxt = freq_node
+
+ def insert_before_me(self, freq_node):
+ if self.pre is not None:
+ self.pre.nxt = freq_node
+
+ freq_node.pre = self.pre
+ freq_node.nxt = self
+ self.pre = freq_node
+
+
+class LFUCache:
+
+ def __init__(self, capacity):
+ self.cache = dict_() # {key: cache_node}
+ if capacity <= 0:
+ raise ValueError('Capacity of LFUCache needs to be positive.') # pragma: no cover.
+ self.capacity = capacity
+ self.freq_link_head = None
+ self.lock = Lock()
+
+ def get(self, key):
+ with self.lock:
+ if key in self.cache:
+ cache_node = self.cache[key]
+ freq_node = cache_node.freq_node
+ content = cache_node.content
+
+ self.move_forward(cache_node, freq_node)
+
+ return content
+ else:
+ return not_found
+
+ def set(self, key, report_type=None, value=None):
+ with self.lock:
+ if key in self.cache:
+ cache_node = self.cache[key]
+ if report_type:
+ cache_node.content[report_type].add(value)
+ else:
+ cache_node.content = value
+ else:
+ if len(self.cache) >= self.capacity:
+ self.dump_cache()
+
+ self.create_cache_node(key, report_type, value)
+
+ def __contains__(self, key):
+ return key in self.cache
+
+ def move_forward(self, cache_node, freq_node):
+ if freq_node.nxt is None or freq_node.nxt.freq != freq_node.freq + 1:
+ target_freq_node = FreqNode(freq_node.freq + 1, None, None)
+ target_empty = True
+ else:
+ target_freq_node = freq_node.nxt
+ target_empty = False
+
+ cache_node.free_myself()
+ target_freq_node.append_cache_to_tail(cache_node)
+
+ if target_empty:
+ freq_node.insert_after_me(target_freq_node)
+
+ if freq_node.count_caches() == 0:
+ if self.freq_link_head == freq_node:
+ self.freq_link_head = target_freq_node
+
+ freq_node.remove()
+
+ def dump_cache(self):
+ head_freq_node = self.freq_link_head
+ self.cache.pop(head_freq_node.cache_head.key) # type: ignore
+ head_freq_node.pop_head_cache() # type: ignore
+
+ if head_freq_node.count_caches() == 0: # type: ignore
+ self.freq_link_head = head_freq_node.nxt # type: ignore
+ head_freq_node.remove() # type: ignore
+
+ def create_cache_node(self, key, report_type, value):
+ cache_node = CacheNode(
+ key=key, report_type=report_type,
+ value=value, freq_node=None, pre=None, nxt=None)
+ self.cache[key] = cache_node
+
+ if self.freq_link_head is None or self.freq_link_head.freq != 0:
+ new_freq_node = FreqNode(0, None, None)
+ new_freq_node.append_cache_to_tail(cache_node)
+
+ if self.freq_link_head is not None:
+ self.freq_link_head.insert_before_me(new_freq_node)
+
+ self.freq_link_head = new_freq_node
+ else:
+ self.freq_link_head.append_cache_to_tail(cache_node)
+
+ def get_sorted_cache_keys(self):
+ result = [(i, freq.freq_node.freq) for i, freq in self.cache.items()]
+ result.sort(key=lambda x: -x[1])
+ return result
+
+ def get_average_frequency(self):
+ return mean(freq.freq_node.freq for freq in self.cache.values())
+
+
+class DummyLFU:
+
+ def __init__(self, *args, **kwargs):
+ pass
+
+ set = __init__
+ get = __init__
+
+ def __contains__(self, key):
+ return False