CAS 是在多线程环境下操作共享资源,用于保证线程安全的技术。其是一种乐观锁的算法实现,本身不加锁。
CAS操作包含三个操作数
如果内存位置V的值与预期原值A相匹配,则将内存位置的值更新为B,并返回true;否则,不做任何操作并返回false。
比如原值为 V=10,线程1修改值为 B = 5,线程2修改值为 b = 3,CAS 操作的过程为
时间 | 线程1 | 线程2 |
---|---|---|
1 | 读取内存值为预期值A = 10 | |
2 | 判断当前的内存值与预期值是否相等 A == B | 读取内存值为预期值A = 10 |
3 | 相等,更新内存值为B,则V = 5,返回 true | 判断当前的内存值与预期值是否相等 A == B |
4 | 不相等,不操作,返回 false |
CAS 的判断和更新操作是原子的,意味着在线程1判断和更新之后线程2才能判断和更新。
常用方法有:
- get():获取原子类当前的值
- getAndSet(int newValue):设置当前值为newValue,并返回原值(CAS实现)
- getAndIncrement():设置当前值为当前值加1,并返回原值(CAS实现)
- getAndDecrement():设置当前值为当前值减1,并返回原值(CAS实现)
- getAndAdd(int delta):设置当前值为当前值加 delta,并返回原值(CAS实现)
- getAndUpdate(IntUnaryOperator updateFunction):设置当前值为当前值加 updateFunction.applyAsInt 方法返回值,并返回原值,updateFunction.applyAsInt 方法的参数为原值(CAS实现)
- getAndAccumulate(int x, IntBinaryOperator accumulatorFunction):有两个参数,第一个参数为原值,第二个参数为x。getAndAccumulate 表示修改当前值为 accumulatorFunction 执行之后的值(CAS实现)
- incrementAndGet():先加1再返回,返回的是修改后的值
- decrementAndGet():先减1再返回
- addAndGet(int delta):先加 delta 再返回
- updateAndGet(IntUnaryOperator updateFunction)
- accumulateAndGet(int x, IntBinaryOperator accumulatorFunction)
compareAndSet(int expect, int update) :expect 表示修改时的原值,update 表示要修改为的值,如果修改成功 方法返回 true,如果修改失败,方法返回 false。其底层使用了 Unsafe 类的 compareAndSwapInt 方法实现,可以说,以上的所有方法的实现都依赖于 unsafe.compareAndSwapInt 方法(具体可查看源码,源码其实非常简单)。
public final boolean compareAndSet(int expect, int update) {
// this:当前对象,valueOffset:value属性的内存偏移量
// (this和valueOffset应该能知道当前内存中value的值是多少就是,内存位置V)
// expect 当前修改前读取到的值,这就是预期原值 A
// 当前修改需要修改为的值,这就是新值 B
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
Unsafe 对象大多数方法都是 native 方法,Unsafe 对象的 compareAndSwapXXX 方法对应操作系统底层的 cmpxchg 指令。所以,我们的原子操作,依赖于 unsafe 对象对应的底层操作系统指令来保证原子性。
我们再来看看 getAndUpdate(UnaryOperator<V> updateFunction)方法的源码:
public final V getAndUpdate(UnaryOperator<V> updateFunction) {
V prev, next;
do {
prev = get();// 获取原值(在这之后,此原值可能被其他线程修改)
next = updateFunction.apply(prev);// 计算出要修改为的值
} while (!compareAndSet(prev, next));
// compareAndSet 判断,如果此线程获取的原值已经被其他线程修改,compareAndSet 返回 false ,继续循环直到操作成功
// 此 while 循环其实就是自旋
return prev;
}
// 以上整个方法其实就是 CAS 的代码实现
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class NumberTest {
public static void main(String[] args) throws InterruptedException {
NumberIntContainer c1 = new NumberIntContainer();
NumberCASContainer c2 = new NumberCASContainer();
for(int i = 0; i < 10000;i++){
new Thread(()->{
c1.add();
c2.add();
},"t" + i).start();
}
TimeUnit.SECONDS.sleep(5);
System.out.println("c1:" + c1.getNum());
System.out.println("c2:" + c2.getNum().get());
}
}
@Getter
class NumberIntContainer{
// 此处添加 volatile 是不能解决问题的
private int num = 0;
// 如果添加 synchronized 关键字,可以解决问题,但 synchronized 是重量级锁
public void add(){
try {
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
num++;
}
}
@Getter
class NumberCASContainer{
private AtomicInteger num = new AtomicInteger(0);
public void add(){
try {
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// getAndIncrement 与 i++ 类似,但其是原子操作,i++不是
num.getAndIncrement();
}
}
结果:
c1:9962
c2:10000
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public class ABATest {
public static void main(String[] args) throws InterruptedException {
// 公司账户 10w 元
AtomicReference<Integer> num = new AtomicReference<>(10);
new Thread(()->{
int pre = num.get();
try {
// 等待,表示等
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean rs = num.compareAndSet(pre,pre - 5);
log.debug("正常出账:{} ",rs);
},"正常出账").start();
// 保障正常出账线程,先启动
TimeUnit.NANOSECONDS.sleep(20);
new Thread(()->{
int pre = num.get();
boolean rs = num.compareAndSet(pre,pre - 6);
log.debug("挪用 6 万元,{}",rs);
pre = num.get();
rs = num.compareAndSet(pre, pre + 6);
log.debug("填补 6 万元:{}",rs);
},"有问题的财务").start();
}
}
示例中,“有问题的财务”线程挪用了公司资金,又在“正常出账”线程之前填补了资金,导致“正常出账”线程也成功执行,而且无察觉且正常执行修改成功,这就是ABA问题。
22:11:57.085 [有问题的财务] DEBUG com.yyoo.thread.atomic.ABATest - 挪用 6 万元,true
22:11:57.089 [有问题的财务] DEBUG com.yyoo.thread.atomic.ABATest - 填补 6 万元:true
22:11:58.081 [正常出账] DEBUG com.yyoo.thread.atomic.ABATest - 正常出账:true
ABA 问题在大多数情况下没有问题,但如我们示例这种在实际应用中其实是有问题的,我们可以通过 AtomicStampedReference 来解决此问题。AtomicStampedReference 引入了 stamp (版本号)来区别每次的修改,即便修改后的值与之前的某个状态的值一致,其版本号也是不同的。AtomicMarkableReference 跟 AtomicStampedReference 的不同点在于,AtomicMarkableReference 的版本号是一个 boolean 值。
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicStampedReference;
@Slf4j
public class ABATest1 {
public static void main(String[] args) throws InterruptedException {
// 公司账户 10w 元,起始版本为 0
AtomicStampedReference<Integer> num = new AtomicStampedReference <>(10,0);
new Thread(()->{
int pre = num.getReference(); // 获取当前的值
int stamp = num.getStamp();// 获取当前的版本
try {
// 等待,表示等
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 如果原值和原版本号与当前的版本号不对应则修改失败
boolean rs = num.compareAndSet(pre,pre - 5,stamp,stamp + 1);
log.debug("正常出账:{} ",rs);
},"正常出账").start();
// 保障正常出账线程,先启动
TimeUnit.NANOSECONDS.sleep(20);
new Thread(()->{
int pre = num.getReference();
int stamp = num.getStamp();
boolean rs = num.compareAndSet(pre,pre - 6,stamp,stamp + 1);
log.debug("挪用 6 万元,{}",rs);
pre = num.getReference();
stamp = num.getStamp();
rs = num.compareAndSet(pre, pre + 6,stamp,stamp + 1);
log.debug("填补 6 万元:{}",rs);
},"有问题的财务").start();
}
}
执行结果
22:13:16.470 [有问题的财务] DEBUG com.yyoo.thread.atomic.ABATest1 - 挪用 6 万元,true
22:13:16.473 [有问题的财务] DEBUG com.yyoo.thread.atomic.ABATest1 - 填补 6 万元:true
22:13:17.472 [正常出账] DEBUG com.yyoo.thread.atomic.ABATest1 - 正常出账:false
每次累加(累减),从 0 开始
AtomicInt、AtomicLong 也能进行自增、自减,但 LongAdder 的效率更高,因为 LongAdder、DoubleAdder 都继承自 Striped64,Striped64 中包含三个重要的字段
/**
* 并发竞争情况下的单元数组
* 并发情况下可能线程1 累加 cells[0] 而线程2 累加 cells[1] 最后将这些累加的结果结合起来
* 每次扩容为 2 的次幂
*/
transient volatile Cell[] cells;
/**
* 基本值,在没有竞争的情况下使用 CAS 来累加
*/
transient volatile long base;
/**
* Spinlock (CAS 锁) 当 cells 创建或更改大小时使用
*/
transient volatile int cellsBusy;
原子累计计算
LongAccumulator 实例化时接收两个参数,LongBinaryOperator accumulatorFunction 和 long identity,accumulatorFunction 为计算函数(里面来定义计算公式),函数第一个值为当前累计的值,第二个值为调用 accumulate(long x) 方法传入的值,identity 是起始值
示例
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
@Slf4j
public class LongAdderTest {
public static void main(String[] args) throws InterruptedException {
LongAdder a = new LongAdder();
// 计算公式为 rs = x + 2*y;
LongAccumulator b = new LongAccumulator((x,y)-> x + 2 * y,0);
for(int i = 0; i < 1000; i++){
// 此处只是示例,工作场景请勿这样创建线程
new Thread(()->{
a.add(10);
},"a" + i).start();
new Thread(()->{
// 相当于每次加20
b.accumulate(10);
},"b" + i).start();
}
TimeUnit.SECONDS.sleep(2);
log.debug("a 累计:{}",a.longValue());
log.debug("b 累计:{}",b.longValue());
}
}
结果:
11:32:33.206 [main] DEBUG com.yyoo.thread.atomic.LongAdderTest - a 累计:10000
11:32:33.212 [main] DEBUG com.yyoo.thread.atomic.LongAdderTest - b 累计:20000
优点:
缺点:
所以,在线程数不多的情况下使用 CAS 是比较高效的。