CAS全称Compare-And-Swap,是一种无锁编程算法,即比较当前的值与旧值是否相等若相等则进行修改操作(乐观锁机制)
,该类常用于多线程共享变量的修改操作。而其底层实现也是基于硬件平台的汇编指令,JVM
只是封装其调用仅此而已。
如下所示,可以看出使用封装CAS
操作的AtomicInteger
操作多线程共享变量无需我们手动加锁,因为避免过多人为操作这就大大减少了多线程操作下的失误。
使用原子类操作共享数据
public class CasTest {
private AtomicInteger count = new AtomicInteger();
public void increment() {
count.incrementAndGet();
}
// 使用 AtomicInteger 后,不需要加锁,也可以实现线程安全
public int getCount() {
return count.get();
}
public static void main(String[] args) {
}
}
使用sync锁操作数据
public class Test {
private int i=0;
public synchronized int add(){
return i++;
}
}
CAS
工作原理是基于乐观锁
且操作是原子性的,与synchronized
的悲观锁(底层需要调用操作系统的mutex锁)
相比,效率也会相对高一些。
不是,CAS
是主要是通过处理器的指令来保证原子性的。
但即便如此CAS
仍然存在两个问题:
CAS
:如下代码所示,这就是AtomicInteger
底层的UNSAFE
类如何进行CAS
的具体代码 ,可以看出这个CAS
操作需要拿到volatile
变量后在进行循环CAS
才有可能成功这就很可能存在自旋循环,从而给CPU
带来很大的执行开销。 public final int getAndSetInt(Object var1, long var2, int var4) {
int var5;//声明一个var5,只
do {
//通过getIntVolatile获取当前原子数的值
var5 = this.getIntVolatile(var1, var2);
//只有传入var1和现在获得的var5 一样,才说明值没被修改过,我们才能将值设置为var4
} while(!this.compareAndSwapInt(var1, var2, var5, var4));
return var5;
}
CAS
只能对一个变量进行原子操作:为了解决这个问题,JDK 1.5
之后通过AtomicReference
使得变量可以封装成一个对象进行操作
ABA
问题:总所周知CAS就是比对当前值与旧值是否相等,在进行修改操作,假设我现在有一个变量值为A,我改为B,再还原为A,这样操作变量值是没变的?那么CAS也会成功不就不合理吗?这就好比一个银行储户想查询概念转账记录,如果转账一次记为1,如果按照ABA问题的逻辑,那么这个银行账户转账记录次数有可能会缺少。为了解决这个问题JDK 1.5提供了AtomicStampedReference
,通过比对版本号在进行CAS
操作,那么上述操作就会变为1A->2B->3A
,由于版本追加,那么我们就能捕捉到当前变量的变化了。
代码也很简单,就是拿到具有可见性的volatile
变量i,然后判断i和当前对象paramObject
对应的i值是否一致,若一致则说明没被人该过,进而进行修改操作,反之自旋循环获取在进行CAS。
public final int getAndAddInt(Object paramObject, long paramLong, int paramInt)
{
int i;
do
i = getIntVolatile(paramObject, paramLong);
while (!compareAndSwapInt(paramObject, paramLong, i, i + paramInt));
return i;
}
public final long getAndAddLong(Object paramObject, long paramLong1, long paramLong2)
{
long l;
do
l = getLongVolatile(paramObject, paramLong1);
while (!compareAndSwapLong(paramObject, paramLong1, l, l + paramLong2));
return l;
}
public final int getAndSetInt(Object paramObject, long paramLong, int paramInt)
{
int i;
do
i = getIntVolatile(paramObject, paramLong);
while (!compareAndSwapInt(paramObject, paramLong, i, paramInt));
return i;
}
public final long getAndSetLong(Object paramObject, long paramLong1, long paramLong2)
{
long l;
do
l = getLongVolatile(paramObject, paramLong1);
while (!compareAndSwapLong(paramObject, paramLong1, l, paramLong2));
return l;
}
public final Object getAndSetObject(Object paramObject1, long paramLong, Object paramObject2)
{
Object localObject;
do
localObject = getObjectVolatile(paramObject1, paramLong);
while (!compareAndSwapObject(paramObject1, paramLong, localObject, paramObject2));
return localObject;
}
代码逻辑和注释如下,读者可自行debug查看逻辑
public class CasCountInc {
private static Logger logger = LoggerFactory.getLogger(CasCountInc.class);
// 获取Unsafe对象
private static Unsafe unsafe = getUnsafe();
// 线程池数目
private static final int THREAD_COUNT = 20;
// 每个线程运行自增次数
private static final int EVERY_THREAD_ADD_COUNT = 500;
// 自增的count的值,volatile保证可见性
private volatile int count = 0;
// count字段的偏移量
private static long countOffSet;
private static Unsafe getUnsafe() {
Unsafe unsafe = null;
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe) field.get(null);
} catch (Exception e) {
logger.info("获取unsafe失败,失败原因:[{}]", e.getMessage(), e);
}
return unsafe;
}
static {
try {
countOffSet = unsafe.objectFieldOffset(CasCountInc.class.getDeclaredField("count"));
} catch (NoSuchFieldException e) {
logger.error("获取count的偏移量报错,错误原因:[{}]", e.getMessage(), e);
}
}
public void inc() {
int oldCount = 0;
//基于cas完成自增
do {
oldCount = count;
} while (!unsafe.compareAndSwapInt(this, countOffSet, oldCount, oldCount + 1));
}
public static void main(String[] args) throws InterruptedException {
CasCountInc casCountInc = new CasCountInc();
CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
IntStream.range(0, THREAD_COUNT).forEach(i -> {
new Thread(() -> {
IntStream.range(0, EVERY_THREAD_ADD_COUNT).forEach((j) -> {
casCountInc.inc();
});
countDownLatch.countDown();
}).start();
});
countDownLatch.await();
logger.info("count最终结果为 [{}]", casCountInc.count);
}
}
CAS
平时怎么用的:我们希望从一个视频中完成人像比对的功能,通过图像识别技术完成视频逐帧切割,只要有一帧的图片被识别分数达到90以上即可算完成。任务是串行的,如果是个大视频则执行时间会非常漫长。
经过对需求复盘,整体来说这个功能就是多任务只要有一个任务完成就算完成的需求。对此我们写了一个Callable
的任务,这个任务中的多线程共享两个变量atomicInteger
、countDownLatch
,都是由外部调度的,只要一个任务分数达到90则CAS
自增,countDown
倒计时门闩。
public class Task implements Callable<Integer> {
private static Logger logger = LoggerFactory.getLogger(Task.class);
private AtomicInteger atomicInteger ;
private CountDownLatch countDownLatch;
public Task(AtomicInteger atomicInteger, CountDownLatch countDownLatch) {
this.atomicInteger = atomicInteger;
this.countDownLatch = countDownLatch;
}
@Override
public Integer call() throws Exception {
int score = (int) (Math.random() * 100);
logger.info("当前线程:{},识别分数:{}", Thread.currentThread().getName(), score);
synchronized (this){
}
if (score > 90 && atomicInteger.getAndIncrement()==0) {
logger.info("当前线程:{} countDown",Thread.currentThread().getName());
countDownLatch.countDown();
logger.info("当前线程:{} 返回比对分数:{}", Thread.currentThread().getName(), score);
return score;
}
return -1;
}
}
执行代码
public class Main {
private static Logger logger = LoggerFactory.getLogger(Main.class);
public static void main(String[] args) throws InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(100);
CountDownLatch countDownLatch=new CountDownLatch(1);
for (int i = 0; i < 100; i++) {
Future<Integer> task = threadPool.submit(new Task(countDownLatch));
}
logger.info("阻塞中");
countDownLatch.await();
logger.info("阻塞结束");
threadPool.shutdown();
while (!threadPool.isTerminated()){
}
}
}
存在问题:CAS是基于乐观锁机制,所以数据同步失败就会原地自旋,在高并发场景下开销很大,所以线程数很大的情况下不建议使用原子类。
存在问题: 如果并发量大的话,自旋的线程多了就会导致性能瓶颈。
for 循环代替 CAS执行效率是否一样:大概率是CAS快,原因如下:
native
方法更接近底层sync
锁或者Lock
无论那种都需要上锁和释放的逻辑,相比CAS乐观锁来说开销很大。AtomicBoolean: 原子更新布尔类型。
AtomicInteger: 原子更新整型。
AtomicLong: 原子更新长整型。
AtomicIntegerArray: 原子更新整型数组里的元素。
AtomicLongArray: 原子更新长整型数组里的元素。
AtomicReferenceArray: 原子更新引用类型数组里的元素。
import java.util.concurrent.atomic.AtomicIntegerArray;
public class AtomicIntegerArrayDemo {
public static void main(String[] args) throws InterruptedException {
AtomicIntegerArray array = new AtomicIntegerArray(new int[] { 0, 0 });
System.out.println(array);
// 索引1位置+2
System.out.println(array.getAndAdd(1, 2));
System.out.println(array);
}
}
AtomicReference: 原子更新引用类型。
AtomicStampedReference: 原子更新引用类型, 内部使用Pair来存储元素值及其版本号。
AtomicMarkableReferce: 原子更新带有标记位的引用类型。
import java.util.concurrent.atomic.AtomicReference;
public class AtomicReferenceTest {
public static void main(String[] args){
// 创建两个Person对象,它们的id分别是101和102。
Person p1 = new Person(101);
Person p2 = new Person(102);
// 新建AtomicReference对象,初始化它的值为p1对象
AtomicReference ar = new AtomicReference(p1);
// 通过CAS设置ar。如果ar的值为p1的话,则将其设置为p2。
ar.compareAndSet(p1, p2);
Person p3 = (Person)ar.get();
System.out.println("p3 is "+p3);
System.out.println("p3.equals(p1)="+p3.equals(p1));
System.out.println("p3.equals(p2)="+p3.equals(p2));
}
}
class Person {
volatile long id;
public Person(long id) {
this.id = id;
}
public String toString() {
return "id:"+id;
}
}
AtomicIntegerFieldUpdater: 原子更新整型的字段的更新器。
AtomicLongFieldUpdater: 原子更新长整型字段的更新器。
AtomicStampedFieldUpdater: 原子更新带有版本号的引用类型。
AtomicReferenceFieldUpdater: 上面已经说过此处不在赘述。
如下所示,我们创建一个基础类DataDemo
,通过原子类CAS
操作字段值进行自增操作。
public class TestAtomicIntegerFieldUpdater {
private static Logger logger = LoggerFactory.getLogger(TestAtomicIntegerFieldUpdater.class);
public static void main(String[] args) {
TestAtomicIntegerFieldUpdater tIA = new TestAtomicIntegerFieldUpdater();
tIA.doIt();
}
/**
* 返回需要更新的整型字段更新器
*
* @param fieldName
* @return
*/
public AtomicIntegerFieldUpdater<DataDemo> updater(String fieldName) {
return AtomicIntegerFieldUpdater.newUpdater(DataDemo.class, fieldName);
}
public void doIt() {
DataDemo data = new DataDemo();
// 修改公共变量,返回更新前的旧值 0
AtomicIntegerFieldUpdater<DataDemo> updater = updater("publicVar");
int oldVal = updater.getAndIncrement(data);
logger.info("publicVar 更新前的值[{}] 更新后的值 [{}]", oldVal, data.publicVar);
// 更新保护级别的变量
AtomicIntegerFieldUpdater<DataDemo> protectedVarUpdater = updater("protectedVar");
int oldProtectedVar = protectedVarUpdater.getAndAdd(data, 2);
logger.info("protectedVar 更新前的值[{}] 更新后的值 [{}]", oldProtectedVar, data.protectedVar);
// logger.info("privateVar = "+updater("privateVar").getAndAdd(data,2)); 私有变量会报错
/*
* 下面报异常:must be integer
* */
// logger.info("integerVar = "+updater("integerVar").getAndIncrement(data));
//logger.info("longVar = "+updater("longVar").getAndIncrement(data));
}
class DataDemo {
// 公共且可见的publicVar
public volatile int publicVar = 0;
// 保护级别的protectedVar
protected volatile int protectedVar = 4;
// 私有变量
private volatile int privateVar = 5;
// final 不可变量
public final int finalVar = 11;
public volatile Integer integerVar = 19;
public volatile Long longVar = 18L;
}
}
1. 变量必须使用volatile保证可见性
2. 必须是当前对象可以访问到的类型才可进行操作‘
3. 只能是实例变量而不是类变量,即不可以有static修饰符
4. 包装类也不行
CAS更新前会检查值有没有变化,如果没有变化则认为没人修改过,在进行更新操作。这种情况下,若我们A值修改为B,B再还原为A。这种修改再还原的操作,CAS是无法感知是否变化的,这就是所谓的ABA问题。
源码如下所示,可以看到AtomicStampedReference
解决ABA问题的方式是基于当前修改操作的时间戳和元引用值是否一致,若一直则进行数据更新
public class AtomicStampedReference<V> {
private static class Pair<T> {
final T reference; //维护对象引用
final int stamp; //用于标志版本
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;
....
/**
* expectedReference :更新之前的原始引用值
* newReference : 新值
* expectedStamp : 预期时间戳
* newStamp : 更新后的时间戳
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
// 获取当前的(元素值,版本号)对
Pair<V> current = pair;
return
// 引用没变
expectedReference == current.reference &&
// 版本号没变
expectedStamp == current.stamp &&
//可以看到这个括号里面用了一个短路运算如果当前版本与新值一样就说更新过,就不往下走CAS代码了
((newReference == current.reference &&
newStamp == current.stamp) ||
// 构造新的Pair对象并CAS更新
casPair(current, Pair.of(newReference, newStamp)));
}
private boolean casPair(Pair<V> cmp, Pair<V> val) {
// 调用Unsafe的compareAndSwapObject()方法CAS更新pair的引用为新引用
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}
代码示例,我们下面就用other代码模拟干扰现场,如果other现场先进行CAS更新再还原操作,那么main线程的版本号就会过时,CAS就会操作失败
/**
* ABA问题代码示例
*/
public class AtomicStampedReferenceTest {
private static AtomicStampedReference<Integer> atomicStampedRef =
new AtomicStampedReference<>(1, 0);
public static void main(String[] args) {
Thread main = new Thread(() -> {
System.out.println("操作线程" + Thread.currentThread() + ",初始值 a = " + atomicStampedRef.getReference());
int stamp = atomicStampedRef.getStamp(); //获取当前标识别
try {
Thread.sleep(1000); //等待1秒 ,以便让干扰线程执行
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean isCASSuccess = atomicStampedRef.compareAndSet(1, 2, stamp, stamp + 1); //此时expectedReference未发生改变,但是stamp已经被修改了,所以CAS失败
System.out.println("操作线程" + Thread.currentThread() + ",CAS操作结果: " + isCASSuccess);
}, "主操作线程");
Thread other = new Thread(() -> {
Thread.yield(); // 确保thread-main 优先执行
atomicStampedRef.compareAndSet(1, 2, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
System.out.println("操作线程" + Thread.currentThread() + ",【increment】 ,值 = " + atomicStampedRef.getReference());
atomicStampedRef.compareAndSet(2, 1, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
System.out.println("操作线程" + Thread.currentThread() + ",【decrement】 ,值 = " + atomicStampedRef.getReference());
}, "干扰线程");
main.start();
other.start();
}
}
AtomicMarkableReference
,它不是维护一个版本号,而是维护一个boolean类型的标记,标记对象是否有修改,从而解决ABA问题。
public boolean weakCompareAndSet(V expectedReference,
V newReference,
boolean expectedMark,
boolean newMark) {
return compareAndSet(expectedReference, newReference,
expectedMark, newMark);
}
AtomicInteger atomicInteger=new AtomicInteger(10000);
atomicInteger.compareAndSet(10000, 0);