Redisson分布式锁源码篇 Redisson 是一个简单的Redis Java客户端,具有内存数据网格功能。
它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁 的实现。
GitHub地址:https://github.com/redisson/redisson
Redisson分布式锁不仅是相对成熟的分布式锁方案,而且在很多企业中都会去使用的,所以了解一下底层的实现还是很有必要的。
一、使用Redisson分布式锁 Redisson分布式锁的使用非常简单,我们只需要在maven中引入依赖,然后直接调用相关API即可。
1.1 引入依赖 <dependency > <groupId > org.redisson</groupId > <artifactId > redisson</artifactId > <version > 3.13.6</version > </dependency >
1.2 调用API RedissonClient redissonClient = Redisson.create(); RLock lock = redissonClient.getLock("lock" ); lock.lock(); boolean b = lock.tryLock(); lock.unlock();
二、源码解析 下面我就带大家去分析一下Redisson的可重入和可重试的源码。
2.1 可重入锁原理 (1) 原理解释 Redisson分布式锁的可重入锁的原理:
使用Redis中Hash的key-value
结构,存储线程标识 和重入次数 ;
每次获取锁的时候,如果线程标识相同,就给Hash的value加1 ;
每次释放锁的时候,并不会删除锁,而是重入次数减1 ;
当所有逻辑执行完,如果重入次数为0,则删除锁 。
逻辑流程图
(2) 源码分析 ① 尝试获取锁 boolean b = lock.tryLock();
org.redisson.RedissonLock
@Override public boolean tryLock () { return get(tryLockAsync()); }
@Override public RFuture<Boolean> tryLockAsync () { return tryLockAsync(Thread.currentThread().getId()); }
@Override public RFuture<Boolean> tryLockAsync (long threadId) { return tryAcquireOnceAsync(-1 , -1 , null , threadId); }
注意,下面的代码我们只需要关注有注释的部分,剩余的部分会在后面讲到。
private RFuture<Boolean> tryAcquireOnceAsync (long waitTime, long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1 ) { return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN); } RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime, commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null ) { return ; } if (ttlRemaining) { scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
tryLockInnerAsync()
方法是获取锁的主要实现,我们可以看到,其实都是固定的Lua脚本逻辑。
<T> RFuture<T> tryLockInnerAsync (long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);" , Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
我们把这段Lua脚本摘出来分析一下逻辑:
if (redis.call('exists' , KEYS[1 ]) == 0 ) then redis.call('hincrby' , KEYS[1 ], ARGV[2 ], 1 ); redis.call('pexpire' , KEYS[1 ], ARGV[1 ]); return nil ; end ; if (redis.call('hexists' , KEYS[1 ], ARGV[2 ]) == 1 ) then redis.call('hincrby' , KEYS[1 ], ARGV[2 ], 1 ); redis.call('pexpire' , KEYS[1 ], ARGV[1 ]); return nil ; "end; return redis.call('pttl', KEYS[1]);
② 释放锁
org.redisson.RedissonLock
@Override public void unlock () { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException) e.getCause(); } else { throw e; } }
同样地,我们只需要跟着有注释的代码往下看。
@Override public RFuture<Void> unlockAsync (long threadId) { RPromise<Void> result = new RedissonPromise <Void>(); RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { cancelExpirationRenewal(threadId); if (e != null ) { result.tryFailure(e); return ; } if (opStatus == null ) { IllegalMonitorStateException cause = new IllegalMonitorStateException ("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return ; } result.trySuccess(null ); }); return result; }
unlockInnerAsync()
方法是释放锁的主要逻辑。我们可以看到,其实也都是固定的Lua脚本逻辑。
protected RFuture<Boolean> unlockInnerAsync (long threadId) { return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;" , Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
我们把这段Lua脚本摘出来分析一下逻辑:
if (redis.call('hexists' , KEYS[1 ], ARGV[3 ]) == 0 ) then return nil ; end ; local counter = redis.call('hincrby' , KEYS[1 ], ARGV[3 ], -1 ); if (counter > 0 ) then redis.call('pexpire' , KEYS[1 ], ARGV[2 ]); return 0 ; else redis.call('del' , KEYS[1 ]); redis.call('publish' , KEYS[2 ], ARGV[1 ]); return 1 ; end ; return nil ;
2.2 锁重试和watch dog原理 (1) 原理解释 Redisson分布式锁的锁重试和watch dog的原理:
获取锁
首先尝试根据线程标识获取锁
判断锁的ttl是否为null(为null表示成功获取到锁,不为null表示获取锁失败)
获取锁成功,判断锁自动释放的时间是否未设置(-1)
未设置,开启watch dog,进行自动续约,默认值是30s
设置则使用设置的时间
获取锁失败,判断剩余等待时间是否大于0
大于0,订阅并等待释放锁的信号,并进行重试逻辑
小于等于0,则获取锁失败,返回
释放锁
首先尝试根据线程标识释放锁
释放锁成功,发送释放锁的消息,并关闭watch dog自动续约
释放锁失败,记录异常,返回
(2) 源码分析 ① 重试获取锁 boolean b = lock.tryLock(1 , TimeUnit.SECONDS);
org.redisson.RedissonLock
@Override public boolean tryLock (long waitTime, TimeUnit unit) throws InterruptedException { return tryLock(waitTime, -1 , unit); }
@Override public boolean tryLock (long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null ) { return true ; } time -= System.currentTimeMillis() - current; if (time <= 0 ) { acquireFailed(waitTime, unit, threadId); return false ; } current = System.currentTimeMillis(); RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false )) { subscribeFuture.onComplete((res, e) -> { if (e == null ) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(waitTime, unit, threadId); return false ; } try { time -= System.currentTimeMillis() - current; if (time <= 0 ) { acquireFailed(waitTime, unit, threadId); return false ; } while (true ) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null ) { return true ; } time -= System.currentTimeMillis() - currentTime; if (time <= 0 ) { acquireFailed(waitTime, unit, threadId); return false ; } currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0 ) { acquireFailed(waitTime, unit, threadId); return false ; } } } finally { unsubscribe(subscribeFuture, threadId); } }
private Long tryAcquire (long waitTime, long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId)); }
private <T> RFuture<Long> tryAcquireAsync (long waitTime, long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1 ) { return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null ) { return ; } if (ttlRemaining == null ) { scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
<T> RFuture<T> tryLockInnerAsync (long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);" , Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
② watch dog自动续约 定时更新有效期的方法是在获取锁成功之后调用的,方法名为scheduleExpirationRenewal
。
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap <>();public RedissonLock (CommandAsyncExecutor commandExecutor, String name) { ... this .id = commandExecutor.getConnectionManager().getId(); ... this .entryName = id + ":" + name; ... } protected String getEntryName () { return entryName; } private void scheduleExpirationRenewal (long threadId) { ExpirationEntry entry = new ExpirationEntry (); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null ) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); renewExpiration(); } }
private void renewExpiration () { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null ) { return ; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask () { @Override public void run (Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null ) { return ; } Long threadId = ent.getFirstThreadId(); if (threadId == null ) { return ; } RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null ) { log.error("Can't update lock " + getName() + " expiration" , e); return ; } if (res) { renewExpiration(); } }); } }, internalLockLeaseTime / 3 , TimeUnit.MILLISECONDS); ee.setTimeout(task); }
protected RFuture<Boolean> renewExpirationAsync (long threadId) { return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;" , Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
那么什么时候停止定时更新呢?
当然是在释放锁的过程中,我们回过来看释放锁的剩余代码:
@Override public RFuture<Void> unlockAsync (long threadId) { RPromise<Void> result = new RedissonPromise <Void>(); RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { cancelExpirationRenewal(threadId); if (e != null ) { result.tryFailure(e); return ; } if (opStatus == null ) { IllegalMonitorStateException cause = new IllegalMonitorStateException ("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return ; } result.trySuccess(null ); }); return result; }
void cancelExpirationRenewal (Long threadId) { ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (task == null ) { return ; } if (threadId != null ) { task.removeThreadId(threadId); } if (threadId == null || task.hasNoThreads()) { Timeout timeout = task.getTimeout(); if (timeout != null ) { timeout.cancel(); } EXPIRATION_RENEWAL_MAP.remove(getEntryName()); } }
这样,在锁被释放锁,所有的定时任务、订阅都被取消了。