Java并发之原子类型源码篇

Java并发之原子类型源码篇

一、Java原子类型

当程序更新一个变量时,如果多线程同时更新这个变量,可能得到期望之外的值,比如变量 i=1,A 线程更新 i+1,B 线程也更新 i+1,经过两个线程操作之后可能 i 不等于3,而是等于 2。

因为 A 和 B 线程在更新变量 i 的时候拿到的 i 都是 1,这就是线程不安全的更新操作,通常我们会使用 synchronized 来解决这个问题,synchronized 会保证多线程不会同时更新变量 i。

Java 从 JDK 1.5 开始提供了 java.util.concurrent.atomic(以下简称 Atomic 包),这个包中的原子操作类提供了一种用法简单、性能高效、线程安全地更新一个变量的方式。

因为变量的类型有很多种,所以在 Atomic 包里一共提供了 12 个类,属于 4 种类型的原子更新方式,分别是原子更新基本类型原子更新数组原子更新引用原子更新属性(字段)。Atomic 包里的类基本都是使用 Unsafe 实现的包装类

原子更新基本类型

  • AtomicBoolean:原子更新布尔类型。
  • AtomicInteger:原子更新整型。
  • AtomicLong:原子更新长整型。

原子更新数组

  • AtomicIntegerArray:原子更新整型数组里的元素。
  • AtomicLongArray:原子更新长整型数组里的元素。
  • AtomicReferenceArray:原子更新引用类型数组里的元素。

原子更新引用

  • AtomicReference:原子更新引用类型。
  • AtomicReferenceFieldUpdater:原子更新引用类型里的字段。
  • AtomicMarkableReference:原子更新带有标记位的引用类型。可以原子更新一个布尔类型的标记位和引用类型。

原子更新属性

  • AtomicIntegerFieldUpdater:原子更新整型的字段的更新器。
  • AtomicLongFieldUpdater:原子更新长整型字段的更新器。
  • AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。

下面博主就在源码中为大家分析这些原子类的实现。

由于每一类中的实现都很类似,只要阅读熟悉一些基础类的源码,再去使用和阅读其他类型的原理就比较容易了。

由于篇幅问题,本篇只带领大家分析 AtomicInteger 原子更新整型 和 AtomicIntegerArray原子更新整型数组里的元素。

1.1 AtomicInteger

AtomicInteger原子更新整型的原子类,可以保证整型的原子性。

① 常用方法

public final int get() 	// 获取当前值
public final int getAndSet(int newValue) // 自动设置为给定值并返回旧值。
public final int getAndIncrement() // 将当前值自动加1。
public final int getAndDecrement() // 将当前值自动加1。
public final int getAndAdd(int delta) // 自动将给定值添加到当前值。
boolean compareAndSet(int expect, int update) // 如果当前值==期望值,则自动将值设置为给定的更新值。
public final void lazySet(int newValue)// 最终设置为给定值。使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。

② 使用

public class Main {
static AtomicInteger ai = new AtomicInteger(1);
public static void main(String[] args) {
System.out.println(ai.getAndIncrement());
System.out.println(ai.get());
}
}

输出结果:

1
2

③ 实现原理

我们来看getAndIncrement()方法部分的源码:

   // 获取Unsafe实例,设置为使用Unsafe.compareAndSwapInt进行更新
private static final Unsafe unsafe = Unsafe.getUnsafe();
// 存放变量value的偏移量
private static final long valueOffset;

static {
try {
// 获取value在AtomicInteger中的偏移量
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

// 实际变量值,被声明为volatile是为了保证多线程下的内存可见性
private volatile int value;

...

/**
* 将当前值自动加1。调用unsafe方法,原子性设置value值为原始值+1,返回值为原始值。
*/
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}

可以看到,在AtomicInteger的内部其实是通过调用UnsafegetAndAddInt()方法来实现操作,这个函数是一个原子性操作,有三个参数:

  • this:指AtomicInteger实例的引用;
  • valueOffset:是value变量在AtomicInteger中的偏移值;
  • 1:要设置的第二个变量的值。

我们可以跟进去看一下getAndAddInt()方法的源码:

   // sun.misc.Unsafe#getAndAddInt
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
// 每个线程先拿到变量的当前值,因为value是volatile变量,所以这里一定是最新值
var5 = this.getIntVolatile(var1, var2);
// 然后使用CAS修改变量的值,如果失败,则循环继续尝试,直至成功
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;
}

总结一下,其实AtomicInteger内部的线程安全实现其实就是利用了 CAS + unsafe + volatile来保证原子操作,从而避免使用synchronized的高开销,使得执行效率有较大的提升。

1.2 AtomicIntegerArray

AtomicIntegerArray是更新整型数组元素的原子类。

① 常用方法

public final int get(int i) // 获取位置i的当前值。
public final int getAndSet(int i, int newValue) // 自动将位置i处的元素设置为给定值并返回旧值
public final int getAndIncrement(int i) // 对索引i处的元素自动加1。
public final int getAndDecrement(int i) // 对索引i处的元素自动减1。
public final int getAndAdd(int i, int delta) // 自动将给定值添加到索引i处的元素。
boolean compareAndSet(int i, int expect, int update) // 如果当前值==期望值,则自动将位置i处的元素设置为给定的更新值。
public final void lazySet(int i, int newValue)// 最终将第i位置的元素设置为给定值。使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。

从常用方法上可以看到,其实与AtomicInteger还是非常相似的。

② 使用

public class Main {

public static void main(String[] args) {
int[] value = new int[]{1, 2};
AtomicIntegerArray ai = new AtomicIntegerArray(value);
ai.getAndSet(0, 3);
System.out.println(ai.get(0));
System.out.println(value[0]);
}
}

输出结果:

3
1

③ 实现原理

我们来看getAndSet()方法部分的源码:

// 用于低级别、高效内存访问的 Unsafe 类
private static final Unsafe unsafe = Unsafe.getUnsafe();
// 数组元素的基本偏移量(由 Unsafe 使用)
private static final int base = unsafe.arrayBaseOffset(int[].class);
// 将索引转换为字节偏移的位移值(由 Unsafe 使用)
private static final int shift;
// 底层的 int 数组
private final int[] array;

/**
* 根据数组索引比例初始化位移值
*/
static {
// scale为一个数组元素的大小,scale为二的幂
int scale = unsafe.arrayIndexScale(int[].class);
// 检查比例是否为二的幂
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
// 根据比例计算位移值
/*
比如scale为4,Integer.numberOfLeadingZeros(scale)表示scale前面有多少个0,
int在JVM中有4个字节表示,即由32位表示4的二进制为00000000000000000000000000000100,
Integer.numberOfLeadingZeros(scale)计算出来为29,即二进制的1前面有29个0,那么shift=31-29=2,
表示二进制的1在第2号位上, 2^2 = 4
*/
shift = 31 - Integer.numberOfLeadingZeros(scale);
}

// 检查并返回给定索引的字节偏移
private long checkedByteOffset(int i) {
if (i < 0 || i >= array.length)
throw new IndexOutOfBoundsException("index " + i);
// 使用 byteOffset 方法返回字节偏移
return byteOffset(i);
}

// 计算给定索引的字节偏移
private static long byteOffset(int i) {
// 使用位移和基本值计算并返回字节偏移
return ((long) i << shift) + base;
}

...

/**
* 自动将位置i处的元素设置为给定值并返回旧值。
*/
public final int getAndSet(int i, int newValue) {
return unsafe.getAndSetInt(array, checkedByteOffset(i), newValue);
}

可以看到,在AtomicIntegerArray的内部其实是通过调用UnsafegetAndSetInt()方法来实现操作,这个函数是一个原子性操作,有三个参数:

  • array:指AtomicIntegerArray底层的数组;
  • checkedByteOffset(i):检查并返回给定索引的字节偏移;
  • newValue:要设置的第二个变量的新值。

我们可以跟进去看一下getAndSetInt()方法的源码:

 // sun.misc.Unsafe#getAndSetInt
public final int getAndSetInt(Object var1, long var2, int var4) {
int var5;
do {
// 每个线程先拿到变量的当前值,因为value是volatile变量,所以这里一定是最新值
var5 = this.getIntVolatile(var1, var2);
// 然后使用CAS修改变量的值,如果失败,则循环继续尝试,直至成功
} while(!this.compareAndSwapInt(var1, var2, var5, var4));

return var5;
}

可以看到这里的源码其实和AtomicInteger上面的是一样的。

二、完整源码

以下源码版本为JDK8。

2.1 AtomicInteger

package java.util.concurrent.atomic;
import java.util.function.IntUnaryOperator;
import java.util.function.IntBinaryOperator;
import sun.misc.Unsafe;

/**
* 可以自动更新的int值。有关原子变量属性的描述,请参阅atomic包规范。
* AtomicInteger用于诸如自动递增计数器之类的应用程序,不能用作Integer的替代品。
* 但是,这个类扩展了Number,以允许处理基于数字的类的工具和实用程序进行统一访问。
*/
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;

// 获取Unsafe实例,设置为使用Unsafe.compareAndSwapInt进行更新
private static final Unsafe unsafe = Unsafe.getUnsafe();
// 存放变量value的偏移量
private static final long valueOffset;

static {
try {
// 获取value在AtomicInteger中的偏移量
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

// 实际变量值,被声明为volatile是为了保证多线程下的内存可见性
private volatile int value;

/**
* 用给定的初始值创建一个新的AtomicInteger。
*/
public AtomicInteger(int initialValue) {
value = initialValue;
}

/**
* 创建一个初始值为0的新AtomicInteger。
*/
public AtomicInteger() {
}

/**
* 获取当前值
*/
public final int get() {
return value;
}

/**
* 设置为给定的值。
*/
public final void set(int newValue) {
value = newValue;
}

/**
* 最终设置为给定值。
*/
public final void lazySet(int newValue) {
unsafe.putOrderedInt(this, valueOffset, newValue);
}

/**
* 自动设置为给定值并返回旧值。
*/
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}

/**
* 如果当前值==期望值,则自动将值设置为给定的更新值。
*/
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

/**
* 如果当前值==期望值,则自动将值设置为给定的更新值。
* 可能会错误地失败,并且不提供排序保证,因此很少作为compareAndSet的合适替代。
*/
public final boolean weakCompareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

/**
* 将当前值自动加1。
*/
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}

/**
* 将当前值自动减1。
*/
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}

/**
* 自动将给定值添加到当前值。
*/
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}

/**
* 将当前值自动加1。
*/
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

/**
* 将当前值自动减1。
*/
public final int decrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
}

/**
* 自动将给定值添加到当前值。
*/
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}

/**
* 使用应用给定函数的结果自动更新当前值,返回前一个值。
* 该函数应该没有副作用,因为当尝试更新由于线程之间的争用而失败时,可以重新应用它。
*/
public final int getAndUpdate(IntUnaryOperator updateFunction) {
int prev, next;
do {
prev = get();
next = updateFunction.applyAsInt(prev);
} while (!compareAndSet(prev, next));
return prev;
}

/**
* 使用应用给定函数的结果自动更新当前值,并返回更新后的值。
* 该函数应该没有副作用,因为当尝试更新由于线程之间的争用而失败时,可以重新应用它。
*/
public final int updateAndGet(IntUnaryOperator updateFunction) {
int prev, next;
do {
prev = get();
next = updateFunction.applyAsInt(prev);
} while (!compareAndSet(prev, next));
return next;
}

/**
* 使用对当前值和给定值应用给定函数的结果自动更新当前值,并返回前一个值。
* 该函数应该没有副作用,因为当尝试更新由于线程之间的争用而失败时,可以重新应用它。
* 应用该函数时,当前值作为第一个参数,给定的更新作为第二个参数。
*/
public final int getAndAccumulate(int x,
IntBinaryOperator accumulatorFunction) {
int prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsInt(prev, x);
} while (!compareAndSet(prev, next));
return prev;
}

/**
* 使用将给定函数应用于当前值和给定值的结果自动更新当前值,并返回更新后的值。
* 该函数应该没有副作用,因为当尝试更新由于线程之间的争用而失败时,可以重新应用它。
* 应用该函数时,当前值作为第一个参数,给定的更新作为第二个参数。
*/
public final int accumulateAndGet(int x,
IntBinaryOperator accumulatorFunction) {
int prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsInt(prev, x);
} while (!compareAndSet(prev, next));
return next;
}

/**
* 返回当前值的字符串表示形式。
*/
public String toString() {
return Integer.toString(get());
}

/**
* 将AtomicInteger的值返回为int类型。
*/
public int intValue() {
return get();
}

/**
* 在进行扩展原语转换后,返回此AtomicInteger的值为long类型。
*/
public long longValue() {
return (long)get();
}

/**
* 在进行扩展原语转换后,返回此AtomicInteger的值为float类型。
*/
public float floatValue() {
return (float)get();
}

/**
* 在进行扩展原语转换后,返回此AtomicInteger的值为double类型。
*/
public double doubleValue() {
return (double)get();
}

}

2.2 AtomicIntegerArray

package java.util.concurrent.atomic;
import java.util.function.IntUnaryOperator;
import java.util.function.IntBinaryOperator;
import sun.misc.Unsafe;

/**
* 一个int数组,其中的元素可以自动更新。有关原子变量属性的描述,请参阅atomic包规范。
*/
public class AtomicIntegerArray implements java.io.Serializable {
private static final long serialVersionUID = 2862133569453604235L;

// 用于低级别、高效内存访问的 Unsafe 类
private static final Unsafe unsafe = Unsafe.getUnsafe();
// 数组元素的基本偏移量(由 Unsafe 使用)
private static final int base = unsafe.arrayBaseOffset(int[].class);
// 将索引转换为字节偏移的位移值(由 Unsafe 使用)
private static final int shift;
// 底层的 int 数组
private final int[] array;

/**
* 根据数组索引比例初始化位移值
*/
static {
// scale为一个数组元素的大小,scale为二的幂
int scale = unsafe.arrayIndexScale(int[].class);
// 检查比例是否为二的幂
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
// 根据比例计算位移值
/*
比如scale为4,Integer.numberOfLeadingZeros(scale)表示scale前面有多少个0,
int在JVM中有4个字节表示,即由32位表示4的二进制为00000000000000000000000000000100,
Integer.numberOfLeadingZeros(scale)计算出来为29,即二进制的1前面有29个0,那么shift=31-29=2,
表示二进制的1在第2号位上, 2^2 = 4
*/
shift = 31 - Integer.numberOfLeadingZeros(scale);
}

// 检查并返回给定索引的字节偏移
private long checkedByteOffset(int i) {
if (i < 0 || i >= array.length)
throw new IndexOutOfBoundsException("index " + i);
// 使用 byteOffset 方法返回字节偏移
return byteOffset(i);
}

// 计算给定索引的字节偏移
private static long byteOffset(int i) {
// 使用位移和基本值计算并返回字节偏移
return ((long) i << shift) + base;
}

/**
* 创建给定长度的新AtomicIntegerArray,初始所有元素为零。
*/
public AtomicIntegerArray(int length) {
array = new int[length];
}

/**
* 创建一个新的AtomicIntegerArray,其长度与给定数组相同,并从给定数组复制所有元素。
*/
public AtomicIntegerArray(int[] array) {
// 最终字段保证的可见性
this.array = array.clone();
}

/**
* 返回数组的长度。
*/
public final int length() {
return array.length;
}

/**
* 获取位置i的当前值。
*/
public final int get(int i) {
return getRaw(checkedByteOffset(i));
}

private int getRaw(long offset) {
return unsafe.getIntVolatile(array, offset);
}

/**
* 将位置i处的元素设置为给定值。
*/
public final void set(int i, int newValue) {
unsafe.putIntVolatile(array, checkedByteOffset(i), newValue);
}

/**
* 最终将第i位置的元素设置为给定值。
*/
public final void lazySet(int i, int newValue) {
unsafe.putOrderedInt(array, checkedByteOffset(i), newValue);
}

/**
* 自动将位置i处的元素设置为给定值并返回旧值。
*/
public final int getAndSet(int i, int newValue) {
return unsafe.getAndSetInt(array, checkedByteOffset(i), newValue);
}

/**
* 如果当前值==期望值,则自动将位置i处的元素设置为给定的更新值。
*/
public final boolean compareAndSet(int i, int expect, int update) {
return compareAndSetRaw(checkedByteOffset(i), expect, update);
}

private boolean compareAndSetRaw(long offset, int expect, int update) {
return unsafe.compareAndSwapInt(array, offset, expect, update);
}

/**
* 如果当前值==期望值,则自动将位置i处的元素设置为给定的更新值。
* 可能会错误地失败,并且不提供排序保证,因此很少作为compareAndSet的合适替代。
*/
public final boolean weakCompareAndSet(int i, int expect, int update) {
return compareAndSet(i, expect, update);
}

/**
* 对索引i处的元素自动加1。
*/
public final int getAndIncrement(int i) {
return getAndAdd(i, 1);
}

/**
* 对索引i处的元素自动减1。
*/
public final int getAndDecrement(int i) {
return getAndAdd(i, -1);
}

/**
* 自动将给定值添加到索引i处的元素。
*/
public final int getAndAdd(int i, int delta) {
return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
}

/**
* 对索引i处的元素自动加1。
*/
public final int incrementAndGet(int i) {
return getAndAdd(i, 1) + 1;
}

/**
* 自动地将索引i处的元素减1。
*/
public final int decrementAndGet(int i) {
return getAndAdd(i, -1) - 1;
}

/**
* 自动将给定值添加到索引i处的元素。
*/
public final int addAndGet(int i, int delta) {
return getAndAdd(i, delta) + delta;
}


/**
* 使用应用给定函数的结果自动更新索引i处的元素,返回前一个值。
* 该函数应该没有副作用,因为当尝试更新由于线程之间的争用而失败时,可以重新应用它。
*/
public final int getAndUpdate(int i, IntUnaryOperator updateFunction) {
long offset = checkedByteOffset(i);
int prev, next;
do {
prev = getRaw(offset);
next = updateFunction.applyAsInt(prev);
} while (!compareAndSetRaw(offset, prev, next));
return prev;
}

/**
* 使用应用给定函数的结果自动更新索引i处的元素,并返回更新后的值。
* 该函数应该没有副作用,因为当尝试更新由于线程之间的争用而失败时,可以重新应用它。
*/
public final int updateAndGet(int i, IntUnaryOperator updateFunction) {
long offset = checkedByteOffset(i);
int prev, next;
do {
prev = getRaw(offset);
next = updateFunction.applyAsInt(prev);
} while (!compareAndSetRaw(offset, prev, next));
return next;
}

/**
* 使用对当前值和给定值应用给定函数的结果自动更新索引i处的元素,并返回前一个值。
* 该函数应该没有副作用,因为当尝试更新由于线程之间的争用而失败时,可以重新应用它。
* 该函数应用索引i处的当前值作为第一个参数,并将给定的更新作为第二个参数。
*/
public final int getAndAccumulate(int i, int x,
IntBinaryOperator accumulatorFunction) {
long offset = checkedByteOffset(i);
int prev, next;
do {
prev = getRaw(offset);
next = accumulatorFunction.applyAsInt(prev, x);
} while (!compareAndSetRaw(offset, prev, next));
return prev;
}

/**
* 使用对当前值和给定值应用给定函数的结果自动更新索引i处的元素,并返回更新后的值。
* 该函数应该没有副作用,因为当尝试更新由于线程之间的争用而失败时,可以重新应用它。
* 该函数应用索引i处的当前值作为第一个参数,并将给定的更新作为第二个参数。
*/
public final int accumulateAndGet(int i, int x,
IntBinaryOperator accumulatorFunction) {
long offset = checkedByteOffset(i);
int prev, next;
do {
prev = getRaw(offset);
next = accumulatorFunction.applyAsInt(prev, x);
} while (!compareAndSetRaw(offset, prev, next));
return next;
}

/**
* 返回数组当前值的字符串表示形式。
*/
public String toString() {
int iMax = array.length - 1;
if (iMax == -1)
return "[]";

StringBuilder b = new StringBuilder();
b.append('[');
for (int i = 0; ; i++) {
b.append(getRaw(byteOffset(i)));
if (i == iMax)
return b.append(']').toString();
b.append(',').append(' ');
}
}

}