"""
LFU cache Written by Shane Wang
https://medium.com/@epicshane/a-python-implementation-of-lfu-least-frequently-used-cache-with-o-1-time-complexity-e16b34a3c49b
https://github.com/luxigner/lfu_cache
Modified by Sep Dehpour
"""
from collections import defaultdict
from threading import Lock
from statistics import mean
from deepdiff.helper import not_found, dict_, SetOrdered
class CacheNode:
def __init__(self, key, report_type, value, freq_node, pre, nxt):
self.key = key
if report_type:
self.content = defaultdict(SetOrdered)
self.content[report_type].add(value)
else:
self.content = value
self.freq_node = freq_node
self.pre = pre # previous CacheNode
self.nxt = nxt # next CacheNode
def free_myself(self):
if self.freq_node.cache_head == self.freq_node.cache_tail: # type: ignore
self.freq_node.cache_head = self.freq_node.cache_tail = None # type: ignore
elif self.freq_node.cache_head == self: # type: ignore
self.nxt.pre = None # type: ignore
self.freq_node.cache_head = self.nxt # type: ignore
elif self.freq_node.cache_tail == self: # type: ignore
self.pre.nxt = None # type: ignore
self.freq_node.cache_tail = self.pre # type: ignore
else:
self.pre.nxt = self.nxt # type: ignore
self.nxt.pre = self.pre # type: ignore
self.pre = None
self.nxt = None
self.freq_node = None
class FreqNode:
def __init__(self, freq, pre, nxt):
self.freq = freq
self.pre = pre # previous FreqNode
self.nxt = nxt # next FreqNode
self.cache_head = None # CacheNode head under this FreqNode
self.cache_tail = None # CacheNode tail under this FreqNode
def count_caches(self):
if self.cache_head is None and self.cache_tail is None:
return 0
elif self.cache_head == self.cache_tail:
return 1
else:
return '2+'
def remove(self):
if self.pre is not None:
self.pre.nxt = self.nxt
if self.nxt is not None:
self.nxt.pre = self.pre
pre = self.pre
nxt = self.nxt
self.pre = self.nxt = self.cache_head = self.cache_tail = None
return (pre, nxt)
def pop_head_cache(self):
if self.cache_head is None and self.cache_tail is None:
return None
elif self.cache_head == self.cache_tail:
cache_head = self.cache_head
self.cache_head = self.cache_tail = None
return cache_head
else:
cache_head = self.cache_head
self.cache_head.nxt.pre = None # type: ignore
self.cache_head = self.cache_head.nxt # type: ignore
return cache_head
def append_cache_to_tail(self, cache_node):
cache_node.freq_node = self
if self.cache_head is None and self.cache_tail is None:
self.cache_head = self.cache_tail = cache_node
else:
cache_node.pre = self.cache_tail
cache_node.nxt = None
self.cache_tail.nxt = cache_node # type: ignore
self.cache_tail = cache_node
def insert_after_me(self, freq_node):
freq_node.pre = self
freq_node.nxt = self.nxt
if self.nxt is not None:
self.nxt.pre = freq_node
self.nxt = freq_node
def insert_before_me(self, freq_node):
if self.pre is not None:
self.pre.nxt = freq_node
freq_node.pre = self.pre
freq_node.nxt = self
self.pre = freq_node
class LFUCache:
def __init__(self, capacity):
self.cache = dict_() # {key: cache_node}
if capacity <= 0:
raise ValueError('Capacity of LFUCache needs to be positive.') # pragma: no cover.
self.capacity = capacity
self.freq_link_head = None
self.lock = Lock()
def get(self, key):
with self.lock:
if key in self.cache:
cache_node = self.cache[key]
freq_node = cache_node.freq_node
content = cache_node.content
self.move_forward(cache_node, freq_node)
return content
else:
return not_found
def set(self, key, report_type=None, value=None):
with self.lock:
if key in self.cache:
cache_node = self.cache[key]
if report_type:
cache_node.content[report_type].add(value)
else:
cache_node.content = value
else:
if len(self.cache) >= self.capacity:
self.dump_cache()
self.create_cache_node(key, report_type, value)
def __contains__(self, key):
return key in self.cache
def move_forward(self, cache_node, freq_node):
if freq_node.nxt is None or freq_node.nxt.freq != freq_node.freq + 1:
target_freq_node = FreqNode(freq_node.freq + 1, None, None)
target_empty = True
else:
target_freq_node = freq_node.nxt
target_empty = False
cache_node.free_myself()
target_freq_node.append_cache_to_tail(cache_node)
if target_empty:
freq_node.insert_after_me(target_freq_node)
if freq_node.count_caches() == 0:
if self.freq_link_head == freq_node:
self.freq_link_head = target_freq_node
freq_node.remove()
def dump_cache(self):
head_freq_node = self.freq_link_head
self.cache.pop(head_freq_node.cache_head.key) # type: ignore
head_freq_node.pop_head_cache() # type: ignore
if head_freq_node.count_caches() == 0: # type: ignore
self.freq_link_head = head_freq_node.nxt # type: ignore
head_freq_node.remove() # type: ignore
def create_cache_node(self, key, report_type, value):
cache_node = CacheNode(
key=key, report_type=report_type,
value=value, freq_node=None, pre=None, nxt=None)
self.cache[key] = cache_node
if self.freq_link_head is None or self.freq_link_head.freq != 0:
new_freq_node = FreqNode(0, None, None)
new_freq_node.append_cache_to_tail(cache_node)
if self.freq_link_head is not None:
self.freq_link_head.insert_before_me(new_freq_node)
self.freq_link_head = new_freq_node
else:
self.freq_link_head.append_cache_to_tail(cache_node)
def get_sorted_cache_keys(self):
result = [(i, freq.freq_node.freq) for i, freq in self.cache.items()]
result.sort(key=lambda x: -x[1])
return result
def get_average_frequency(self):
return mean(freq.freq_node.freq for freq in self.cache.values())
class DummyLFU:
def __init__(self, *args, **kwargs):
pass
set = __init__
get = __init__
def __contains__(self, key):
return False