您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關Redis中如何實現限流策略,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
當系統處理能力有限,如何組織計劃外的請求對系統施壓。首先我們先看下一些簡單的限流策略,防止暴力攻擊。比如要對IP訪問,沒5s只能訪問10次,超過進行攔截。
如上圖,一般使用滑動窗口來統計區間時間內的訪問次數。
使用 zset
記錄 IP
訪問次數,每個 IP
通過 key
保存下來,score
保存當前時間戳,value
唯一用時間戳或者UUID來實現
public class RedisLimiterTest { private Jedis jedis; public RedisLimiterTest(Jedis jedis) { this.jedis = jedis; } /** * @param ipAddress Ip地址 * @param period 特定的時間內,單位秒 * @param maxCount 最大允許的次數 * @return */ public boolean isIpLimit(String ipAddress, int period, int maxCount) { String key = String.format("ip:%s", ipAddress); // 毫秒時間戳 long currentTimeMillis = System.currentTimeMillis(); Pipeline pipe = jedis.pipelined(); // redis事務,保證原子性 pipe.multi(); // 存放數據,value 和 score 都使用毫秒時間戳 pipe.zadd(key, currentTimeMillis, "" + UUID.randomUUID()); // 移除窗口區間所有的元素 pipe.zremrangeByScore(key, 0, currentTimeMillis - period * 1000); // 獲取時間窗口內的行為數量 Response<Long> count = pipe.zcard(key); // 設置 zset 過期時間,避免冷用戶持續占用內存,這里寬限1s pipe.expire(key, period + 1); // 提交事務 pipe.exec(); pipe.close(); // 比較數量是否超標 return count.get() > maxCount; } public static void main(String[] args) { Jedis jedis = new Jedis("localhost", 6379); RedisLimiterTest limiter = new RedisLimiterTest(jedis); for (int i = 1; i <= 20; i++) { // 驗證IP 10秒鐘之內只能訪問5次 boolean isLimit = limiter.isIpLimit("222.73.55.22", 10, 5); System.out.println("訪問第" + i + "次, 結果:" + (isLimit ? "限制訪問" : "允許訪問")); } } }
執行結果
訪問第1次, 結果:允許訪問 訪問第2次, 結果:允許訪問 訪問第3次, 結果:允許訪問 訪問第4次, 結果:允許訪問 訪問第5次, 結果:允許訪問 訪問第6次, 結果:限制訪問 訪問第7次, 結果:限制訪問 ... ...
缺點:要記錄時間窗口所有的行為記錄,量很大,比如,限定60s內不能超過100萬次這種場景,不太適合這樣限流,因為會消耗大量的儲存空間。
漏斗的容量是限定的,如果滿了,就裝不進去了。
如果將漏嘴放開,水就會往下流,流走一部分之后,就又可以繼續往里面灌水。
如果漏嘴流水的速率大于灌水的速率,那么漏斗永遠都裝不滿。
如果漏嘴流水速率小于灌水的速率,那么一旦漏斗滿了,灌水就需要暫停并等待漏斗騰空。
public class FunnelLimiterTest { static class Funnel { int capacity; // 漏斗容量 float leakingRate; // 漏嘴流水速率 int leftQuota; // 漏斗剩余空間 long leakingTs; // 上一次漏水時間 public Funnel(int capacity, float leakingRate) { this.capacity = capacity; this.leakingRate = leakingRate; this.leftQuota = capacity; this.leakingTs = System.currentTimeMillis(); } void makeSpace() { long nowTs = System.currentTimeMillis(); long deltaTs = nowTs - leakingTs; // 距離上一次漏水過去了多久 int deltaQuota = (int) (deltaTs * leakingRate); // 騰出的空間 = 時間*漏水速率 if (deltaQuota < 0) { // 間隔時間太長,整數數字過大溢出 this.leftQuota = capacity; this.leakingTs = nowTs; return; } if (deltaQuota < 1) { // 騰出空間太小 就等下次,最小單位是1 return; } this.leftQuota += deltaQuota; // 漏斗剩余空間 = 漏斗剩余空間 + 騰出的空間 this.leakingTs = nowTs; if (this.leftQuota > this.capacity) { // 剩余空間不得高于容量 this.leftQuota = this.capacity; } } boolean watering(int quota) { makeSpace(); if (this.leftQuota >= quota) { // 判斷剩余空間是否足夠 this.leftQuota -= quota; return true; } return false; } } // 所有的漏斗 private Map<String, Funnel> funnels = new HashMap<>(); /** * @param capacity 漏斗容量 * @param leakingRate 漏嘴流水速率 quota/s */ public boolean isIpLimit(String ipAddress, int capacity, float leakingRate) { String key = String.format("ip:%s", ipAddress); Funnel funnel = funnels.get(key); if (funnel == null) { funnel = new Funnel(capacity, leakingRate); funnels.put(key, funnel); } return !funnel.watering(1); // 需要1個quota } public static void main(String[] args) throws Exception{ FunnelLimiterTest limiter = new FunnelLimiterTest(); for (int i = 1; i <= 50; i++) { // 每1s執行一次 Thread.sleep(1000); // 漏斗容量是2 ,漏嘴流水速率是0.5每秒, boolean isLimit = limiter.isIpLimit("222.73.55.22", 2, (float)0.5/1000); System.out.println("訪問第" + i + "次, 結果:" + (isLimit ? "限制訪問" : "允許訪問")); } } }
執行結果
訪問第1次, 結果:允許訪問 # 第1次,容量剩余2,執行后1 訪問第2次, 結果:允許訪問 # 第2次,容量剩余1,執行后0 訪問第3次, 結果:允許訪問 # 第3次,由于過了2s, 漏斗流水剩余1個空間,所以容量剩余1,執行后0 訪問第4次, 結果:限制訪問 # 第4次,過了1s, 剩余空間小于1, 容量剩余0 訪問第5次, 結果:允許訪問 # 第5次,由于過了2s, 漏斗流水剩余1個空間,所以容量剩余1,執行后0 訪問第6次, 結果:限制訪問 # 以此類推... 訪問第7次, 結果:允許訪問 訪問第8次, 結果:限制訪問 訪問第9次, 結果:允許訪問 訪問第10次, 結果:限制訪問
我們觀察 Funnel
對象的幾個字段,我們發現可以將 Funnel
對象的內容按字段存儲到一個 hash
結構中,灌水的時候將 hash
結構的字段取出來進行邏輯運算后,再將新值回填到 hash
結構中就完成了一次行為頻度的檢測。
但是有個問題,我們無法保證整個過程的原子性。從 hash
結構中取值,然后在內存里運算,再回填到 hash
結構,這三個過程無法原子化,意味著需要進行適當的加鎖控制。而一旦加鎖,就意味著會有加鎖失敗,加鎖失敗就需要選擇重試或者放棄。
如果重試的話,就會導致性能下降。如果放棄的話,就會影響用戶體驗。同時,代碼的復雜度也跟著升高很多。這真是個艱難的選擇,我們該如何解決這個問題呢?Redis-Cell
救星來了!
Redis 4.0 提供了一個限流 Redis 模塊,它叫 redis-cell
。該模塊也使用了漏斗算法,并提供了原子的限流指令。
該模塊只有1條指令cl.throttle
,它的參數和返回值都略顯復雜,接下來讓我們來看看這個指令具體該如何使用。
> cl.throttle key:xxx 15 30 60 1
15
: 15 capacity 這是漏斗容量
30 60
: 30 operations / 60 seconds 這是漏水速率
1
: need 1 quota (可選參數,默認值也是1)
> cl.throttle laoqian:reply 15 30 60 1) (integer) 0 # 0 表示允許,1表示拒絕 2) (integer) 15 # 漏斗容量capacity 3) (integer) 14 # 漏斗剩余空間left_quota 4) (integer) -1 # 如果拒絕了,需要多長時間后再試(漏斗有空間了,單位秒) 5) (integer) 2 # 多長時間后,漏斗完全空出來(left_quota==capacity,單位秒)
在執行限流指令時,如果被拒絕了,就需要丟棄或重試。cl.throttle
指令考慮的非常周到,連重試時間都幫你算好了,直接取返回結果數組的第四個值進行 sleep
即可,如果不想阻塞線程,也可以異步定時任務來重試。
關于“Redis中如何實現限流策略”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。