Java 并发 Java并发之原子类型源码篇 小奥 2024-01-22 2024-01-22 Java并发之原子类型源码篇 一、Java原子类型 当程序更新一个变量时,如果多线程同时更新这个变量,可能得到期望之外的值,比如变量 i=1,A 线程更新 i+1,B 线程也更新 i+1,经过两个线程操作之后可能 i 不等于3,而是等于 2。
因为 A 和 B 线程在更新变量 i 的时候拿到的 i 都是 1,这就是线程不安全的更新操作,通常我们会使用 synchronized
来解决这个问题,synchronized
会保证多线程不会同时更新变量 i。
而 Java 从 JDK 1.5 开始提供了 java.util.concurrent.atomic
包 (以下简称 Atomic
包),这个包中的原子操作类提供了一种用法简单、性能高效、线程安全地更新一个变量的方式。
因为变量的类型有很多种,所以在 Atomic 包里一共提供了 12 个类,属于 4 种类型的原子更新方式,分别是原子更新基本类型 、原子更新数组 、原子更新引用 和原子更新属性 (字段)。Atomic 包里的类基本都是使用 Unsafe 实现的包装类 。
原子更新基本类型
AtomicBoolean
:原子更新布尔类型。
AtomicInteger
:原子更新整型。
AtomicLong
:原子更新长整型。
原子更新数组
AtomicIntegerArray
:原子更新整型数组里的元素。
AtomicLongArray
:原子更新长整型数组里的元素。
AtomicReferenceArray
:原子更新引用类型数组里的元素。
原子更新引用
AtomicReference
:原子更新引用类型。
AtomicReferenceFieldUpdater
:原子更新引用类型里的字段。
AtomicMarkableReference
:原子更新带有标记位的引用类型。可以原子更新一个布尔类型的标记位和引用类型。
原子更新属性
AtomicIntegerFieldUpdater
:原子更新整型的字段的更新器。
AtomicLongFieldUpdater
:原子更新长整型字段的更新器。
AtomicStampedReference
:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
下面博主就在源码中为大家分析这些原子类的实现。
由于每一类中的实现都很类似,只要阅读熟悉一些基础类的源码,再去使用和阅读其他类型的原理就比较容易了。
由于篇幅问题,本篇只带领大家分析 AtomicInteger
原子更新整型 和 AtomicIntegerArray
原子更新整型数组里的元素。
1.1 AtomicInteger AtomicInteger
原子更新整型的原子类,可以保证整型的原子性。
① 常用方法 public final int get () public final int getAndSet (int newValue) public final int getAndIncrement () public final int getAndDecrement () public final int getAndAdd (int delta) boolean compareAndSet (int expect, int update) public final void lazySet (int newValue)
② 使用 public class Main { static AtomicInteger ai = new AtomicInteger (1 ); public static void main (String[] args) { System.out.println(ai.getAndIncrement()); System.out.println(ai.get()); } }
输出结果:
③ 实现原理 我们来看getAndIncrement()
方法部分的源码:
private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value" )); } catch (Exception ex) { throw new Error (ex); } } private volatile int value; ... public final int getAndIncrement () { return unsafe.getAndAddInt(this , valueOffset, 1 ); }
可以看到,在AtomicInteger
的内部其实是通过调用Unsafe
的getAndAddInt()
方法来实现操作,这个函数是一个原子性操作,有三个参数:
this
:指AtomicInteger
实例的引用;
valueOffset
:是value变量在AtomicInteger
中的偏移值;
1
:要设置的第二个变量的值。
我们可以跟进去看一下getAndAddInt()
方法的源码:
public final int getAndAddInt (Object var1, long var2, int var4) { int var5; do { var5 = this .getIntVolatile(var1, var2); } while (!this .compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
总结一下,其实AtomicInteger
内部的线程安全实现其实就是利用了 CAS + unsafe + volatile
来保证原子操作,从而避免使用synchronized
的高开销,使得执行效率有较大的提升。
1.2 AtomicIntegerArray AtomicIntegerArray
是更新整型数组元素的原子类。
① 常用方法 public final int get (int i) public final int getAndSet (int i, int newValue) public final int getAndIncrement (int i) public final int getAndDecrement (int i) public final int getAndAdd (int i, int delta) boolean compareAndSet (int i, int expect, int update) public final void lazySet (int i, int newValue)
从常用方法上可以看到,其实与AtomicInteger
还是非常相似的。
② 使用 public class Main { public static void main (String[] args) { int [] value = new int []{1 , 2 }; AtomicIntegerArray ai = new AtomicIntegerArray (value); ai.getAndSet(0 , 3 ); System.out.println(ai.get(0 )); System.out.println(value[0 ]); } }
输出结果:
③ 实现原理 我们来看getAndSet()
方法部分的源码:
private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final int base = unsafe.arrayBaseOffset(int [].class); private static final int shift; private final int [] array; static { int scale = unsafe.arrayIndexScale(int [].class); if ((scale & (scale - 1 )) != 0 ) throw new Error ("data type scale not a power of two" ); shift = 31 - Integer.numberOfLeadingZeros(scale); } private long checkedByteOffset (int i) { if (i < 0 || i >= array.length) throw new IndexOutOfBoundsException ("index " + i); return byteOffset(i); } private static long byteOffset (int i) { return ((long ) i << shift) + base; } ... public final int getAndSet (int i, int newValue) { return unsafe.getAndSetInt(array, checkedByteOffset(i), newValue); }
可以看到,在AtomicIntegerArray
的内部其实是通过调用Unsafe
的getAndSetInt()
方法来实现操作,这个函数是一个原子性操作,有三个参数:
array
:指AtomicIntegerArray
底层的数组;
checkedByteOffset(i)
:检查并返回给定索引的字节偏移;
newValue
:要设置的第二个变量的新值。
我们可以跟进去看一下getAndSetInt()
方法的源码:
public final int getAndSetInt (Object var1, long var2, int var4) { int var5; do { var5 = this .getIntVolatile(var1, var2); } while (!this .compareAndSwapInt(var1, var2, var5, var4)); return var5; }
可以看到这里的源码其实和AtomicInteger
上面的是一样的。
二、完整源码 以下源码版本为JDK8。
2.1 AtomicInteger package java.util.concurrent.atomic;import java.util.function.IntUnaryOperator;import java.util.function.IntBinaryOperator;import sun.misc.Unsafe;public class AtomicInteger extends Number implements java .io.Serializable { private static final long serialVersionUID = 6214790243416807050L ; private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value" )); } catch (Exception ex) { throw new Error (ex); } } private volatile int value; public AtomicInteger (int initialValue) { value = initialValue; } public AtomicInteger () { } public final int get () { return value; } public final void set (int newValue) { value = newValue; } public final void lazySet (int newValue) { unsafe.putOrderedInt(this , valueOffset, newValue); } public final int getAndSet (int newValue) { return unsafe.getAndSetInt(this , valueOffset, newValue); } public final boolean compareAndSet (int expect, int update) { return unsafe.compareAndSwapInt(this , valueOffset, expect, update); } public final boolean weakCompareAndSet (int expect, int update) { return unsafe.compareAndSwapInt(this , valueOffset, expect, update); } public final int getAndIncrement () { return unsafe.getAndAddInt(this , valueOffset, 1 ); } public final int getAndDecrement () { return unsafe.getAndAddInt(this , valueOffset, -1 ); } public final int getAndAdd (int delta) { return unsafe.getAndAddInt(this , valueOffset, delta); } public final int incrementAndGet () { return unsafe.getAndAddInt(this , valueOffset, 1 ) + 1 ; } public final int decrementAndGet () { return unsafe.getAndAddInt(this , valueOffset, -1 ) - 1 ; } public final int addAndGet (int delta) { return unsafe.getAndAddInt(this , valueOffset, delta) + delta; } public final int getAndUpdate (IntUnaryOperator updateFunction) { int prev, next; do { prev = get(); next = updateFunction.applyAsInt(prev); } while (!compareAndSet(prev, next)); return prev; } public final int updateAndGet (IntUnaryOperator updateFunction) { int prev, next; do { prev = get(); next = updateFunction.applyAsInt(prev); } while (!compareAndSet(prev, next)); return next; } public final int getAndAccumulate (int x, IntBinaryOperator accumulatorFunction) { int prev, next; do { prev = get(); next = accumulatorFunction.applyAsInt(prev, x); } while (!compareAndSet(prev, next)); return prev; } public final int accumulateAndGet (int x, IntBinaryOperator accumulatorFunction) { int prev, next; do { prev = get(); next = accumulatorFunction.applyAsInt(prev, x); } while (!compareAndSet(prev, next)); return next; } public String toString () { return Integer.toString(get()); } public int intValue () { return get(); } public long longValue () { return (long )get(); } public float floatValue () { return (float )get(); } public double doubleValue () { return (double )get(); } }
2.2 AtomicIntegerArray package java.util.concurrent.atomic;import java.util.function.IntUnaryOperator;import java.util.function.IntBinaryOperator;import sun.misc.Unsafe;public class AtomicIntegerArray implements java .io.Serializable { private static final long serialVersionUID = 2862133569453604235L ; private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final int base = unsafe.arrayBaseOffset(int [].class); private static final int shift; private final int [] array; static { int scale = unsafe.arrayIndexScale(int [].class); if ((scale & (scale - 1 )) != 0 ) throw new Error ("data type scale not a power of two" ); shift = 31 - Integer.numberOfLeadingZeros(scale); } private long checkedByteOffset (int i) { if (i < 0 || i >= array.length) throw new IndexOutOfBoundsException ("index " + i); return byteOffset(i); } private static long byteOffset (int i) { return ((long ) i << shift) + base; } public AtomicIntegerArray (int length) { array = new int [length]; } public AtomicIntegerArray (int [] array) { this .array = array.clone(); } public final int length () { return array.length; } public final int get (int i) { return getRaw(checkedByteOffset(i)); } private int getRaw (long offset) { return unsafe.getIntVolatile(array, offset); } public final void set (int i, int newValue) { unsafe.putIntVolatile(array, checkedByteOffset(i), newValue); } public final void lazySet (int i, int newValue) { unsafe.putOrderedInt(array, checkedByteOffset(i), newValue); } public final int getAndSet (int i, int newValue) { return unsafe.getAndSetInt(array, checkedByteOffset(i), newValue); } public final boolean compareAndSet (int i, int expect, int update) { return compareAndSetRaw(checkedByteOffset(i), expect, update); } private boolean compareAndSetRaw (long offset, int expect, int update) { return unsafe.compareAndSwapInt(array, offset, expect, update); } public final boolean weakCompareAndSet (int i, int expect, int update) { return compareAndSet(i, expect, update); } public final int getAndIncrement (int i) { return getAndAdd(i, 1 ); } public final int getAndDecrement (int i) { return getAndAdd(i, -1 ); } public final int getAndAdd (int i, int delta) { return unsafe.getAndAddInt(array, checkedByteOffset(i), delta); } public final int incrementAndGet (int i) { return getAndAdd(i, 1 ) + 1 ; } public final int decrementAndGet (int i) { return getAndAdd(i, -1 ) - 1 ; } public final int addAndGet (int i, int delta) { return getAndAdd(i, delta) + delta; } public final int getAndUpdate (int i, IntUnaryOperator updateFunction) { long offset = checkedByteOffset(i); int prev, next; do { prev = getRaw(offset); next = updateFunction.applyAsInt(prev); } while (!compareAndSetRaw(offset, prev, next)); return prev; } public final int updateAndGet (int i, IntUnaryOperator updateFunction) { long offset = checkedByteOffset(i); int prev, next; do { prev = getRaw(offset); next = updateFunction.applyAsInt(prev); } while (!compareAndSetRaw(offset, prev, next)); return next; } public final int getAndAccumulate (int i, int x, IntBinaryOperator accumulatorFunction) { long offset = checkedByteOffset(i); int prev, next; do { prev = getRaw(offset); next = accumulatorFunction.applyAsInt(prev, x); } while (!compareAndSetRaw(offset, prev, next)); return prev; } public final int accumulateAndGet (int i, int x, IntBinaryOperator accumulatorFunction) { long offset = checkedByteOffset(i); int prev, next; do { prev = getRaw(offset); next = accumulatorFunction.applyAsInt(prev, x); } while (!compareAndSetRaw(offset, prev, next)); return next; } public String toString () { int iMax = array.length - 1 ; if (iMax == -1 ) return "[]" ; StringBuilder b = new StringBuilder (); b.append('[' ); for (int i = 0 ; ; i++) { b.append(getRaw(byteOffset(i))); if (i == iMax) return b.append(']' ).toString(); b.append(',' ).append(' ' ); } } }