您好,登錄后才能下訂單哦!
SpringCloud中怎么實現gateway限流,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
路由過濾器允許以某種方式修改傳入的HTTP請求或傳出的HTTP響應,路徑過濾器的范圍限定為特定路徑,Spring Cloud Gateway包含許多內置的GatewayFilter工廠。
Spring Cloud Gateway限流就是通過內置的RequestRateLimiterGateWayFilterFactory工廠來實現的。
當然,官方的肯定不能滿足我們部分業務需求,因此可以自定義限流過濾器。
## yml如下配置,就可以為該路由添加此攔截器:
spring: cloud: gateway: routes: - id: test_route uri: localhost predicates: - Path=/host/address filters: - name: RequestRateLimiter args: ## 允許用戶每秒執行多少請求,而不會丟棄任何請求。這是令牌桶填充的速率。 redis-rate-limiter.replenishRate: 1 ## 是一秒鐘內允許用戶執行的最大請求數。這是令牌桶可以容納的令牌數。將此值設置為零將阻止所有請求。 redis-rate-limiter.burstCapacity: 3 ## KeyResolver是一個簡單的獲取用戶請求參數 我這里以主機地址為key來作限流 key-resolver: "#{@hostAddrKeyResolver}"
## RequestRateLimiterGateWayFilterFactory代碼:
//AbstractGatewayFilterFactory實現GatewayFilterFactory接口,自定義的過濾工廠可以繼承 //AbstractGatewayFilterFactory并編寫apply方法 public class RequestRateLimiterGatewayFilterFactory extends AbstractGatewayFilterFactory<RequestRateLimiterGatewayFilterFactory.Config> { public static final String KEY_RESOLVER_KEY = "keyResolver"; private final RateLimiter defaultRateLimiter; private final KeyResolver defaultKeyResolver; public RequestRateLimiterGatewayFilterFactory(RateLimiter defaultRateLimiter,KeyResolver defaultKeyResolver) { super(Config.class); this.defaultRateLimiter = defaultRateLimiter; this.defaultKeyResolver = defaultKeyResolver; } public KeyResolver getDefaultKeyResolver() { return defaultKeyResolver; } public RateLimiter getDefaultRateLimiter() { return defaultRateLimiter; } @SuppressWarnings("unchecked") @Override public GatewayFilter apply(Config config) { //yml中我們配置的hostAddrKeyResolver KeyResolver resolver = (config.keyResolver == null) ? defaultKeyResolver : config.keyResolver; //這個就是限流的具體實現,默認使用RedisRateLimiter RateLimiter<Object> limiter = (config.rateLimiter == null) ? defaultRateLimiter : config.rateLimiter; return (exchange, chain) -> { Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR); return resolver.resolve(exchange).flatMap(key -> //這里的isAllowed就是具體實現,輸入參數為路由id和限流key(這里為主機地址hostAddress) // TODO: if key is empty? limiter.isAllowed(route.getId(), key).flatMap(response -> { for (Map.Entry<String, String> header : response.getHeaders().entrySet()) { exchange.getResponse().getHeaders().add(header.getKey(), header.getValue()); } //如果為真,通過攔截 if (response.isAllowed()) { return chain.filter(exchange); } //否則設置http碼為429,too many request exchange.getResponse().setStatusCode(config.getStatusCode()); return exchange.getResponse().setComplete(); })); }; } }
分析:
1.加載KeyResolver,從配置文件中加載,此處我配置了hostAddrKeyResolver,即根據host地址來進行限流。如果為空,使用默認的PrincipalNameKeyResolver
2.加載RateLimiter,默認使用RedisRateLimiter。
3.執行RedisRateLimiter的isAllowed方法,得到response,如果isAllowed為true則通過攔截,否則返回429(isAllowed方法具體實現下文描述)。
## HostAddrKeyResolver:
@Slf4j public class HostAddrKeyResolver implements KeyResolver { @Override public Mono<String> resolve(ServerWebExchange exchange) { log.info("HostAddrKeyResolver 限流"); return Mono.just(exchange.getRequest().getRemoteAddress().getHostName()); } }
在啟動類中注入bean
@Bean public HostAddrKeyResolver hostAddrKeyResolver() { return new HostAddrKeyResolver(); }
## RedisRateLimiter:
@Override @SuppressWarnings("unchecked") public Mono<Response> isAllowed(String routeId, String id) { //判斷是否初始化 if (!this.initialized.get()) { throw new IllegalStateException("RedisRateLimiter is not initialized"); } //獲取配置 Config routeConfig = getConfig().getOrDefault(routeId, defaultConfig); if (routeConfig == null) { throw new IllegalArgumentException("No Configuration found for route " + routeId); } //令牌桶填充速率 int replenishRate = routeConfig.getReplenishRate(); //令牌桶可容納令牌數 int burstCapacity = routeConfig.getBurstCapacity(); try { //獲取redis的key,執行lua腳本時傳入 List<String> keys = getKeys(id); //獲取參數,執行lua腳本時傳入 List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", Instant.now().getEpochSecond() + "", "1"); Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs); // .log("redisratelimiter", Level.FINER); return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L))) .reduce(new ArrayList<Long>(), (longs, l) -> { longs.addAll(l); return longs; }) .map(results -> { boolean allowed = results.get(0) == 1L; Long tokensLeft = results.get(1); Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft)); if (log.isDebugEnabled()) { log.debug("response: " + response); } return response; }); } catch (Exception e) { /* * We don't want a hard dependency on Redis to allow traffic. Make sure to set * an alert so you know if this is happening too much. Stripe's observed * failure rate is 0.01%. */ log.error("Error determining if user allowed from redis", e); } return Mono.just(new Response(true, getHeaders(routeConfig, -1L))); } @NotNull public HashMap<String, String> getHeaders(Config config, Long tokensLeft) { HashMap<String, String> headers = new HashMap<>(); headers.put(this.remainingHeader, tokensLeft.toString()); headers.put(this.replenishRateHeader, String.valueOf(config.getReplenishRate())); headers.put(this.burstCapacityHeader, String.valueOf(config.getBurstCapacity())); return headers; } static List<String> getKeys(String id) { // use {} around keys to use Redis Key hash tags // this allows for using redis cluster // Make a unique key per user. String prefix = "request_rate_limiter.{" + id; //令牌桶剩余令牌數 String tokenKey = prefix + "}.tokens"; //令牌桶最后填充令牌時間 String timestampKey = prefix + "}.timestamp"; return Arrays.asList(tokenKey, timestampKey); }
分析:
1.判斷是否初始化,加載配置,獲取令牌填充速率和令牌桶大小
2.根據路由id組合成兩個redis中的key值,傳入lua腳本
request_rate_limiter.{id}.tokens 令牌桶剩余令牌數
request_rate_limiter.{id}.timestamp 令牌桶最后填充令牌時間
3.把令牌填充速率,令牌桶大小,當前時間(單位:秒),消耗令牌數(默認為1)組合傳入lua腳本
4.執行lua腳本
5.flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L))) 這個是對執行lua腳本過程中發生異常的處理,它會忽略異常,返回令牌。這樣就能跟redis解耦,不對它強依賴。
該實現核心主要體現在lua腳本上,它使用的是令牌桶算法
詳見spring-cloud-gateway-core下的request_rate_limiter.lua
## 獲取剩余令牌數的redis key local tokens_key = KEYS[1] ## 獲取最后一次填充令牌的時間 local timestamp_key = KEYS[2] --redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key) ## 令牌填充速率 local rate = tonumber(ARGV[1]) ## 令牌桶大小 local capacity = tonumber(ARGV[2]) ## 當前秒數 local now = tonumber(ARGV[3]) ## 消耗令牌數,默認1 local requested = tonumber(ARGV[4]) ## 計算令牌桶需要填充的時間 local fill_time = capacity/rate ## 計算key的存活時間 local ttl = math.floor(fill_time2) --redis.log(redis.LOG_WARNING, "rate " .. ARGV[1]) --redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2]) --redis.log(redis.LOG_WARNING, "now " .. ARGV[3]) --redis.log(redis.LOG_WARNING, "requested " .. ARGV[4]) --redis.log(redis.LOG_WARNING, "filltime " .. fill_time) --redis.log(redis.LOG_WARNING, "ttl " .. ttl) ## 獲取剩余的令牌數 local last_tokens = tonumber(redis.call("get", tokens_key)) if last_tokens == nil then last_tokens = capacity end --redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens) ## 獲取令牌最后填充時間 local last_refreshed = tonumber(redis.call("get", timestamp_key)) if last_refreshed == nil then last_refreshed = 0 end --redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed) local delta = math.max(0, now-last_refreshed) ## 計算得到剩余的令牌數 local filled_tokens = math.min(capacity, last_tokens+(deltarate)) ## 大于請求消耗令牌 allowed 設為true local allowed = filled_tokens >= requested local new_tokens = filled_tokens local allowed_num = 0 if allowed then new_tokens = filled_tokens - requested allowed_num = 1 end --redis.log(redis.LOG_WARNING, "delta " .. delta) --redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens) --redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num) --redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens) redis.call("setex", tokens_key, ttl, new_tokens) redis.call("setex", timestamp_key, ttl, now) return { allowed_num, new_tokens }
看完上述內容,你們掌握SpringCloud中怎么實現gateway限流的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。