91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Redisson的原理是什么

發布時間:2022-08-26 11:45:29 來源:億速云 閱讀:167 作者:iii 欄目:開發技術

本文小編為大家詳細介紹“Redisson的原理是什么”,內容詳細,步驟清晰,細節處理妥當,希望這篇“Redisson的原理是什么”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。

1、分布式鎖場景

  • 互聯網秒殺

  • 搶優惠卷

  • 接口冪等性校驗

1.1 案例1

如下代碼模擬了下單減庫存的場景,我們分析下在高并發場景下會存在什么問題

package com.wangcp.redisson;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class IndexController {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    /**
     * 模擬下單減庫存的場景
     * @return
     */
    @RequestMapping(value = "/duduct_stock")
    public String deductStock(){
        // 從redis 中拿當前庫存的值
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock",realStock + "");
            System.out.println("扣減成功,剩余庫存:" + realStock);
        }else{
            System.out.println("扣減失敗,庫存不足");
        }
        return "end";
    }
}

假設在redis中庫存(stock)初始值是100。

現在有5個客戶端同時請求該接口,可能就會存在同時執行

int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));

這行代碼,獲取到的值都為100,緊跟著判斷大于0后都進行-1操作,最后設置到redis 中的值都為99。但正常執行完成后redis中的值應為 95。

1.2 案例2-使用synchronized 實現單機鎖

在遇到案例1的問題后,大部分人的第一反應都會想到加鎖來控制事務的原子性,如下代碼所示:

@RequestMapping(value = "/duduct_stock")
public String deductStock(){
    synchronized (this){
        // 從redis 中拿當前庫存的值
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock",realStock + "");
            System.out.println("扣減成功,剩余庫存:" + realStock);
        }else{
            System.out.println("扣減失敗,庫存不足");
        }
    }
    return "end";
}

現在當有多個請求訪問該接口時,同一時刻只有一個請求可進入方法體中進行庫存的扣減,其余請求等候。

但我們都知道,synchronized 鎖是屬于JVM級別的,也就是我們俗稱的“單機鎖”。但現在基本大部分公司使用的都是集群部署,現在我們思考下以上代碼在集群部署的情況下還能保證庫存數據的一致性嗎?

Redisson的原理是什么

答案是不能,如上圖所示,請求經Nginx分發后,可能存在多個服務同時從Redis中獲取庫存數據,此時只加synchronized (單機鎖)是無效的,并發越高,出現問題的幾率就越大。

1.3 案例3-使用redis的SETNX實現分布式鎖

setnx:將 key 的值設為 value,當且僅當 key 不存在。

若給定 key 已經存在,則 setnx 不做任何動作。

使用setnx實現簡單的分布式鎖:

/**
 * 模擬下單減庫存的場景
 * @return
 */
@RequestMapping(value = "/duduct_stock")
public String deductStock(){
    String lockKey = "product_001";
    // 使用 setnx 添加分布式鎖
    // 返回 true 代表之前redis中沒有key為 lockKey 的值,并已進行成功設置
    // 返回 false 代表之前redis中已經存在 lockKey 這個key了
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "wangcp");
    if(!result){
        // 代表已經加鎖了
        return "error_code";
    }
    // 從redis 中拿當前庫存的值
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
    if(stock > 0){
        int realStock = stock - 1;
        stringRedisTemplate.opsForValue().set("stock",realStock + "");
        System.out.println("扣減成功,剩余庫存:" + realStock);
    }else{
        System.out.println("扣減失敗,庫存不足");
    }
    // 釋放鎖
    stringRedisTemplate.delete(lockKey);
    return "end";
}

我們知道 Redis 是單線程執行,現在再看案例2中的流程圖時,哪怕高并發場景下多個請求都執行到了setnx的代碼,redis會根據請求的先后順序進行排列,只有排列在隊頭的請求才能設置成功。其它請求只能返回“error_code”。

當setnx設置成功后,可執行業務代碼對庫存扣減,執行完成后對鎖進行釋放。

我們再來思考下以上代碼已經完美實現分布式鎖了嗎?能夠支撐高并發場景嗎?答案并不是,上面的代碼還是存在很多問題的,離真正的分布式鎖還差的很遠。我們分析下以上代碼存在的問題:

死鎖:假如第一個請求在setnx加鎖完成后,執行業務代碼時出現了異常,那釋放鎖的代碼就無法執行,后面所有的請求也都無法進行操作了。

針對死鎖的問題,我們對代碼再次進行優化,添加try-finally,在finally中添加釋放鎖代碼,這樣無論如何都會執行釋放鎖代碼,如下所示:

/**
     * 模擬下單減庫存的場景
     * @return
     */
@RequestMapping(value = "/duduct_stock")
public String deductStock(){
    String lockKey = "product_001";
    try{
        // 使用 setnx 添加分布式鎖
        // 返回 true 代表之前redis中沒有key為 lockKey 的值,并已進行成功設置
        // 返回 false 代表之前redis中已經存在 lockKey 這個key了
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "wangcp");
        if(!result){
            // 代表已經加鎖了
            return "error_code";
        }
        // 從redis 中拿當前庫存的值
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock",realStock + "");
            System.out.println("扣減成功,剩余庫存:" + realStock);
        }else{
            System.out.println("扣減失敗,庫存不足");
        }
    }finally {
        // 釋放鎖
        stringRedisTemplate.delete(lockKey);
    }
    return "end";
}

經過改進后的代碼是否還存在問題呢?我們思考正常執行的情況下應該是沒有問題,但我們假設請求在執行到業務代碼時服務突然宕機了,或者正巧你的運維同事重新發版,粗暴的 kill -9 掉了呢,那代碼還能執行 finally 嗎?

1.4 案例4-加入過期時間

針對想到的問題,對代碼再次進行優化,加入過期時間,這樣即便出現了上述的問題,在時間到期后鎖也會自動釋放掉,不會出現“死鎖”的情況。

@RequestMapping(value = "/duduct_stock")
public String deductStock(){
    String lockKey = "product_001";
    try{
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,"wangcp",10,TimeUnit.SECONDS);
        if(!result){
            // 代表已經加鎖了
            return "error_code";
        }
        // 從redis 中拿當前庫存的值
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock",realStock + "");
            System.out.println("扣減成功,剩余庫存:" + realStock);
        }else{
            System.out.println("扣減失敗,庫存不足");
        }
    }finally {
        // 釋放鎖
        stringRedisTemplate.delete(lockKey);
    }
    return "end";
}

現在我們再思考一下,給鎖加入過期時間后就可以了嗎?就可以完美運行不出問題了嗎?

超時時間設置的10s真的合適嗎?如果不合適設置多少秒合適呢?

假設同一時間有三個請求。

  • 請求1首先加鎖后需執行15秒,但在執行到10秒時鎖失效釋放。

  • 請求2進入后加鎖執行,在請求2執行到5秒時,請求1執行完成進行鎖釋放,但此時釋放掉的是請求2的鎖。

  • 請求3在請求2執行5秒時開始執行,但在執行到3秒時請求2執行完成將請求3的鎖進行釋放。

我們現在只是模擬3個請求便可看出問題,如果在真正高并發的場景下,可能鎖就會面臨“一直失效”或“永久失效”。

那么具體問題出在哪里呢?總結為以下幾點:

  • 1.存在請求釋放鎖時釋放掉的并不是自己的鎖

  • 2.超時時間過短,存在代碼未執行完便自動釋放

針對問題我們思考對應的解決方法:

  • 針對問題1,我們想到在請求進入時生成一個唯一id,使用該唯一id作為鎖的value值,釋放時先進行獲取比對,比對相同時再進行釋放,這樣就可以解決釋放掉其它請求鎖的問題。

  • 針對問題2,我們可使用延長過期時間。

1.5 案例5-使用唯一id作為鎖的value值

針對想到的問題,對代碼再次進行優化,使用唯一id作為鎖的value值,這樣便不存在請求釋放鎖時釋放掉的并不是自己的鎖。

@RequestMapping(value = "/duduct_stock")
public String deductStock(){
    String lockKey = "product_001";
    try{
    	//1、占分布式鎖。去redis占坑并設置過期時間 setIfAbsent()操作是原子性的
        final String uuid = UUID.randomUUID().toString();
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,uuid,30,TimeUnit.SECONDS);
        if(!result){
            // 代表已經加鎖了
            return "error_code";
        }
        // 從redis 中拿當前庫存的值
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock",realStock + "");
            System.out.println("扣減成功,剩余庫存:" + realStock);
        }else{
            System.out.println("扣減失敗,庫存不足");
        }
    }finally {
        // 釋放鎖 使用lua腳本解鎖  保證原子性
        String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
        stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList(lockKey), uuid);
    }
    return "end";
}

但是我們思考一下,不斷的延長過期時間真的合適嗎?設置短了存在超時自動釋放的問題,設置長了又會出現宕機后一段時間鎖無法釋放的問題,雖然不會再出現“死鎖”。針對這個問題,如何解決呢?

我們應該要開啟一個守護線程進行監聽。將超時時間設置默認30s,線程每10s調用一次判斷鎖還是否存在,如果存在則延長鎖的超時時間。

1.6 案例6-Redisson分布式鎖

SpringBoot集成Redisson步驟

引入依賴

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.6.5</version>
</dependency>

初始化客戶端

@Bean
public RedissonClient redisson(){
    // 單機模式
    Config config = new Config();
    config.useSingleServer().setAddress("redis://192.168.3.170:6379").setDatabase(0);
    return Redisson.create(config);
}

Redisson實現分布式鎖

@RestController
public class IndexController {
    @Autowired
    private RedissonClient redisson;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    /**
     * 模擬下單減庫存的場景
     * @return
     */
    @RequestMapping(value = "/duduct_stock")
    public String deductStock(){
        String lockKey = "product_001";
        // 1.獲取鎖對象
        RLock redissonLock = redisson.getLock(lockKey);
        try{
            // 2.加鎖
            redissonLock.lock();  // 等價于 setIfAbsent(lockKey,"wangcp",10,TimeUnit.SECONDS);
            // 從redis 中拿當前庫存的值
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
            if(stock > 0){
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock",realStock + "");
                System.out.println("扣減成功,剩余庫存:" + realStock);
            }else{
                System.out.println("扣減失敗,庫存不足");
            }
        }finally {
            // 3.釋放鎖
            redissonLock.unlock();
        }
        return "end";
    }
}

Redisson 分布式鎖實現原理圖

Redisson的原理是什么

Redisson 底層源碼分析

我們點擊 lock() 和 unlock() 方法,查看源碼,最終看到以下代碼

//加鎖 
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
//解鎖
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

沒錯,加鎖最終執行的就是這段 lua 腳本語言。

這段lua腳本命令在Redis中執行時,會被當成一條命令來執行,能夠保證原子性,故要不都成功,要不都失敗。

我們在源碼中看到Redssion的許多方法實現中很多都用到了lua腳本,這樣能夠極大的保證命令執行的原子性。

if (redis.call('exists', KEYS[1]) == 0) then 
    redis.call('hset', KEYS[1], ARGV[2], 1); 
    redis.call('pexpire', KEYS[1], ARGV[1]); 
    return nil; 
end;

腳本的主要邏輯為:

  • exists 判斷 key 是否存在

  • 當判斷不存在則設置 key

  • 然后給設置的key追加過期時間

這樣來看其實和我們前面案例5中的實現方法本質沒啥區別,都是使用底層都是lua。只不過redisson做了更多的判斷,考慮的更加的周全。而且他還完善了我們案例5中的缺陷,他實現了一個看門狗機制。

Redisson鎖"看門狗"源碼

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                
                if (res) {
                    // reschedule itself
                    renewExpiration();
                } else {
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

這段代碼是在加鎖后開啟一個守護線程進行監聽。Redisson超時時間默認設置30s,線程每10s調用一次判斷鎖還是否存在,如果存在則延長鎖的超時時間。

現在,我們再回過頭來看看案例5中的加鎖代碼與原理圖,其實完善到這種程度已經可以滿足很多公司的使用了,并且很多公司也確實是這樣用的。但我們再思考下是否還存在問題呢?例如以下場景:

  • 眾所周知 Redis 在實際部署使用時都是集群部署的,那在高并發場景下我們加鎖,當把key寫入到master節點后,master還未同步到slave節點時master宕機了,原有的slave節點經過選舉變為了新的master節點,此時可能就會出現鎖失效問題。

  • 通過分布式鎖的實現機制我們知道,高并發場景下只有加鎖成功的請求可以繼續處理業務邏輯。那就出現了大伙都來加鎖,但有且僅有一個加鎖成功了,剩余的都在等待。其實分布式鎖與高并發在語義上就是相違背的,我們的請求雖然都是并發,但Redis幫我們把請求進行了排隊執行,也就是把我們的并行轉為了串行。串行執行的代碼肯定不存在并發問題了,但是程序的性能肯定也會因此受到影響。

針對這些問題,我們再次思考解決方案

  • 在思考解決方案時我們首先想到CAP原則(一致性、可用性、分區容錯性),那么現在的Redis就是滿足AP(可用性、分區容錯性),如果想要解決該問題我們就需要尋找滿足CP(一致性、分區容錯性)的分布式系統。首先想到的就是zookeeper,zookeeper的集群間數據同步機制是當主節點接收數據后不會立即返回給客戶端成功的反饋,它會先與子節點進行數據同步,半數以上的節點都完成同步后才會通知客戶端接收成功。并且如果主節點宕機后,根據zookeeper的Zab協議(Zookeeper原子廣播)重新選舉的主節點一定是已經同步成功的。

  • 那么問題來了,Redisson與zookeeper分布式鎖我們如何選擇呢?答案是如果并發量沒有那么高,可以用zookeeper來做分布式鎖,但是它的并發能力遠遠不如Redis。如果你對并發要求比較高的話,那就用Redis,偶爾出現的主從架構鎖失效的問題其實是可以容忍的。

  • 關于第二個提升性能的問題,我們可以參考ConcurrentHashMap的鎖分段技術的思想,例如我們代碼的庫存量當前為1000,那我們可以分為10段,每段100,然后對每段分別加鎖,這樣就可以同時執行10個請求的加鎖與處理,當然有要求的同學還可以繼續細分。但其實Redis的Qps已經達到10W+了,沒有特別高并發量的場景下也是完全夠用的。

讀到這里,這篇“Redisson的原理是什么”文章已經介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領會,如果想了解更多相關內容的文章,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

西乡县| 皮山县| 金乡县| 咸阳市| 桃源县| 都昌县| 仪陇县| 马山县| 兴宁市| 庆元县| 启东市| 长白| 米易县| 武川县| 云林县| 乐山市| 济源市| 同仁县| 多伦县| 化州市| 崇信县| 兴隆县| 宜昌市| 乌鲁木齐县| 洪湖市| 车致| 辽宁省| 瑞丽市| 枝江市| 长顺县| 高清| 沈阳市| 时尚| 邹平县| 高邮市| 美姑县| 寿光市| 建平县| 舟山市| 措美县| 科技|