Java并发之ReentrantLock源码篇

Java并发之ReentrantLock源码篇

一、ReentrantLock

ReentrantLock 是一个可重入且独占式的锁,和 synchronized 关键字类似。不过,ReentrantLock 更灵活、更强大,增加了轮询、超时、中断、公平锁和非公平锁等高级功能。

1. 继承关系

ReentrantLock实现了Lock接口,Lock接口中定义了lockunlock相关操作,并且还存在newCondition方法,表示生成一个条件。

public class ReentrantLock implements Lock, java.io.Serializable { ... }

Lock接口主要方法如下:

image-20240128204509714

2. 内部类

ReentrantLock总共有三个内部类,并且三个内部类是紧密相关的。

  • Sync,继承自AbstractQueueSynchronizer抽象类,
  • FairSync,继承自Sync
  • NonfairSync,继承自Sync

(1) Sync

通过下面的源码可以看到,Sync其实就是重写了AbstractQueuedSynchronizer中的方法,其实本质上还是AQS的逻辑。

abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;

/**
* 获取锁。执行Lock.lock()。子类化的主要原因是为非公平版本提供快速路径。
*/
abstract void lock();

/**
* 执行非公平tryLock。tryAcquire在子类中实现,但都需要对trylock方法进行非公平尝试。
*/
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取当前状态
int c = getState();
if (c == 0) { // 表示没有线程竞争该锁
// 比较并设置状态成功,状态0表示锁没有被占用
if (compareAndSetState(0, acquires)) {
// 设置当前线程独占,返回true,获取锁成功
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 当前线程拥有该锁
int nextc = c + acquires; // 增加重入次数
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 设置状态,返回true,获取锁成功
setState(nextc);
return true;
}
// 获取锁失败,返回false
return false;
}
/**
* 尝试获取对象状态
*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 如果当前线程不为独占线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 释放标识
boolean free = false;
if (c == 0) {
free = true;
// 已经释放,清空独占
setExclusiveOwnerThread(null);
}
// 设置标识
setState(c);
return free;
}

/**
* 判断资源是否被当前线程占用
*/
protected final boolean isHeldExclusively() {
// 通常我们必须在owner之前读取state,
// 我们不需要检查当前线程是否为owner
return getExclusiveOwnerThread() == Thread.currentThread();
}

final ConditionObject newCondition() {
return new ConditionObject();
}

// 方法从外部类中继

final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}

final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}

final boolean isLocked() {
return getState() != 0;
}

/**
* 从流重新构造实例(即反序列化它)。
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // 复位至解锁状态
}
}

(2) NonfairSync

NonfairSync类继承了Sync类,表示采用非公平策略获取锁。从下面的源码可以看出:

每次lock时都会尝试获取锁,而不是按照公平等待的原则进行等待,让等待时间最久的线程获得锁。

/**
* 非公平锁的同步对象
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* 执行锁。尝试立即驳船,失败时备份到正常获取。
*/
final void lock() {
if (compareAndSetState(0, 1)) // 比较并设置状态成功,状态0表示锁没有被占用
// 把当前线程设置独占锁
setExclusiveOwnerThread(Thread.currentThread());
else // 表示锁已经被占用
// 以独占方式获取对象,忽略中断
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

(3) FairSync

FairSync类也继承了Sync类,表示采用公平策略获取锁。从下面的源码可知:

当资源空闲时,它总是会先判断sync队列(即AbstractQueuedSynchronizer中的数据结构)是否有等待时间更长的线程,如果存在,则将该线程加入到等待队列的尾部,实现了公平获取原则。

/**
* 为公平锁同步对象
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1); // 以独占方式获取对象,忽略中断
}

/**
* 公平版本的tryAcquire。除非递归调用或没有等待者,否则不要授予访问权限。
*/
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取状态
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) { // 不存在已经等待更久的线程并且CAS设置状态成功
// 设置当前线程独占
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 状态不为0,即资源已经被线程占用
// 下一个状态
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc); // 设置状态
return true;
}
return false;
}
}

3. 构造函数

默认的构造函数采用的是非公平策略获取锁,另外还可以通过传递参数来决定采用公平策略或者是非公平策略,参数为true表示公平策略,参数为false则采用非公平策略。

/**
* 创建ReentrantLock的实例。这相当于使用ReentrantLock(false)。
*/
public ReentrantLock() {
sync = new NonfairSync();
}

/**
* 使用给定的公平性策略创建ReentrantLock的实例。
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

4. 核心逻辑

通过分析ReentrantLock的源码,可知对其操作都转化为对Sync对象的操作,由于Sync继承了AQS,所以基本上都可以转化为对AQS的操作。如将ReentrantLock的lock函数转化为对Sync的lock函数的调用,而具体会根据采用的策略(如公平策略或者非公平策略)的不同而调用到Sync的不同子类。

二、完整源码

以下源码版本为JDK8。

package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.Collection;

/**
* 具有与使用同步方法和语句访问的隐式监视锁相同的基本行为和语义的可重入互斥锁,但具有扩展的功能。
*
* ReentrantLock属于上一次成功锁定但尚未解锁的线程。
* 当锁不属于其他线程时,调用锁的线程将返回并成功获取锁。如果当前线程已经拥有该锁,该方法将立即返回。
* 这可以使用isHeldByCurrentThread和getHoldCount方法来检查。
*
* 该类的构造函数接受一个可选的公平性参数。当设置为true时,在争用情况下,锁倾向于授予对等待时间最长的线程的访问权。
* 否则,此锁不能保证任何特定的访问顺序。使用由多个线程访问的公平锁的程序可能会显示较低的总体吞吐量
* (即速度较慢,通常比使用默认设置的慢得多),但是在获得锁和保证没有饥饿的时间上的差异较小。
* 但是请注意,锁的公平性并不能保证线程调度的公平性。因此,使用公平锁的多个线程中的一个可以连续多次获得公平锁,
* 而其他活动线程没有进展,也没有当前持有该锁。还要注意,不定时的tryLock()方法不遵守公平性设置。
* 如果锁可用,即使其他线程正在等待,它也会成功。
*
* 建议在调用lock之后立即使用try块,最典型的是在before/after构造中,例如:
*
* <pre> {@code
* class X {
* private final ReentrantLock lock = new ReentrantLock();
* // ...
*
* public void m() {
* lock.lock(); // block until condition holds
* try {
* // ... method body
* } finally {
* lock.unlock()
* }
* }
* }}</pre>
*
* 除了实现Lock接口之外,该类还定义了许多公共和受保护的方法,用于检查锁的状态。其中一些方法仅对仪器和监视有用。
*
* 该类的序列化与内置锁的行为方式相同:反序列化的锁处于解锁状态,无论其序列化时的状态如何。
*
* 该锁最多支持同一个线程的2147483647个递归锁。尝试超过此限制将导致锁定方法抛出错误。
*
*/
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** 提供所有实现机制的同步器 */
private final Sync sync;

/**
* 此锁的同步控制基础。下面分为公平版和非公平版。使用AQS状态表示锁上的持有数。
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;

/**
* 获取锁。执行Lock.lock()。子类化的主要原因是为非公平版本提供快速路径。
*/
abstract void lock();

/**
* 执行非公平tryLock。tryAcquire在子类中实现,但都需要对trylock方法进行非公平尝试。
*/
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取当前状态
int c = getState();
if (c == 0) { // 表示没有线程竞争该锁
// 比较并设置状态成功,状态0表示锁没有被占用
if (compareAndSetState(0, acquires)) {
// 设置当前线程独占,返回true,获取锁成功
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 当前线程拥有该锁
int nextc = c + acquires; // 增加重入次数
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 设置状态,返回true,获取锁成功
setState(nextc);
return true;
}
// 获取锁失败,返回false
return false;
}
/**
* 尝试获取对象状态
*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 如果当前线程不为独占线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 释放标识
boolean free = false;
if (c == 0) {
free = true;
// 已经释放,清空独占
setExclusiveOwnerThread(null);
}
// 设置标识
setState(c);
return free;
}

/**
* 判断资源是否被当前线程占用
*/
protected final boolean isHeldExclusively() {
// 通常我们必须在owner之前读取state,
// 我们不需要检查当前线程是否为owner
return getExclusiveOwnerThread() == Thread.currentThread();
}

final ConditionObject newCondition() {
return new ConditionObject();
}

// 方法从外部类中继

final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}

final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}

final boolean isLocked() {
return getState() != 0;
}

/**
* 从流重新构造实例(即反序列化它)。
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // 复位至解锁状态
}
}

/**
* 非公平锁的同步对象
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* 执行锁。尝试立即驳船,失败时备份到正常获取。
*/
final void lock() {
if (compareAndSetState(0, 1)) // CAS设置状态成功,状态0表示锁没有被占用
// 把当前线程设置独占锁
setExclusiveOwnerThread(Thread.currentThread());
else // 表示锁已经被占用
// 以独占方式获取对象,忽略中断
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

/**
* 为公平锁同步对象
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1); // 以独占方式获取对象,忽略中断
}

/**
* 公平版本的tryAcquire。除非递归调用或没有等待者,否则不要授予访问权限。
*/
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取状态
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) { // 不存在已经等待更久的线程并且CAS设置状态成功
// 设置当前线程独占
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 状态不为0,即资源已经被线程占用
// 下一个状态
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc); // 设置状态
return true;
}
return false;
}
}

/**
* 创建ReentrantLock的实例。这相当于使用ReentrantLock(false)。
*/
public ReentrantLock() {
sync = new NonfairSync();
}

/**
* 使用给定的公平性策略创建ReentrantLock的实例。
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

/**
* 获取锁。
* 如果锁未被其他线程持有,则获取该锁并立即返回,并将锁持有计数设置为1。
* 如果当前线程已经持有锁,则持有计数增加1,方法立即返回。
* 如果锁由另一个线程持有,那么当前线程将因线程调度目的而禁用,并处于休眠状态,直到获得锁,此时锁持有计数被设置为1。
*/
public void lock() {
sync.lock();
}

/**
* 除非当前线程被中断,否则获取锁。
* 如果锁未被其他线程持有,则获取该锁并立即返回,并将锁持有计数设置为1。
* 如果当前线程已经持有此锁,则持有计数增加1,并且该方法立即返回。
* 如果锁由另一个线程持有,那么当前线程将因线程调度目的而被禁用,并处于休眠状态,直到发生以下两种情况之一:
* - 锁由当前线程获取;
* - 或其他线程中断当前线程。
* 如果锁是由当前线程获取的,则锁持有计数设置为1。
* 如果当前线程:
* -在进入此方法时设置其中断状态;
* - 或在获取锁时被中断,
* 然后InterruptedException被抛出,当前线程的中断状态被清除。
* 在这个实现中,由于该方法是一个显式中断点,因此优先考虑响应中断,而不是正常或可重入获取锁。
*/
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

/**
* 仅在调用时未被其他线程持有时获取锁。
* 如果锁未被其他线程持有,则获取该锁,并立即返回值true,将锁持有计数设置为1。即使将此锁设置为使用公平的排序策略,
* 如果锁可用,调用tryLock()将立即获取该锁,无论当前是否有其他线程正在等待该锁。
* 这种“闯入”行为在某些情况下是有用的,即使它破坏了公平。
* 如果您希望遵守此锁的公平性设置,那么使用tryLock(0, TimeUnit.SECONDS),这几乎是等效的(它也检测中断)。
* 如果当前线程已经持有此锁,则持有计数增加1,该方法返回true。
* 如果锁由另一个线程持有,则此方法将立即返回值false。
*/
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}

/**
* 如果在给定的等待时间内没有被其他线程持有且当前线程未被中断,则获取该锁。
* 如果锁未被其他线程持有,则获取该锁,并立即返回值true,将锁持有计数设置为1。
* 如果将此锁设置为使用公平排序策略,那么如果有任何其他线程正在等待该锁,则不会获得可用的锁。
* 这与tryLock()方法相反。如果你想要一个允许闯入公平锁的定时tryLock,那么将定时和非定时形式组合在一起:
* if (lock.tryLock() || lock.tryLock(timeout, unit)) { ... }
* 如果当前线程已经持有此锁,则持有计数增加1,该方法返回true。
* 如果锁由另一个线程持有,那么当前线程将因线程调度目的而被禁用,并处于休眠状态,直到发生以下三种情况之一:
* - 锁由当前线程获取;
* - 或其他线程中断当前线程;
* - 或指定的等待时间已过。
* 如果获得了锁,则返回值true并将锁持有计数设置为1。
* 如果当前线程:
* - 在进入此方法时设置其中断状态;
* - 或在获取锁时被中断,
* 然后InterruptedException被抛出,当前线程的中断状态被清除。
* 如果超过了指定的等待时间,则返回值false。如果时间小于或等于零,则该方法根本不会等待。
* 在这个实现中,由于该方法是一个显式中断点,因此优先考虑响应中断,而不是正常或可重入的锁获取,也不是报告等待时间的流逝。
*/
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

/**
* 试图释放此锁。
* 如果当前线程是此锁的持有者,则持有计数递减。如果保持计数现在为零,则释放锁。
* 如果当前线程不是此锁的持有者,则抛出IllegalMonitorStateException。
*/
public void unlock() {
sync.release(1);
}

/**
* 返回用于此锁实例的条件实例。
* 当与内置监控器锁一起使用时,返回的Condition实例支持与Object监控器方法(wait、notify和notifyAll)相同的用法。
* - 如果在调用任何条件等待或信令方法时未持有此锁,则抛出IllegalMonitorStateException。
* - 当调用条件等待方法时,锁被释放,在它们返回之前,锁被重新获取,锁持有计数恢复到调用方法时的状态。
* - 如果线程在等待时被中断,那么等待将终止,InterruptedException将被抛出,线程的中断状态将被清除。
* - 等待线程按FIFO顺序发出信号。
* - 从等待方法返回的线程的锁重获取顺序与最初获取锁的线程相同,默认情况下没有指定,但对于公平锁,优先考虑等待时间最长的线程。
*/
public Condition newCondition() {
return sync.newCondition();
}

/**
* 查询当前线程持有该锁的次数。
* 线程对每个锁动作都有一个锁的保持,而这个锁动作没有与解锁动作相匹配。
* 保持计数信息通常仅用于测试和调试目的。例如,如果某个代码段不应该在锁已经持有的情况下进入,那么我们可以断言这个事实:
*
* <pre> {@code
* class X {
* ReentrantLock lock = new ReentrantLock();
* // ...
* public void m() {
* assert lock.getHoldCount() == 0;
* lock.lock();
* try {
* // ... method body
* } finally {
* lock.unlock();
* }
* }
* }}</pre>
*/
public int getHoldCount() {
return sync.getHoldCount();
}

/**
* 查询当前线程是否持有此锁。
* 类似于用于内置监视器锁的Thread.holdsLock(Object)方法,此方法通常用于调试和测试。
* 例如,一个应该只在持有锁时调用的方法可以断言是这种情况:
*
* <pre> {@code
* class X {
* ReentrantLock lock = new ReentrantLock();
* // ...
*
* public void m() {
* assert lock.isHeldByCurrentThread();
* // ... method body
* }
* }}</pre>
*
* 它还可以用于确保以不可重入的方式使用可重入锁,例如:
*
* <pre> {@code
* class X {
* ReentrantLock lock = new ReentrantLock();
* // ...
*
* public void m() {
* assert !lock.isHeldByCurrentThread();
* lock.lock();
* try {
* // ... method body
* } finally {
* lock.unlock();
* }
* }
* }}</pre>
*/
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}

/**
* 查询该锁是否被任何线程持有。这种方法设计用于监视系统状态,而不是用于同步控制。
*/
public boolean isLocked() {
return sync.isLocked();
}

/**
* 如果此锁的公平性设置为真,则返回真。
*/
public final boolean isFair() {
return sync instanceof FairSync;
}

/**
* 返回当前拥有此锁的线程,如果不拥有则返回null。
* 当非所有者的线程调用此方法时,返回值反映当前锁状态的最佳近似值。
* 例如,即使有线程试图获取锁但尚未这样做,所有者也可能暂时为空。
* 这种方法的目的是为了方便子类的构建,从而提供更广泛的锁监视功能。
*/
protected Thread getOwner() {
return sync.getOwner();
}

/**
* 查询是否有线程正在等待获取此锁。
* 请注意,由于取消可能随时发生,因此true返回并不能保证任何其他线程都将获得此锁。
* 这种方法主要是为监视系统状态而设计的。
*/
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}

/**
* 查询给定线程是否正在等待获取此锁。、
* 注意,因为取消可能在任何时候发生,一个真实的返回并不能保证这个线程会获得这个锁。
* 这种方法主要是为监视系统状态而设计的。
*/
public final boolean hasQueuedThread(Thread thread) {
return sync.isQueued(thread);
}

/**
* 返回等待获取此锁的线程数的估计值。
* 该值只是一个估计值,因为当此方法遍历内部数据结构时,线程的数量可能会动态变化。
* 这种方法设计用于监视系统状态,而不是用于同步控制。
*/
public final int getQueueLength() {
return sync.getQueueLength();
}

/**
* 返回一个集合,其中包含可能正在等待获取此锁的线程。
* 由于在构造此结果时,实际的线程集可能会动态变化,因此返回的集合只是尽力而为的估计值。
* 返回集合的元素没有特定的顺序。该方法旨在促进子类的构建,从而提供更广泛的监视功能。
*/
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}

/**
* 查询是否有线程正在等待与此锁关联的给定条件。
* 请注意,由于超时和中断可能随时发生,因此真正的返回并不能保证将来的信号将唤醒任何线程。
* 这种方法主要是为监视系统状态而设计的。
*/
public boolean hasWaiters(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
}

/**
* 返回与此锁关联的给定条件上等待的线程数的估计值。
* 请注意,由于超时和中断可能随时发生,因此估计仅作为实际等待者数量的上界。
* 这种方法设计用于监视系统状态,而不是用于同步控制。
*/
public int getWaitQueueLength(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
}

/**
* 返回一个集合,其中包含可能正在等待与此锁关联的给定条件的线程。
* 由于在构造此结果时,实际的线程集可能会动态变化,因此返回的集合只是尽力而为的估计值。
* 返回集合的元素没有特定的顺序。这种方法的设计是为了方便子类的建设,提供更广泛的状态监测设施。
*/
protected Collection<Thread> getWaitingThreads(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
}

/**
* 返回标识此锁的字符串及其锁状态。
* 括号中的状态包括字符串“Unlocked”或字符串“Locked by”,后面跟着所属线程的名称。
*/
public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}
}