【源码阅读】Redisson lock源码
创始人
2024-11-13 07:34:56
0

目录

底层原理

加锁机制

锁互斥机制

可重入锁机制

总结 


Redisson 加锁非常简单,还支持 redis 单实例、redis 哨兵、redis cluster、redis master-slave 等各种部署架构

RLock lock = redisson.getLock("cyk-test"); lock.lock(); lock.unlock();

底层原理

img

加锁机制

废话不多说,直接看源码,下面的代码先不看,先看 tryAcquire 是如何获取锁的

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {         long threadId = Thread.currentThread().getId();         Long ttl = tryAcquire(-1, leaseTime, unit, threadId);         // lock acquired         if (ttl == null) {             return;         }          CompletableFuture future = subscribe(threadId);         pubSub.timeout(future);         RedissonLockEntry entry;         if (interruptibly) {             entry = commandExecutor.getInterrupted(future);         } else {             entry = commandExecutor.get(future);         }         ... }

查看 tryAcquire 方法,点进去看发现调用了 tryAcquireAsync0 方法,这里 RFuture 继承自 java.util.concurrent.Future,表示这是一个异步的任务,get 方法会同步获取结果

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {     return get(tryAcquireAsync0(waitTime, leaseTime, unit, threadId)); }  private RFuture tryAcquireAsync0(long waitTime, long leaseTime, TimeUnit unit, long threadId) {     return getServiceManager().execute(() -> tryAcquireAsync(waitTime, leaseTime, unit, threadId)); }

查看 tryAcquireAsync 方法

    private RFuture tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {         RFuture ttlRemainingFuture;         if (leaseTime > 0) {             ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);         } else {             ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,                     TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);         }         ...     }

查看 tryLockInnerAsync 方法

     RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {         return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command,                 "if ((redis.call('exists', KEYS[1]) == 0) " +                             "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +                         "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +                         "redis.call('pexpire', KEYS[1], ARGV[1]); " +                         "return nil; " +                     "end; " +                     "return redis.call('pttl', KEYS[1]);",                 Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));     }

这是一段加锁的 lua 脚本,用于保证原子性,参数解释如下:

  • KEYS[1] 表示加锁的 key

  • ARGV[1] 表示锁 key 的默认超时时间

  • ARGV[2] 表示加锁的客户端 ID,由 UUID:线程 ID 组成

客户端在第一次加锁完成,会设置一个 key 为客户端 ID,value 为加锁次数的 hash 数据结构:

img

现在知道了内部方法的逻辑,往回倒一步,重点看我加在代码中的注释

    private RFuture tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {         RFuture ttlRemainingFuture;         if (leaseTime > 0) {             ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);         } else {             ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,                     TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);         }          // 这里是定义了加锁Lua脚本的异步任务,通过CompletableFuture编排         CompletionStage s = handleNoSync(threadId, ttlRemainingFuture);         ttlRemainingFuture = new CompletableFutureWrapper<>(s);          // 这个f依赖ttlRemainingFuture的结果,如果入参的leaseTime<=0会触发看门狗机制         // 否则按照设置的过期时间来过期         CompletionStage f = ttlRemainingFuture.thenApply(ttlRemaining -> {             // lock acquired             if (ttlRemaining == null) {                 if (leaseTime > 0) {                     internalLockLeaseTime = unit.toMillis(leaseTime);                 } else {                     scheduleExpirationRenewal(threadId);                 }             }             return ttlRemaining;         });          // 返回编排好的CompletableFuture         return new CompletableFutureWrapper<>(f);     }

把 RFuture 返回以后,就到了 get 方法阻塞获取方法这里

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {     return get(tryAcquireAsync0(waitTime, leaseTime, unit, threadId)); }  private RFuture tryAcquireAsync0(long waitTime, long leaseTime, TimeUnit unit, long threadId) {     return getServiceManager().execute(() -> tryAcquireAsync(waitTime, leaseTime, unit, threadId)); }

最后回到了这里

    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {         long threadId = Thread.currentThread().getId();         Long ttl = tryAcquire(-1, leaseTime, unit, threadId);         // lock acquired         if (ttl == null) {             return;         }          // 这里的 subscribe 就是 Redis 订阅解锁的 lua 脚本中的 publish         CompletableFuture future = subscribe(threadId);         pubSub.timeout(future);         RedissonLockEntry entry;         if (interruptibly) {             entry = commandExecutor.getInterrupted(future);         } else {             entry = commandExecutor.get(future);         }         ...     }

接着看下面循环获取锁的逻辑

        try {             while (true) {                 // 自旋尝试获取锁                 ttl = tryAcquire(-1, leaseTime, unit, threadId);                 // 看Lua脚本,ttl为null说明锁获取到了                 if (ttl == null) {                     break;                 } ​                 // waiting for message                 if (ttl >= 0) {                     try {                         // 注意这里的tryAcquire和之前的tryAcquire不是同一个东西,这里是信号量的tryAcquire                         // entry.getLatch()这里返回的是信号量,释放锁的代码会释放一个许可                         // 如果没有可用的许可,会一直休眠直到超时时间 ttl ms                         entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);                     } catch (InterruptedException e) {                         if (interruptibly) {                             throw e;                         }                         entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);                     }                 } else {                     // 当 key 存在但没有设置剩余生存时间时,pttl返回 -1,会走到这个逻辑                     // 我感觉正常流程走不到这个逻辑,因为当前线程无非是看到锁存在或者不存在                     // 看到锁不存在等于加锁成功了,因为Lua脚本是原子性的                     // 看到锁存在,默认也给了超时时间                     // 这里就没有设置超时时间,一直等释放锁的许可                     if (interruptibly) {                         entry.getLatch().acquire();                     } else {                         entry.getLatch().acquireUninterruptibly();                     }                 }             }         } finally {             unsubscribe(entry, threadId);         }

这里设计的巧妙之处就在于利用了消息订阅、信号量的机制,它不是无休止的盲等机制,也避免了不断的重试,而是检测到锁被释放才去尝试重新获取,这对 CPU 十分的友好

锁互斥机制

此时如果客户端 2 来尝试加锁,同样走进 RedissonLock#lock 方法,会咋样呢?第一个 if 判断会执行 exists myLock,发现 myLock 这个锁 key 已经存在了。接着第二个 if 判断,判断一下,myLock 锁 key 的 hash 数据结构中,对应客户端 2 的 ID 的 key 的 value 为 1,也没有。最终会获取到 pttl myLock 返回的锁 key 的剩余生存时间,进入 while 循环,不停的尝试加锁

可重入锁机制

那如果客户端1都已经持有了这把锁了,结果可重入的加锁会怎么样呢?

img

此时会执行可重入加锁的逻辑,走第二个 if 逻辑,对客户端 1 的加锁次数累加 1,此时 myLock 数据结构变为下面这样:

img

总结 

在这里插入图片描述

相关内容

热门资讯

绝活儿辅助!广西老友玩老是输怎... 绝活儿辅助!广西老友玩老是输怎么办(辅助挂)都是真的有辅助app(讲解有挂)在进入广西老友玩老是输怎...
法门辅助!福建13水插件(辅助... 法门辅助!福建13水插件(辅助挂)一贯是有辅助技巧(有挂技术)1、许多玩家不知道福建13水插件辅助怎...
办法辅助!潮友会app下载官方... 办法辅助!潮友会app下载官方辅助器(辅助挂)真是真的是有辅助app(有挂教程)该软件可以轻松地帮助...
妙招辅助!邯郸胡乐挂辅助(辅助... 妙招辅助!邯郸胡乐挂辅助(辅助挂)好像存在有辅助插件(有挂方略)1、上手简单,内置详细流程视频教学,...
教程书辅助!乐酷辅助(辅助挂)... 教程书辅助!乐酷辅助(辅助挂)其实存在有辅助脚本(有挂细节)乐酷辅助能透视中分为三种模型:乐酷辅助模...
学习辅助!决战卡五星辅助(辅助... 学习辅助!决战卡五星辅助(辅助挂)本来真的是有辅助软件(有人有挂)学习辅助!决战卡五星辅助(辅助挂)...
绝活辅助!边锋嘉兴麻将辅助器(... 绝活辅助!边锋嘉兴麻将辅助器(辅助挂)真是真的有辅助神器(新版有挂)1、边锋嘉兴麻将辅助器公共底牌简...
举措辅助!枫叶辅助器(辅助挂)... 举措辅助!枫叶辅助器(辅助挂)本来存在有辅助技巧(竟然有挂)1、下载好枫叶辅助器正确养号方法之后点击...
讲义辅助!点我达辅助(辅助挂)... 讲义辅助!点我达辅助(辅助挂)一直存在有辅助技巧(有人有挂)1、点我达辅助辅助器安装包、点我达辅助辅...
模块辅助!威信茶馆有挂的吗(辅... 模块辅助!威信茶馆有挂的吗(辅助挂)一直真的是有辅助脚本(揭秘有挂)1、玩家可以在威信茶馆有挂的吗线...