91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Redis如何實現高并發分布式鎖?

發布時間:2020-07-06 23:02:58 來源:網絡 閱讀:1612 作者:BlueMiaomiao 欄目:編程語言

眾所周知,分布式鎖在微服務架構中是重頭戲,尤其是在互聯網公司,基本上企業內部都會有自己的一套分布式鎖開發框架。本文主要介紹使用Redis如何構建高并發分布式鎖。

假設 存在一個SpringBoot的控制器,其扣減庫存的業務邏輯如下:

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {

    // 將庫存取出來
    int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));

    // 判斷庫存夠不夠減
    if (stock > 0) {
        // 將庫存回寫到redis
        int tmp = stock - 1;
        stringRedisTemplate.opsForValue().set("stock", tmp.toString());
        logger.info("庫存扣減成功");
    } else {
        logger.info("庫存扣減失敗");
    }

    return "finished.";
}

不難看出,在應用服務器運行這段代碼的時候就會有線程安全性問題。因為多個線程同時去修改Redis服務中的數據。因此考慮給這段代碼加上一把鎖:

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {
    synchronized (this) {
        int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));

        // 判斷庫存夠不夠減
        if (stock > 0) {
            // 將庫存回寫到redis
            int tmp = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", tmp.toString());
            logger.info("庫存扣減成功");
        } else {
            logger.info("庫存扣減失敗");
        }
    }
    return "finished.";
}

這樣一來,當多個HTTP請求來請求數據的時候,多個線程去修改同一數據會有JVM本地鎖來進行合理的資源限制。雖然這樣解決了線程安全性問題,但是這僅僅是JVM級別的鎖,在分布式的環境下,由于像這樣的Web應用隨時會進行動態擴容,因此當多個應用的時候,同樣會有線程安全性問題,當上面這段代碼遇到類似下面的架構時還是會有各種各樣的問題:

Redis如何實現高并發分布式鎖?

對于上述的情況,我們可以使用redis api提供的setnx方法解決:

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {

    // 嘗試獲取鎖
    Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", "World");

    // 判斷是否獲得鎖
    if (!flag) { return "error"; }

    int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));

    // 判斷庫存夠不夠減
    if (stock > 0) {
        // 將庫存回寫到redis
        int tmp = stock - 1;
        stringRedisTemplate.opsForValue().set("stock", tmp.toString());
        logger.info("庫存扣減成功");
    } else {
        logger.info("庫存扣減失敗");
    }

    // 刪除鎖
    stringRedisTemplate.delete("Hello");

    return "finished.";
}

setnx key value是將key的值設置為value,當且僅當key不存在的時候。如果設置成功就返回1,否則就返回0。

這樣的話,首先嘗試獲取鎖,然后當業務執行完成的時候再刪除鎖。但是還是有問題的,當獲取鎖的時候拋出異常或者業務執行拋出異常怎么辦,所以加入異常處理邏輯:

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {
    try {
        // 嘗試獲取鎖
        Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", "World");

        // 判斷是否獲得鎖
        if (!flag) { return "error"; }

        int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));

        // 判斷庫存夠不夠減
        if (stock > 0) {
            // 將庫存回寫到redis
            int tmp = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", tmp.toString());
            logger.info("庫存扣減成功");
        } else {
            logger.info("庫存扣減失敗");
        }
    } finally {
        // 刪除鎖
        stringRedisTemplate.delete("Hello");
    }
    return "finished.";
}

經過這樣的修改,看起來沒什么問題了。但是當程序獲得鎖并且開始執行業務邏輯的時候,突然程序掛掉了或者被一些粗暴的運維工程師給kill,在finally中刪除鎖的邏輯就會得不到執行,因此就會產生死鎖。對于這種情況,我們可以給這個鎖設置一個超時時間:

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {
    try {
        // 嘗試獲取鎖
        Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", "World");

        // 設置超時時間, 根據業務場景估計超時時長
        stringRedisTmplate.expire("Hello", 10, TimeUnit.SECONDS);

        // 判斷是否獲得鎖
        if (!flag) { return "error"; }

        int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));

        // 判斷庫存夠不夠減
        if (stock > 0) {
            // 將庫存回寫到redis
            int tmp = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", tmp.toString());
            logger.info("庫存扣減成功");
        } else {
            logger.info("庫存扣減失敗");
        }
    } finally {
        // 刪除鎖
        stringRedisTemplate.delete("Hello");
    }
    return "finished.";
}

如果程序這么來寫,相對來說安全一些了,但是還是存在問題。試想一下,當獲取鎖成功時,正想給這把鎖設置超時的時候,程序掛掉了,還是會出現死鎖的,因此在redis較高的版本中提供的setIfAbsent方法中可以同時設置鎖的超時時間。

 Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", "World", 10, TimeUnit.SECONDS);

這樣一來,嘗試獲取鎖和設置鎖的超時時間就具備原子性了。實際上經過我們這一番改造,這在小型企業已經沒有太大的問題, 因為像這種代碼每天也就執行幾百次,并不算做高并發的場景。當這樣的代碼被暴露在超高并發場景下的時候,還是會存在各種各樣的問題。試想一個場景,當一個HTTP請求請求到控制器的時候,應用獲取到鎖了,超時時間也設置成功了,但是應用的業務邏輯超過了超時時間,我們這里的超時時間設置的是10秒,當應用的業務邏輯執行15秒的時候,鎖就被redis服務刪除了。假設恰好此時又有一個HTTP請求來請求控制器,此時應用服務器會再啟動一個線程來獲取鎖,而且還獲取成功了,但是這次的HTTP請求對應的業務邏輯還沒有執行完。新來的TTTP請求也在執行,由于新來的HTTP請求也在執行,因為鎖超時后被刪除,新的HTTP請求也成功獲取鎖了。當原來的HTTP請求對應的業務邏輯執行完成以后,嘗試刪除鎖,這樣正好刪除的是新來的HTTP請求對應的鎖。這個時候redis中又沒有鎖了,這樣第三個HTTP請求又會獲得鎖,所以情況就不妙了。

為了解決上面的問題,我們可以將代碼優化為下面的樣子:

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {
    String clientUuid = UUID.randomUUID().toString();
    try {
        // 嘗試獲取鎖,設置超時時間, 根據業務場景估計超時時長
        Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", clientUuid, 10, TimeUnit.SECONDS);

        // 判斷是否獲得鎖
        if (!flag) { return "error"; }

        int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));

        // 判斷庫存夠不夠減
        if (stock > 0) {
            // 將庫存回寫到redis
            int tmp = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", tmp.toString());
            logger.info("庫存扣減成功");
        } else {
            logger.info("庫存扣減失敗");
        }
    } finally {
        // 刪除鎖的時候判斷是不是自己的鎖
        if (clientUuid.equals(stringRedisTemplate.opsForValue().get("Hello"))) {
            stringRedisTemplate.delete("Hello");   
        }
    }
    return "finished.";
}

但是由于程序的不可預知性,誰也不能保證極端情況下,同時會有多個線程同時執行這段業務邏輯。我們可以在當執行業務邏輯的時候同時開一個定時器線程,每隔幾秒就重新將這把鎖設置為10秒,也就是給這把鎖進行“續命”。這樣就用擔心業務邏輯到底執行多長時間了。但是這樣程序的復雜性就會增加,每個業務邏輯都要寫好多的代碼,因此這里推薦在分布式環境下使用redisson。因此我們使用redisson實現分支線程的代碼:

  • 引入依賴:
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.6.5</version>
</dependency>
  • 初始化Redisson的客戶端配置:
@Bean
public Redisson redisson () {
    Config cfg = new Config();
    cfg.useSingleServer().setAddress("redis://localhost:6379").setDatabase(0);
    return (Redisson) Redisson.create(cfg);
}
  • 在程序中注入Redisson客戶端:
@Autowired
private Redisson redisson;
  • 對應的業務邏輯:
@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {
    // 獲取鎖對象
    RLock lock = redisson.getLock("Hello");
    try {
        // 嘗試加鎖, 默認30秒, 自動后臺開一個線程實現鎖的續命
        lock.tryLock();

        int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));

        // 判斷庫存夠不夠減
        if (stock > 0) {
            // 將庫存回寫到redis
            int tmp = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", tmp.toString());
            logger.info("庫存扣減成功");
        } else {
            logger.info("庫存扣減失敗");
        }
    } finally {
        // 釋放鎖
        lock.unlock();
    }
    return "finished.";
}

Redisson分布式鎖的實現原理如下:

Redis如何實現高并發分布式鎖?

但是這個架構還是存在問題的,因為redis服務器是主從的架構,當在master節點設置鎖之后,slave節點會立刻同步。但是如果剛在master節點設置上了鎖,slave節點還沒來得及設置,master節點就掛掉了。還是會產生上同樣的問題,新的線程獲得鎖。

因此使用redis構建高并發的分布式鎖,僅適合單機架構,當使用主從架構的redis時還是會出現線程安全性問題。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

宁乡县| 金川县| 宁津县| 桐梓县| 新乐市| 农安县| 云龙县| 赤壁市| 确山县| 勃利县| 中西区| 枝江市| 孝义市| 古蔺县| 鄂托克前旗| 凉城县| 鹤峰县| 太保市| 邳州市| 阳江市| 剑川县| 祁连县| 贺州市| 广安市| 永胜县| 博罗县| 丹棱县| 拉萨市| 乐昌市| 汨罗市| 淮阳县| 同江市| 新源县| 紫金县| 响水县| 凉城县| 蒲江县| 长岭县| 灌云县| 宜宾县| 介休市|