您好,登錄后才能下訂單哦!
今天小編給大家分享一下Redis和本地緩存如何使用的相關知識點,內容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。
眾所周知,緩存最主要的目的就是加速訪問,緩解數據庫壓力。最常用的緩存就是分布式緩存,比如redis,在面對大部分并發場景或者一些中小型公司流量沒有那么高的情況,使用redis基本都能解決了。但是在流量較高的情況下可能得使用到本地緩存了,比如guava的LoadingCache和快手開源的ReloadableCache。
這部分會介紹redis,比如guava的LoadingCache和快手開源的ReloadableCache的使用場景和局限,通過這一部分的介紹就能知道在怎樣的業務場景下應該使用哪種緩存,以及為什么。
如果寬泛的說redis何時使用,那么自然就是用戶訪問量過高的地方使用,從而加速訪問,并且緩解數據庫壓力。如果細分的話,還得分為單節點問題和非單節點問題。
如果一個頁面用戶訪問量比較高,但是訪問的不是同一個資源。比如用戶詳情頁,訪問量比較高,但是每個用戶的數據都是不一樣的,這種情況顯然只能用分布式緩存了,如果使用redis,key為用戶唯一鍵,value則是用戶信息。
redis導致的緩存擊穿。
但是需要注意一點,一定要設置過期時間,而且不能設置到同一時間點過期。舉個例子,比如用戶又個活動頁,活動頁能看到用戶活動期間獲獎數據,粗心的人可能會設置用戶數據的過期時間點為活動結束,這樣會
單(熱)點問題
單節點問題說的是redis的單個節點的并發問題,因為對于相同的key會落到redis集群的同一個節點上,那么如果對這個key的訪問量過高,那么這個redis節點就存在并發隱患,這個key就稱為熱key。
如果所有用戶訪問的都是同一個資源,比如小愛同學app首頁對所有用戶展示的內容都一樣(初期),服務端給h6返回的是同一個大json,顯然得使用到緩存。首先我們考慮下用redis是否可行,由于redis存在單點問題,如果流量過大的話,那么所有用戶的請求到達redis的同一個節點,需要評估該節點能否抗住這么大流量。我們的規則是,如果單節點qps達到了千級別就要解決單點問題了(即使redis號稱能抗住十萬級別的qps),最常見的做法就是使用本地緩存。顯然小愛app首頁流量不過百,使用redis是沒問題的。
對于這上面說的熱key問題,我們最直接的做法就是使用本地緩存,比如你最熟悉的guava的LoadingCache,但是使用本地緩存要求能夠接受一定的臟數據,因為如果你更新了首頁,本地緩存是不會更新的,它只會根據一定的過期策略來重新加載緩存,不過在我們這個場景是完全沒問題的,因為一旦在后臺推送了首頁后就不會再去改變了。即使改變了也沒問題,可以設置寫過期為半小時,超過半小時重新加載緩存,這種短時間內的臟數據我們是可以接受的。
LoadingCache導致的緩存擊穿
雖然說本地緩存和機器上強相關的,雖然代碼層面寫的是半小時過期,但由于每臺機器的啟動時間不同,導致緩存的加載時間不同,過期時間也就不同,也就不會所有機器上的請求在同一時間緩存失效后都去請求數據庫。但是對于單一一臺機器也是會導致緩存穿透的,假如有10臺機器,每臺1000的qps,只要有一臺緩存過期就可能導致這1000個請求同時打到了數據庫。這種問題其實比較好解決,但是容易被忽略,也就是在設置LoadingCache的時候使用LoadingCache的load-miss方法,而不是直接判斷cache.getIfPresent()== null然后去請求db;前者會加虛擬機層面的鎖,保證只有一個請求打到數據庫去,從而完美的解決了這個問題。
但是,如果對于實時性要求較高的情況,比如有段時間要經常做活動,我要保證活動頁面能近實時更新,也就是運營在后臺配置好了活動信息后,需要在C端近實時展示這次配置的活動信息,此時使用LoadingCache肯定就不能滿足了。
對于上面說的LoadingCache不能解決的實時問題,可以考慮使用ReloadableCache,這是快手開源的一個本地緩存框架,最大的特點是支持多機器同時更新緩存,假設我們修改了首頁信息,然后請求打到的是A機器,這個時候重新加載ReloadableCache,然后它會發出通知,監聽了同一zk節點的其他機器收到通知后重新更新緩存。使用這個緩存一般的要求是將全量數據加載到本地緩存,所以如果數據量過大肯定會對gc造成壓力,這種情況就不能使用了。由于小愛同學首頁這個首頁是帶有狀態的,一般online狀態的就那么兩個,所以完全可以使用ReloadableCache來只裝載online狀態的首頁。
到這里三種緩存基本都介紹完了,做個小結:
對于非熱點的數據訪問,比如用戶維度的數據,直接使用redis即可;
對于熱點數據的訪問,如果流量不是很高,無腦使用redis即可;
對于熱點數據,如果允許一定時間內的臟數據,使用LoadingCache即可;
對于熱點數據,如果一致性要求較高,同時數據量不大的情況,使用ReloadableCache即可;
不管哪種本地緩存雖然都帶有虛擬機層面的加鎖來解決擊穿問題,但是意外總有可能以你意想不到的方式發生,保險起見你可以使用兩級緩存的方式即本地緩存+redis+db。
這里redis的使用就不再多說了,相信很多人對api的使用比我還熟悉
這個是guava提供的網上一抓一大把,但是給兩點注意事項
要使用load-miss的話, 要么使用V get(K key, Callable<? extends V> loader)
;要么使用build的時候使用的是build(CacheLoader<? super K1, V1> loader)
這個時候可以直接使用get()了。此外建議使用load-miss,而不是getIfPresent==null的時候再去查數據庫,這可能導致緩存擊穿;
使用load-miss是因為這是線程安全的,如果緩存失效的話,多個線程調用get的時候只會有一個線程去db查詢,其他線程需要等待,也就是說這是線程安全的。
LoadingCache<String, String> cache = CacheBuilder.newBuilder() .maximumSize(1000L) .expireAfterAccess(Duration.ofHours(1L)) // 多久不訪問就過期 .expireAfterWrite(Duration.ofHours(1L)) // 多久這個key沒修改就過期 .build(new CacheLoader<String, String>() { @Override public String load(String key) throws Exception { // 數據裝載方式,一般就是loadDB return key + " world"; } }); String value = cache.get("hello"); // 返回hello world
導入三方依賴
<dependency> <groupId>com.github.phantomthief</groupId> <artifactId>zknotify-cache</artifactId> <version>0.1.22</version> </dependency>
需要看文檔,不然無法使用,有興趣自己寫一個也行的。
public interface ReloadableCache<T> extends Supplier<T> { /** * 獲取緩存數據 */ @Override T get(); /** * 通知全局緩存更新 * 注意:如果本地緩存沒有初始化,本方法并不會初始化本地緩存并重新加載 * * 如果需要初始化本地緩存,請先調用 {@link ReloadableCache#get()} */ void reload(); /** * 更新本地緩存的本地副本 * 注意:如果本地緩存沒有初始化,本方法并不會初始化并刷新本地的緩存 * * 如果需要初始化本地緩存,請先調用 {@link ReloadableCache#get()} */ void reloadLocal(); }
這三個真的是亙古不變的問題,如果流量大確實需要考慮。
簡單說就是緩存失效,導致大量請求同一時間打到了數據庫。對于緩存擊穿問題上面已經給出了很多解決方案了。
比如使用本地緩存
本地緩存使用load-miss方法
使用第三方服務來加載緩存
1.2和都說過,主要來看3。假如業務愿意只能使用redis而無法使用本地緩存,比如數據量過大,實時性要求比較高。那么當緩存失效的時候就得想辦法保證只有少量的請求打到數據庫。很自然的就想到了使用分布式鎖,理論上說是可行的,但實際上存在隱患。我們的分布式鎖相信很多人都是使用redis+lua的方式實現的,并且在while中進行了輪訓,這樣請求量大,數據多的話會導致無形中讓redis成了隱患,并且占了太多業務線程,其實僅僅是引入了分布式鎖就加大了復雜度,我們的原則就是能不用就不用。
那么我們是不是可以設計一個類似分布式鎖,但是更可靠的rpc服務呢?當調用get方法的時候這個rpc服務保證相同的key打到同一個節點,并且使用synchronized來進行加鎖,之后完成數據的加載。在快手提供了一個叫cacheSetter的框架。下面提供一個簡易版,自己寫也很容易實現。
import com.google.common.collect.Lists; import org.apache.commons.collections4.CollectionUtils; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; /** * @Description 分布式加載緩存的rpc服務,如果部署了多臺機器那么調用端最好使用id做一致性hash保證相同id的請求打到同一臺機器。 **/ public abstract class AbstractCacheSetterService implements CacheSetterService { private final ConcurrentMap<String, CountDownLatch> loadCache = new ConcurrentHashMap<>(); private final Object lock = new Object(); @Override public void load(Collection<String> needLoadIds) { if (CollectionUtils.isEmpty(needLoadIds)) { return; } CountDownLatch latch; Collection<CountDownLatch> loadingLatchList; synchronized (lock) { loadingLatchList = excludeLoadingIds(needLoadIds); needLoadIds = Collections.unmodifiableCollection(needLoadIds); latch = saveLatch(needLoadIds); } System.out.println("needLoadIds:" + needLoadIds); try { if (CollectionUtils.isNotEmpty(needLoadIds)) { loadCache(needLoadIds); } } finally { release(needLoadIds, latch); block(loadingLatchList); } } /** * 加鎖 * @param loadingLatchList 需要加鎖的id對應的CountDownLatch */ protected void block(Collection<CountDownLatch> loadingLatchList) { if (CollectionUtils.isEmpty(loadingLatchList)) { return; } System.out.println("block:" + loadingLatchList); loadingLatchList.forEach(l -> { try { l.await(); } catch (InterruptedException e) { e.printStackTrace(); } }); } /** * 釋放鎖 * @param needLoadIds 需要釋放鎖的id集合 * @param latch 通過該CountDownLatch來釋放鎖 */ private void release(Collection<String> needLoadIds, CountDownLatch latch) { if (CollectionUtils.isEmpty(needLoadIds)) { return; } synchronized (lock) { needLoadIds.forEach(id -> loadCache.remove(id)); } if (latch != null) { latch.countDown(); } } /** * 加載緩存,比如根據id從db查詢數據,然后設置到redis中 * @param needLoadIds 加載緩存的id集合 */ protected abstract void loadCache(Collection<String> needLoadIds); /** * 對需要加載緩存的id綁定CountDownLatch,后續相同的id請求來了從map中找到CountDownLatch,并且await,直到該線程加載完了緩存 * @param needLoadIds 能夠正在去加載緩存的id集合 * @return 公用的CountDownLatch */ protected CountDownLatch saveLatch(Collection<String> needLoadIds) { if (CollectionUtils.isEmpty(needLoadIds)) { return null; } CountDownLatch latch = new CountDownLatch(1); needLoadIds.forEach(loadId -> loadCache.put(loadId, latch)); System.out.println("loadCache:" + loadCache); return latch; } /** * 哪些id正在加載數據,此時持有相同id的線程需要等待 * @param ids 需要加載緩存的id集合 * @return 正在加載的id所對應的CountDownLatch集合 */ private Collection<CountDownLatch> excludeLoadingIds(Collection<String> ids) { List<CountDownLatch> loadingLatchList = Lists.newArrayList(); Iterator<String> iterator = ids.iterator(); while (iterator.hasNext()) { String id = iterator.next(); CountDownLatch latch = loadCache.get(id); if (latch != null) { loadingLatchList.add(latch); iterator.remove(); } } System.out.println("loadingLatchList:" + loadingLatchList); return loadingLatchList; } }
業務實現
import java.util.Collection; public class BizCacheSetterRpcService extends AbstractCacheSetterService { @Override protected void loadCache(Collection<String> needLoadIds) { // 讀取db進行處理 // 設置緩存 } }
簡單來說就是請求的數據在數據庫不存在,導致無效請求打穿數據庫。
解法也很簡單,從db獲取數據的方法(getByKey(K key))一定要給個默認值。
比如我有個獎池,金額上限是1W,用戶完成任務的時候給他發筆錢,并且使用redis記錄下來,并且落表,用戶在任務頁面能實時看到獎池剩余金額,在任務開始的時候顯然獎池金額是不變的,redis和db里面都沒有發放金額的記錄,這就導致每次必然都去查db,對于這種情況,從db沒查出來數據應該緩存個值0到緩存。
就是大量緩存集中失效打到了db,當然肯定都是一類的業務緩存,歸根到底是代碼寫的有問題。可以將緩存失效的過期時間打散,別讓其集中失效就可以了。
以上就是“Redis和本地緩存如何使用”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。