目录
一、什么是CAS
二、从一个案例引出CAS
三、Java中的Atomic 原子操作包
1. 基本原子类
2. 数组原子类
3. 引用原子类
4. 字段更新原子类
五、类 AtomicInteger
1、常用的方法:
AtomicInteger 案例:
2、AtomicInteger 源码解析:
六、Unsafe类
1、Unsafe 提供的 CAS 方法
2、获取属性偏移量
3、根据属性的偏移量获取属性的最新值:
七、CAS的缺点
八、以空间换时间:LongAdder
1、LongAdder 的原理
2、longAddr内部结构
4、LongAdder 类的 add 方法
5、LongAdder 类的 casCellsBusy 方法
九、使用 AtomicStampedReference 解决 ABA 问题
1、AtomicStampReference 的构造器:
2、AtomicStampReference 的常用的几个方法如下:
3、案例
一、什么是CAS
CAS,compare and swap的缩写,中文翻译成比较并交换。
CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。 如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值 。否则,处理器不做任何操作。
二、从一个案例引出CAS
案例:
public class Test {
    public static int count = 0;
    private final static int MAX_TREAD=10;
    public static AtomicInteger atomicInteger = new AtomicInteger(0);
    public static void main(String[] args) throws InterruptedException {
        /*CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。
        使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。
        当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。*/
        CountDownLatch latch = new CountDownLatch(MAX_TREAD);
        //匿名内部类
        Runnable runnable =  new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 1000; i++) {
                    count++;
                    atomicInteger.getAndIncrement();
                }
                latch.countDown(); // 当前线程调用此方法,则计数减一
            }
        };
        //同时启动多个线程
        for (int i = 0; i < MAX_TREAD; i++) {
            new Thread(runnable).start();
        }
        latch.await(); // 阻塞当前线程,直到计数器的值为0
        System.out.println("理论结果:" + 1000 * MAX_TREAD);
        System.out.println("static count: " + count);
        System.out.println("AtomicInteger: " + atomicInteger.intValue());
    }
}
输出:
理论结果:10000
static count: 9994
AtomicInteger: 10000
我们发现每次运行,atomicInteger 的结果值都是正确的,count++的结果却不对,下面我们就开始探究原因。
三、Java中的Atomic 原子操作包
并发包中原子类 , 都存放在 java.util.concurrent.atomic 类路径下:
- 基本原子类
 - 数组原子类
 - 原子引用类型
 - 字段更新原子类
 
1. 基本原子类
- AtomicInteger:整型原子类。
 - AtomicLong:长整型原子类。
 - AtomicBoolean :布尔型原子类。
 
2. 数组原子类
- AtomicIntegerArray:整型数组原子类。
 - AtomicLongArray:长整型数组原子类。
 - AtomicReferenceArray :引用类型数组原子类。
 
3. 引用原子类
- AtomicReference:引用类型原子类。
 - AtomicMarkableReference :带有更新标记位的原子引用类型。
 - AtomicStampedReference :带有更新版本号的原子引用类型。
 
4. 字段更新原子类
- AtomicIntegerFieldUpdater:原子更新整型字段的更新器。
 - AtomicLongFieldUpdater:原子更新长整型字段的更新器。
 - AtomicReferenceFieldUpdater:原子更新引用类型里的字段。
 
五、类 AtomicInteger
1、常用的方法:
| 方法 | 介绍 | 
|---|---|
| public final int get() | 获取当前的值 | 
| public final int getAndSet(int newValue) | 获取当前的值,然后设置新的值 | 
| public final int getAndIncrement() | 获取当前的值,然后自增 | 
| public final int getAndDecrement() | 获取当前的值,然后自减 | 
| public final int getAndAdd(int delta) | 获取当前的值,并加上预期的值 | 
| boolean compareAndSet(int expect, int update) | 通过 CAS 方式设置整数值 | 
AtomicInteger 案例:
 private static void out(int oldValue,int newValue){
        System.out.println("旧值:"+oldValue+",新值:"+newValue);
    }
    public static void main(String[] args) {
        int value = 0;
        AtomicInteger atomicInteger= new AtomicInteger(0);
        //取值,然后设置一个新值
        value = atomicInteger.getAndSet(3);
        //旧值:0,新值:3
        out(value,atomicInteger.get());
        //取值,然后自增
        value = atomicInteger.getAndIncrement();
        //旧值:3,新值:4
        out(value,atomicInteger.get());
        //取值,然后增加 5
        value = atomicInteger.getAndAdd(5);
        //旧值:4,新值:9
        out(value,atomicInteger.get());
        //CAS 交换
        boolean flag = atomicInteger.compareAndSet(9, 100);
        //旧值:4,新值:100
        out(value,atomicInteger.get());
    }
2、AtomicInteger 源码解析:
public class AtomicInteger extends Number implements java.io.Serializable {
    // 设置使用Unsafe.compareAndSwapInt进行更新
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;
    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                    (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) {
            throw new Error(ex);
        }
    }
    ...省略
    private volatile int value;
    //自动设置为给定值并返回旧值。
    public final int getAndSet(int newValue) {
        return unsafe.getAndSetInt(this, valueOffset, newValue);
    }
    //以原子方式将当前值加1并返回旧值。
    public final int getAndIncrement() {
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }
    //以原子方式将当前值减1并返回旧值。
    public final int getAndDecrement() {
        return unsafe.getAndAddInt(this, valueOffset, -1);
    }
    //原子地将给定值添加到当前值并返回旧值。
    public final int getAndAdd(int delta) {
        return unsafe.getAndAddInt(this, valueOffset, delta);
    }
    ...省略
}
通过源码我们发现AtomicInteger的增减操作都调用了Unsafe 实例的方法,下面我们对Unsafe类做介绍:
六、Unsafe类
是位于
sun.misc 包下的一个类,Unsafe 提供了CAS 方法,直接通过native 方式(封装 C++代码)调用了底层的 CPU 指令 cmpxchg。
sun.misc.Unsafe,从名字中我们可以看出来这个类对普通程序员来说是“危险”的,一般应用开发者不会用到这个类。
 1、Unsafe 提供的 CAS 方法
主要如下: 定义在 Unsafe 类中的三个 “比较并交换”原子方法
/*
@param o 包含要修改的字段的对象
@param offset 字段在对象内的偏移量
@param expected 期望值(旧的值)
@param update 更新值(新的值)
@return true 更新成功 | false 更新失败
*/
public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object update);
public final native boolean compareAndSwapInt( Object o, long offset, int expected,int update);
public final native boolean compareAndSwapLong( Object o, long offset, long expected, long update);
提供的
CAS
方法包含四个入参:
包含要修改的字段对象、字段内存位置、预期原值及
Unsafe
的
CAS
方法的时候,这些方法首先将内存位置的值与预期值(旧的值)比
true
;如果不相匹配,
false
。
2、获取属性偏移量
Unsafe 提供的获取字段(属性)偏移量的相关操作,主要如下:
/**
* @param o 需要操作属性的反射 
* @return 属性的偏移量 
*/ 
public native long staticFieldOffset(Field field); 
public native long objectFieldOffset(Field field);
方法用于获取静态属性
Field
在
Class
对象中的偏移量,在
CAS
操作静态属性时,会用到这个偏移量。
方法用于获取非静态
Field
(非静态属性)在
Object 实例中的偏移量,在 CAS
操作对象的非静态属性时,会用到这个偏移量。
3、根据属性的偏移量获取属性的最新值:
/**
* @param o 字段所属于的对象实例
* @param fieldOffset 字段的偏移量 
* @return 字段的最新值
*/
public native int getIntVolatile(Object o, long fieldOffset);
七、CAS的缺点
1. ABA问题。因为CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。
JDK 提供了两个类 AtomicStampedReference、AtomicMarkableReference 来解决 ABA 问题。
2. 只能保证一个共享变量的原子操作。一个比较简单的规避方法为:把多个共享变量合并成一个共享变量来操作。 JDK 提供了 AtomicReference 类来保证引用对象之间的原子性,可以把多个变量放在一个 AtomicReference 实例后再进行 CAS 操作。比如有两个共享变量 i=1、j=2,可以将二者合并成一个对象,然后用 CAS 来操作该合并对象的 AtomicReference 引用。
3. 循环时间长开销大。高并发下N多线程同时去操作一个变量,会造成大量线程CAS失败,然后处于自旋状态,导致严重浪费CPU资源,降低了并发性。
CAS
恶性空自旋的较为常见的方案为:
- 分散操作热点,使用 LongAdder 替代基础原子类 AtomicLong。
 - 使用队列削峰,将发生 CAS 争用的线程加入一个队列中排队,降低 CAS 争用的激烈程度。JUC 中非常重要的基础类 AQS(抽象队列同步器)就是这么做的。
 
八、以空间换时间:LongAdder
1、LongAdder 的原理
如果有竞争的话,内部维护了多个Cell变量,每个Cell里面有一个初始值为0的long型变量,
不同线程会命中到数组的不同Cell
(槽
)中,各个线程只对自己Cell(槽)
中的那个值进行 CAS 操作。这样热点就被分散了,冲突的概率就小很多。
LongAdder 存储的值,只要将各个槽中的变量值累加,后的值即可。
 2、longAddr内部结构

Striped64类的重要成员属性:
    /**
     * cell表,当非空时,大小是2的幂。  
     */
    transient volatile Cell[] cells;
    /**
     * 基础值,主要在没有争用时使用
     * 在没有争用时使用CAS更新这个值
     */
    transient volatile long base;
    /**
     * 自旋锁(通过CAS锁定) 在调整大小和/或创建cell时使用,
     * 为 0 表示 cells 数组没有处于创建、扩容阶段,反之为1
     */
    transient volatile int cellsBusy;
内部包含一个
base
和一个
Cell[]
类型的
cells
数组
。 在没有竞争的情况下,要累加的数通过 CAS
累加到
base
上;如果有竞争的话,会将要累加的数累加到 Cells
数组中的某个
cell
元素里面。所以
Striped64
的整体值
value
为
base+
∑
[0~n]cells
。
的获取源码如下:
public long longValue() {
    return sum();
}
public long sum() {
    Striped64.Cell[] as = cells; Striped64.Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
的设计核心思路就是通过内部的分散计算来避免竞争,以空间换时间。
LongAdder
base
类似于
AtomicInteger
里面的
value
,在没有竞争的情况,cells 数组为
null
,这时只使用
base 做累加;而一旦发生竞争,cells
数组就上场了。
2
,以后每次扩容都是变为原来的两倍,一直到
cells
数组的长
CPU
的核数。为什么呢?同一时刻,能持有
CPU
时间片而去并发操作同
CPU
的核数。
cells[threadLocalRandomProbe & cells.length]
位置的 Cell
元素,该线程对
value
所做的累加操作,就执行在对应的
Cell
元素的值上,最终相当于将线程绑定到了 cells
中的某个
cell
对象上;
4、LongAdder 类的 add 方法
自增
public void increment() {
    add(1L);
}
自减
public void decrement() {
    add(-1L);
}
add方法
public void add(long x) {
    //as: 表示cells引用
    //b: base值
    //v: 表示当前线程命中的cell的期望值
    //m: 表示cells数组长度
    //a: 表示当前线程命中的cell
    Striped64.Cell[] as; long b, v; int m; Striped64.Cell a;
    /*
    stop 1:true -> 说明存在竞争,并且cells数组已经初始化了,当前线程需要将数据写入到对应的cell中
           false -> 表示cells未初始化,当前所有线程应该将数据写到base中
    stop 2:true -> 表示发生竞争了,可能需要重试或者扩容
           false -> 表示当前线程cas替换数据成功
    */
    if (
            (as = cells) != null  //stop 1
            ||
            !casBase(b = base, b + x) //stop 2
    ) {
        /*
        进入的条件:
        1.cells数组已经初始化了,当前线程需要将数据写入到对应的cell中
        2.表示发生竞争了,可能需要重试或者扩容
        */
         /*
        是否有竞争:true -> 没有竞争
                   false -> 有竞争*/
        boolean uncontended = true;
        /*
        stop 3:as == null || (m = as.length - 1)<0 代表 cells 没有初始化
        stop 4:表示当前线程命中的cell为空,意思是还没有其他线程在同一个位置做过累加操作。
        stop 5:表示当前线程命中的cell不为空, 然后在该Cell对象上进行CAS设置其值为v+x(x为该 Cell 需要累加的值),如果CAS操作失败,表示存在争用。
        */
        if (as == null || (m = as.length - 1) < 0 || //stop 3
                (a = as[getProbe() & m]) == null || //stop 4
                !(uncontended = a.cas(v = a.value, v + x))) //stop 5
            /*
            进入的条件:
            1.cells 未初始化
            2.当前线程对应下标的cell为空
            3.当前线程对应的cell有竞争并且cas失败
            */
            longAccumulate(x, null, uncontended);
    }
}
longAccumulate方法
final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    //条件成立: 说明当前线程还未分配hash值
    if ((h = getProbe()) == 0) {
        //1.给当前线程分配hash值
        ThreadLocalRandom.current(); // force initialization
        //2.提取当前线程的hash值
        h = getProbe();
        //3.因为上一步提取了重新分配的新的hash值,所以会重新分配cells数组的位置给当前线程写入,先假设它能找到一个元素不冲突的数组下标。
        wasUncontended = true;
    }
    //扩容意向,collide=true 可以扩容,collide=false 不可扩容
    boolean collide = false;                // True if last slot nonempty
    //自旋,一直到操作成功
    for (;;) {
        //as: 表示cells引用
        //a: 当前线程命中的cell
        //n: cells数组长度
        //a: 表示当前线程命中的cell的期望值
        Striped64.Cell[] as; Striped64.Cell a; int n; long v;
        //CASE1: cells数组已经初始化了,当前线程将数据写入到对应的cell中
        if ((as = cells) != null && (n = as.length) > 0) {
            //CASE1.1: true 表示下标位置的 cell 为 null,需要创建 new Cell
            if ((a = as[(n - 1) & h]) == null) {
                // cells 数组没有处于创建、扩容阶段
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Striped64.Cell r = new Striped64.Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Striped64.Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)//创建、扩容成功,退出自旋
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            // CASE1.2:当前线程竞争修改cell失败
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            // CASE 1.3:当前线程 rehash 过 hash 值,CAS 更新 Cell
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                    fn.applyAsLong(v, x))))
                break;
            // CASE1.4:判断是否可以扩容
            //  CASE1.4.1:n >= NCPU
            //      true -> cells数组长度已经 >= cpu核数,不可进行扩容,把扩容意向改为false
            //      false -> 可扩容
            //  CASE1.4.2:cells != as
            //      true -> 其它线程已经扩容过了,当前线程rehash之后重试即可
            //      false -> 未有线程对cells进行修改
            else if (n >= NCPU || cells != as)
                collide = false;            // 把扩容意向改为false
            // CASE 1.5:设置扩容意向为 true,但是不一定真的发生扩容
            else if (!collide)
                collide = true;
            //CASE1.6:真正扩容的逻辑
            //  CASE1.6.1:cellsBusy == 0
            //      true -> 表示cells没有被其它线程占用,当前线程可以去竞争锁
            //      false -> 表示有其它线程正在操作cells
            //  CASE1.6.2:casCellsBusy()
            //      true -> 表示当前线程获取锁成功,可以进行扩容操作
            //      false -> 表示当前线程获取锁失败,当前时刻有其它线程在做扩容相关的操作
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    //重复判断一下当前线程的临时cells数组是否与原cells数组一致(防止有其它线程提前修改了cells数组,因为cells是volatile的全局变量)
                    if (cells == as) {      // Expand table unless stale
                        //n << 1 表示数组长度翻一倍
                        Striped64.Cell[] rs = new Striped64.Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                //扩容后,将扩容意向置为false
                collide = false;
                continue;                   // Retry with expanded table
            }
            //重置当前线程hash值
            h = advanceProbe(h);
        }
        //CASE2:cells 还未初始化(as 为 null),并且 cellsBusy 加锁成功
        //  CASE2.1:判断锁是否被占用
        //         true -> 表示当前未加锁
        //         false -> 表示当前已加锁
        //  CASE2.2:因为其它线程可能会在当前线程给as赋值之后修改了cells
        //         true -> cells没有被其它线程修改
        //         false -> cells已经被其它线程修改
        // CASE2.3:获取锁
        //         true -> 获取锁成功 会把cellsBusy = 1
        //         false -> 表示其它线程正在持有这把锁
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {
                //双重检查,防止其它线程已经初始化,当前线程再次初始化,会导致数据丢失
                // Initialize table
                if (cells == as) {
                    Striped64.Cell[] rs = new Striped64.Cell[2];
                    rs[h & 1] = new Striped64.Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        //CASE3:当前线程 cellsBusy 加锁失败,表示其他线程正在初始化 cells
        //所以当前线程将值累加到 base,注意 add(…)方法调用此方法时 fn 为 null
        else if (casBase(v = base, ((fn == null) ? v + x :
                fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}
5、LongAdder 类的 casCellsBusy 方法
方法的代码很简单,就是将
cellsBusy
成员的值改为
1
,表示目前的
cells
数组在
 final boolean casCellsBusy() {
        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }
九、使用 AtomicStampedReference 解决 ABA 问题
JDK 的提供了一个类似 AtomicStampedReference 类来解决 ABA 问题。
在
CAS
的基础上增加了一个
Stamp 整型
印戳(或标记),使用这个印戳可以来觉察数据是否发生变化,给数据带上了一种实效性的检验。
的
compareAndSet
方法首先检查当前的对象引用值是否等于预期引用,
Stamp
)标志是否等于预期标志,如果全部相等,则以原子方式将引用值和印戳
Stamp
)标志的值更新为给定的更新值。
1、AtomicStampReference 的构造器:
/**  
* @param initialRef初始引用  
* @param initialStamp初始戳记  
*\ 
AtomicStampedReference(V initialRef, int initialStamp)
2、AtomicStampReference 的常用的几个方法如下:
| 方法 | 介绍 | 
| 
 
      public V getRerference()
      
 | 
引用的当前值 | 
| 
 
      public int getStamp()
      
 | 
返回当前的"戳记" | 
public boolean weakCompareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp)
 | 
 expectedReference 引用的旧值 newReference 引用的新值 expectedStamp 旧的戳记 newStamp 新的戳记  | 
3、案例
    public static void main(String[] args) {
        boolean success = false;
        AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<Integer>(1, 0);
        int stamp = atomicStampedReference.getStamp();
        success = atomicStampedReference.compareAndSet(1, 0, stamp, stamp + 1);
        System.out.println("success:" + success + ";reference:" + "" + atomicStampedReference.getReference() + ";stamp:" + atomicStampedReference.getStamp());
        //修改印戳,更新失败
        stamp = 0;
        success = atomicStampedReference.compareAndSet(0, 1, stamp, stamp + 1);
        System.out.println("success:" + success + ";reference:" + "" + atomicStampedReference.getReference() + ";stamp:" + atomicStampedReference.getStamp());
    }
输出:
success:true;reference:0;stamp:1
 success:false;reference:0;stamp:1
