您好,登錄后才能下訂單哦!
這篇文章給大家介紹LRU原理及實現是怎樣的,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
硬盤容量大訪問速度慢,內存空間小訪問速度塊,內存空間如何實現淘汰機制呢?LFU、LRU、ARC、FIFO、MRU
最不經常使用算法(LFU):在一般標準的操作系統教材里,會用下面的方式來演示 LRU 原理,假設內存只能容納3個頁大小,按照 7 0 1 2 0 3 0 4 的次序訪問頁。假設內存按照棧的方式來描述訪問時間,在上面的,是最近訪問的,在下面的是,最遠時間訪問的,LRU就是這樣工作的。
但是如果讓我們自己設計一個基于 LRU 的緩存,這樣設計可能問題很多,這段內存按照訪問時間進行了排序,會有大量的內存拷貝操作,所以性能肯定是不能接受的。
那么如何設計一個LRU緩存,使得放入和移除都是 O(1) 的,我們需要把訪問次序維護起來,但是不能通過內存中的真實排序來反應,有一種方案就是使用雙向鏈表。
整體的設計思路是,可以使用 HashMap 存儲 key,這樣可以做到 save 和 get key的時間都是 O(1),而 HashMap 的 Value 指向雙向鏈表實現的 LRU 的 Node 節點,如圖所示。
LRU 存儲是基于雙向鏈表實現的,下面的圖演示了它的原理。其中 head 代表雙向鏈表的表頭,tail 代表尾部。首先預先設置 LRU 的容量,如果存儲滿了,可以通過 O(1) 的時間淘汰掉雙向鏈表的尾部,每次新增和訪問數據,都可以通過 O(1)的效率把新的節點增加到對頭,或者把已經存在的節點移動到隊頭。
save("key1", 7) save("key2", 0) save("key3", 1) save("key4", 2) get("key2") save("key5", 3) get("key2") save("key6", 4)
下面展示了,預設大小是 3 的,LRU存儲的在存儲和訪問過程中的變化。為了簡化圖復雜度,圖中沒有展示 HashMap部分的變化,僅僅演示了上圖 LRU 雙向鏈表的變化。我們對這個LRU緩存的操作序列如下:相應的 LRU 雙向鏈表部分變化如下:
s = save, g = get
總結一下核心操作的步驟:
save(key, value),首先在 HashMap 找到 Key 對應的節點,如果節點存在,更新節點的值,并把這個節點移動隊頭。如果不存在,需要構造新的節點,并且嘗試把節點塞到隊頭,如果LRU空間不足,則通過 tail 淘汰掉隊尾的節點,同時在 HashMap 中移除 Key。
get(key),通過 HashMap 找到 LRU 鏈表節點,因為根據LRU 原理,這個節點是最新訪問的,所以要把節點插入到隊頭,然后返回緩存的值。
完整基于 Java 的代碼參考如下
class DLinkedNode { String key; int value; DLinkedNode pre; DLinkedNode post; }
LRU Cache
public class LRUCache { private Hashtable<Integer, DLinkedNode> cache = new Hashtable<Integer, DLinkedNode>(); private int count; private int capacity; private DLinkedNode head, tail; public LRUCache(int capacity) { this.count = 0; this.capacity = capacity; head = new DLinkedNode(); head.pre = null; tail = new DLinkedNode(); tail.post = null; head.post = tail; tail.pre = head; } public int get(String key) { DLinkedNode node = cache.get(key); if(node == null){ return -1; // should raise exception here. } // move the accessed node to the head; this.moveToHead(node); return node.value; } public void set(String key, int value) { DLinkedNode node = cache.get(key); if(node == null){ DLinkedNode newNode = new DLinkedNode(); newNode.key = key; newNode.value = value; this.cache.put(key, newNode); this.addNode(newNode); ++count; if(count > capacity){ // pop the tail DLinkedNode tail = this.popTail(); this.cache.remove(tail.key); --count; } }else{ // update the value. node.value = value; this.moveToHead(node); } } /** * Always add the new node right after head; */ private void addNode(DLinkedNode node){ node.pre = head; node.post = head.post; head.post.pre = node; head.post = node; } /** * Remove an existing node from the linked list. */ private void removeNode(DLinkedNode node){ DLinkedNode pre = node.pre; DLinkedNode post = node.post; pre.post = post; post.pre = pre; } /** * Move certain node in between to the head. */ private void moveToHead(DLinkedNode node){ this.removeNode(node); this.addNode(node); } // pop the current tail. private DLinkedNode popTail(){ DLinkedNode res = tail.pre; this.removeNode(res); return res; } }
那么問題的后半部分,是 Redis 如何實現,這個問題這么問肯定是有坑的,那就是redis肯定不是這樣實現的。
如果按照HashMap和雙向鏈表實現,需要額外的存儲存放 next 和 prev 指針,犧牲比較大的存儲空間,顯然是不劃算的。所以Redis采用了一個近似的做法,就是隨機取出若干個key,然后按照訪問時間排序后,淘汰掉最不經常使用的,具體分析如下:
為了支持LRU,Redis 2.8.19中使用了一個全局的LRU時鐘,server.lruclock
,定義如下,
#define REDIS_LRU_BITS 24 unsigned lruclock:REDIS_LRU_BITS; /* Clock for LRU eviction */
默認的LRU時鐘的分辨率是1秒,可以通過改變REDIS_LRU_CLOCK_RESOLUTION
宏的值來改變,Redis會在serverCron()
中調用updateLRUClock
定期的更新LRU時鐘,更新的頻率和hz參數有關,默認為100ms
一次,如下,
#define REDIS_LRU_CLOCK_MAX ((1<<REDIS_LRU_BITS)-1) /* Max value of obj->lru */ #define REDIS_LRU_CLOCK_RESOLUTION 1 /* LRU clock resolution in seconds */ void updateLRUClock(void) { server.lruclock = (server.unixtime / REDIS_LRU_CLOCK_RESOLUTION) & REDIS_LRU_CLOCK_MAX; }
server.unixtime
是系統當前的unix時間戳,當 lruclock 的值超出REDIS_LRU_CLOCK_MAX時,會從頭開始計算,所以在計算一個key的最長沒有訪問時間時,可能key本身保存的lru訪問時間會比當前的lrulock還要大,這個時候需要計算額外時間,如下,
/* Given an object returns the min number of seconds the object was never * requested, using an approximated LRU algorithm. */ unsigned long estimateObjectIdleTime(robj *o) { if (server.lruclock >= o->lru) { return (server.lruclock - o->lru) * REDIS_LRU_CLOCK_RESOLUTION; } else { return ((REDIS_LRU_CLOCK_MAX - o->lru) + server.lruclock) * REDIS_LRU_CLOCK_RESOLUTION; } }
Redis支持和LRU相關淘汰策略包括,
volatile-lru
設置了過期時間的key參與近似的lru淘汰策略
allkeys-lru
所有的key均參與近似的lru淘汰策略
當進行LRU淘汰時,Redis按如下方式進行的,
...... /* volatile-lru and allkeys-lru policy */ else if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU || server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU) { for (k = 0; k < server.maxmemory_samples; k++) { sds thiskey; long thisval; robj *o; de = dictGetRandomKey(dict); thiskey = dictGetKey(de); /* When policy is volatile-lru we need an additional lookup * to locate the real key, as dict is set to db->expires. */ if (server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU) de = dictFind(db->dict, thiskey); o = dictGetVal(de); thisval = estimateObjectIdleTime(o); /* Higher idle time is better candidate for deletion */ if (bestkey == NULL || thisval > bestval) { bestkey = thiskey; bestval = thisval; } } } ......
Redis會基于server.maxmemory_samples
配置選取固定數目的key,然后比較它們的lru訪問時間,然后淘汰最近最久沒有訪問的key,maxmemory_samples的值越大,Redis的近似LRU算法就越接近于嚴格LRU算法,但是相應消耗也變高,對性能有一定影響,樣本值默認為5。
Redis的過期策略,是有定期刪除+惰性刪除兩種。
定期好理解,默認100s就隨機抽一些設置了過期時間的key,去檢查是否過期,過期了就刪了。
為啥不掃描全部設置了過期時間的key呢?
假如Redis里面所有的key都有過期時間,都掃描一遍?那太恐怖了,而且我們線上基本上也都是會設置一定的過期時間的。全掃描跟你去查數據庫不帶where條件不走索引全表掃描一樣,100s一次,Redis累都累死了。
如果一直沒隨機到很多key,里面不就存在大量的無效key了?
好問題,惰性刪除,見名知意,惰性嘛,我不主動刪,我懶,我等你來查詢了我看看你過期沒,過期就刪了還不給你返回,沒過期該怎么樣就怎么樣。
最后就是如果的如果,定期沒刪,我也沒查詢,那可咋整?
內存淘汰機制!
官網上給到的內存淘汰機制是以下幾個:
noeviction:返回錯誤當內存限制達到并且客戶端嘗試執行會讓更多內存被使用的命令(大部分的寫入指令,但DEL和幾個例外)
allkeys-lru: 嘗試回收最少使用的鍵(LRU),使得新添加的數據有空間存放。
volatile-lru: 嘗試回收最少使用的鍵(LRU),但僅限于在過期集合的鍵,使得新添加的數據有空間存放。
allkeys-random: 回收隨機的鍵使得新添加的數據有空間存放。
volatile-random: 回收隨機的鍵使得新添加的數據有空間存放,但僅限于在過期集合的鍵。
volatile-ttl: 回收在過期集合的鍵,并且優先回收存活時間(TTL)較短的鍵,使得新添加的數據有空間存放。
如果沒有鍵滿足回收的前提條件的話,策略volatile-lru, volatile-random以及volatile-ttl就和noeviction 差不多了。
Redis為什么不使用真實的LRU實現是因為這需要太多的內存。不過近似的LRU算法對于應用而言應該是等價的。使用真實的LRU算法與近似的算法可以通過下面的圖像對比。
其實在大家熟悉的LinkedHashMap中也實現了LRU算法的,
在LinkedHashMap里。我們只需要擴展下即可,代碼示例如下: /** * Constructs an empty <tt>LinkedHashMap</tt> instance with the * specified initial capacity, load factor and ordering mode. * * @param initialCapacity the initial capacity * @param loadFactor the load factor * @param accessOrder the ordering mode - <tt>true</tt> for * access-order, <tt>false</tt> for insertion-order * @throws IllegalArgumentException if the initial capacity is negative * or the load factor is nonpositive */ public LinkedHashMap(int initialCapacity, float loadFactor, boolean accessOrder) { super(initialCapacity, loadFactor); this.accessOrder = accessOrder; } //方法為protected ,擺明了是想被繼承、重寫 protected boolean removeEldestEntry(Map.Entry<K,V> eldest) { return false; }
使用accessOrder來標識是使用訪問順序,還是插入順序。默認為插入順序。當accessOrder為訪問順序、容量固定時,即為LRU
舉例如下:
當容量超過100時,開始執行LRU策略:將最近最少未使用的對象 evict 掉。
Map<Long, String> LRUCollection = Collections.synchronizedMap(new LinkedHashMap<Long, String>(100, .75F, true) { @Override protected boolean removeEldestEntry(Map.Entry<Long, String> eldest) { return size() > 100; } });
真實面試中會讓你寫LUR算法,你可別搞原始的那個,那真TM多,寫不完的,你要么懟上面這個,要么懟下面這個,找一個數據結構實現下Java版本的LRU還是比較容易的,知道啥原理就好了。
class LRULinkedHashMap<K,V> extends LinkedHashMap<K,V> { /** * */ private static final long serialVersionUID = 1882839504956564761L; private int capacity; public LRULinkedHashMap(int capacity) { super(capacity,0.75f,true); this.capacity = capacity; } @Override public boolean removeEldestEntry(Map.Entry<K,V> eldest) { System.out.println("即將根據LRU算法,刪除最近最少使用元素:key= "+eldest.getKey()+" value= "+eldest.getValue()+" ."); //此行代碼是關鍵,一旦容量超出限制,即按照LRU進行刪除 return size()>capacity; } } public class Test { public static void main(String[] args) { testLinkedHashMap(); testLRULinkedHashMap(); } public static void testLinkedHashMap() { //容量固定,accessOrder=true Map<Integer, Integer> map = new LinkedHashMap<Integer, Integer>(5, 0.75f, true); map.put(1, 1); map.put(2, 2); map.put(3, 3); map.put(4, 4); map.put(5, 5); //此時輸出1,2,3,4,5 for(Iterator<Map.Entry<Integer, Integer>> it = map.entrySet().iterator(); it.hasNext();) { System.out.println(it.next().getValue()); } map.put(4, 4); map.put(6, 6); //此時輸出1,2,3,5,4,6(自動擴容) for(Iterator<Map.Entry<Integer, Integer>> it = map.entrySet().iterator(); it.hasNext();) { System.out.println(it.next().getValue()); } } public static void testLRULinkedHashMap() { //容量固定,accessOrder=true Map<Integer, Integer> map = new LRULinkedHashMap<Integer, Integer>(5); map.put(1, 1); map.put(2, 2); map.put(3, 3); map.put(4, 4); map.put(5, 5); //此時輸出1,2,3,4,5 for(Iterator<Map.Entry<Integer, Integer>> it = map.entrySet().iterator(); it.hasNext();) { System.out.println(it.next().getValue()); } map.put(4, 4); map.put(6, 6); //此時輸出2,3,5,4,6(容量鎖定,進行刪除) for(Iterator<Map.Entry<Integer, Integer>> it = map.entrySet().iterator(); it.hasNext();) { System.out.println(it.next().getValue()); } } }
關于LRU原理及實現是怎樣的就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。