JUC原子操作类

发布时间:2024年01月03日

原子操作类

  • 基本类型原子类:AtomicInteger、AtomicBoolean、AtomicLong,常见API:

    • get 获取当前值
    • getAndSet 获取当前的值,并设置新的值
    • getAndIncrement 获取当前的值,并自增
    • getAndDecrement 获取当前的值,并自减
    • getAndAdd 获取当前的值,并加上预期的值
    • compareAndSet 如果输入的值等于预期值,则以原子方式将值更新
  • 数组类型原子类:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray,用法和基本类型原子类类似

     AtomicIntegerArray atomicIntegerArray =new AtomicIntegerArray(new int[]{1,2,3,4});
    
  • 引用类型原子类

    • AtomicReference :原子引用
    • AtomicStampedReference :携带版本号的引用类型原子类,可以解决ABA问题,记录了引用被修改过多少次
    • AtomicMarkableReference:带有标记位的引用类型原子类,通过一个布尔值的状态戳,用来判断对象是否被修改过,解决一次性修改问题
  • 对象的属性修改原子类

  • 原子操作增强类

CountDownLatch使用场景

  • 线程计数器 用于线程执行任务,计数 等待线程结束

  • 例如:

        public static void test() throws Exception{
            AtomicInteger ato = new AtomicInteger(0);
            Integer size = 100;
            //100个线程用于计算
            CountDownLatch countDownLatch = new CountDownLatch(100);
            for (int a = 0; a < size; a++) {
                new Thread(()->{
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    for (int b =0;b <100;b++){
                        //自增
                        ato.getAndIncrement();
                    }
                    //计算完了就减1
                    countDownLatch.countDown();
                }).start();
            }
            //只有 countDownLatch 中的数量为0 才能继续执行,避免 Thread.sleep()的操作,计算完成能够即时执行
            countDownLatch.await();
            System.out.println("最终计算结果:"+ato.get());
        }
    
    

对象的属性修改原子类

对象的属性修改原子类,可以用线程安全的方式操作非线程安全对象内的某些字段,例如:某个对象有多个属性,但是只有少量字段需要频繁更新,加锁虽然可以保证线程安全,但是有可能会锁住整个对象,所以可以用原子更新代替加锁

  • AtomicIntegerFieldUpdater,基于反射,原子更新对象中,被volatile修饰的 int类型字段的值

  • AtomicLongFieldUpdater基于反射,原子更新对象中,被volatile修饰的 long类型字段的值

  • AtomicReferenceFieldUpdater,基于反射,原子更新引用类型,被volatile修饰的字段的值

  • 使用原子更新的要:

    • 更新的对象属性必须是 public volatile 修饰的
    • 而且因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法 newUpdater() 创建一个更新器,通过更新器去更新属性
  • 示例:如下出了金额以外的字段是不会经常更新的,针对金额字段加锁,肯定是可以保证线程安全的,但是很重量级,不加锁的话,结果会不确定,线程不安全

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class AtomicEntity {
        //账号
        private String account;
        //联系方式
        private String phone;
        //姓名
        private String name;
        //金额
        private volatile Integer money;
    
        public synchronized void addMoney(Integer add) {
            money = money + add;
        }
    
        public synchronized void subMoney(Integer sub) {
            money = money - sub;
        }
    }
    
    
        private static void test3() throws Exception {
            AtomicEntity atomic = new AtomicEntity("123456789", "123456", "张三", 0);
    
            CountDownLatch countDownLatch = new CountDownLatch(200);
    
            for (int a = 0; a < 100; a++) {
                new Thread(() -> {
                    for (int b = 0; b < 100; b++) {
                        atomic.addMoney(1);
                    }
                    countDownLatch.countDown();
                }, "线程" + a).start();
            }
    
            for (int a = 0; a < 100; a++) {
                new Thread(() -> {
                    for (int b = 0; b < 100; b++) {
                        atomic.subMoney(1);
                    }
                    countDownLatch.countDown();
                }, "线程" + a).start();
            }
            countDownLatch.await();
             //这里执行结果肯定是正确的,如果不加锁会导致线程错乱
            System.out.println("最终结果" + atomic.getMoney());
        }
    
    

    如果不想加锁,使用对象的属性修改原子类,AtomicIntegerFieldUpdater示例

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class AtomicEntity {
        //账号
        private String account;
        //联系方式
        private String phone;
        //姓名
        private String name;
        //金额 ,修饰符必须是  public volatile 
        public volatile int money;
    
        public AtomicEntity(String account, String phone, String name, Integer money) {
            this.account = account;
            this.phone = phone;
            this.name = name;
            this.money = money;
        }
    
        AtomicIntegerFieldUpdater<AtomicEntity> ato =
                AtomicIntegerFieldUpdater.newUpdater(AtomicEntity.class, "money");
    
        public void atoAddMoney(AtomicEntity atomic, Integer add) {
            ato.getAndAdd(atomic, add);
        }
    
        public void atoSubMoney(AtomicEntity atomic, Integer sub) {
            ato.getAndAdd(atomic, -sub);
        }
    }
    
    
        private static void test4() throws Exception {
            AtomicEntity atomic = new AtomicEntity("123456789", "123456", "李四", 0);
    
            CountDownLatch countDownLatch = new CountDownLatch(200);
    
            for (int a = 0; a < 100; a++) {
                new Thread(() -> {
                    for (int b = 0; b < 100; b++) {
                        atomic.atoAddMoney(atomic,1);
                    }
                    countDownLatch.countDown();
                }, "线程" + a).start();
            }
    
            for (int a = 0; a < 100; a++) {
                new Thread(() -> {
                    for (int b = 0; b < 100; b++) {
                        atomic.atoSubMoney(atomic,1);
                    }
                    countDownLatch.countDown();
                }, "线程" + a).start();
            }
            countDownLatch.await();
            //不加锁也可以保证线程安全
            System.out.println("最终结果" + atomic.getMoney());
    
        }
    
    

原子操作增强类

  • 从 jdk 8 才引入了,原子操作增强类,如果是i++操作,推荐使用 LongAdder ,能比 AtomicLong 有更好的性能,因为LongAdder 可以减少乐观锁的重试次数
  • DoubleAccumulator
  • DoubleAdder
  • LongAccumulator
  • LongAdder
  • 上面四个增强类继承于 Striped64 ,Striped64 继承了 Number,加上上面的16个类,统称为18罗汉

LongAdder

  • 当多个线程更新用于收集统计信息,但不用于细粒度同步控制的目的公共和时,通常优于AtomicLong
  • 在低并发下LongAdder和AtomicLong差距不大,高并发下明显优于AtomicLong,代价是空间消耗也大
  • 常见api
    • add (long x) 将当前值加x
    • increment() 将当前值加1
    • decrement() 将当前值减1
    • sum() 返回当前值,如果是并发更新,sum返回的值不精确
    • reset() 将value重置为0,这个方法只能在没有并发更新时使用
    • sumThenReset() 获取当前值,并将value重置为0
  • 只能计算加法,且必须从0开始计算

LongAccumulator

  • 能够自定义计算规则

    @FunctionalInterface
    public interface LongBinaryOperator {
    	//left是初始值
        long applyAsLong(long left, long);
    }
    
        private static void test1() {
            //实现函数式接口 LongBinaryOperator ,自定义计算规则,第二个参数是初始值
            LongAccumulator longAccumulator = new LongAccumulator((x, y) -> {
                return x * y;
            }, 1);
            //这里的参数 y:5,和原本的值1,通过自定义的计算规则,得到两者的计算结果
            longAccumulator.accumulate(5);
            longAccumulator.accumulate(5);
            //结果为25
            System.out.println(longAccumulator.longValue());
    
        }
    

计数器案例

原子操作和同步方法的性能比较

/**
 * 计数器
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
public class LikeCounter {

    public volatile long num;

    public LikeCounter(long num) {
        this.num = num;
    }

    /**
     * 第一种同步方法
     */
    public synchronized void add() {
        num++;
    }

    /**
     * atomicLong原子操作
     */
    AtomicLong atomicLong = new AtomicLong(num);

    public void addAto() {
        atomicLong.getAndIncrement();
    }

    public long getAto() {
        return atomicLong.get();
    }

    /**
     * 对象的属性修改原子类 atomicLongFieldUpdater
     */
    AtomicLongFieldUpdater<LikeCounter> atomicLongFieldUpdater = AtomicLongFieldUpdater.
            newUpdater(LikeCounter.class, "num");

    public void addAtoField(LikeCounter likeCounter) {
        atomicLongFieldUpdater.getAndIncrement(likeCounter);
    }

    /**
     * 原子扩展类 longAdder
     */
    LongAdder longAdder = new LongAdder();

    public void addLongAdder() {
        longAdder.increment();
    }

    public long getLongAdder() {
        return longAdder.longValue();
    }

    /**
     * 原子扩展类 LongAccumulator
     */

    LongAccumulator  longAccumulator =new LongAccumulator((x,y)->{
        return x+y;
    },num);

    public void addlongAccumulatorr() {
        longAccumulator.accumulate(1);
    }

    public long getlongAccumulator() {
        return longAccumulator.get();
    }

}

    public static void main(String[] args) {

        try {
            test1();
            test2(); 
            test3();
            test4();
            test5();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }


    }

    private static void test1() throws Exception {
        long l1 = System.currentTimeMillis();
        LikeCounter like1 = new LikeCounter(0);
        CountDownLatch countDownLatch = new CountDownLatch(10000);
        for (int a = 0; a < 10000; a++) {
            new Thread(() -> {
                for (int b = 0; b < 10000; b++) {
                    like1.add();
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        System.out.println("调用synchronized方法,执行结果为:"+like1.getNum()+
                ",耗时:"+(System.currentTimeMillis()-l1));
    }

    private static void test2() throws Exception {
        long l1 = System.currentTimeMillis();
        LikeCounter like1 = new LikeCounter(0);
        CountDownLatch countDownLatch = new CountDownLatch(10000);
        for (int a = 0; a < 10000; a++) {
            new Thread(() -> {
                for (int b = 0; b < 10000; b++) {
                    like1.addAto();
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        System.out.println("调用atomicLong方法,执行结果为:"+like1.getAto()+
                ",耗时:"+(System.currentTimeMillis()-l1));
    }

    private static void test3() throws Exception {
        long l1 = System.currentTimeMillis();
        LikeCounter like1 = new LikeCounter(0);
        CountDownLatch countDownLatch = new CountDownLatch(10000);
        for (int a = 0; a < 10000; a++) {
            new Thread(() -> {
                for (int b = 0; b < 10000; b++) {
                    like1.addAtoField(like1);
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        System.out.println("调用atomicLongFieldUpdater方法,执行结果为:"+like1.getNum()+
                ",耗时:"+(System.currentTimeMillis()-l1));
    }

    private static void test4() throws Exception {
        long l1 = System.currentTimeMillis();
        LikeCounter like1 = new LikeCounter(0);
        CountDownLatch countDownLatch = new CountDownLatch(10000);
        for (int a = 0; a < 10000; a++) {
            new Thread(() -> {
                for (int b = 0; b < 10000; b++) {
                    like1.addLongAdder();
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        System.out.println("调用longAdder方法,执行结果为:"+like1.getLongAdder()+
                ",耗时:"+(System.currentTimeMillis()-l1));
    }

    private static void test5() throws Exception {
        long l1 = System.currentTimeMillis();
        LikeCounter like1 = new LikeCounter(0);
        CountDownLatch countDownLatch = new CountDownLatch(10000);
        for (int a = 0; a < 10000; a++) {
            new Thread(() -> {
                for (int b = 0; b < 10000; b++) {
                    like1.addlongAccumulatorr();
                }
                countDownLatch.countDown();
            }).start();
        }
        countDownLatch.await();
        System.out.println("调用longAccumulator方法,执行结果为:"+like1.getlongAccumulator()+
                ",耗时:"+(System.currentTimeMillis()-l1));
    }

调用结果如下:

  • synchronized方法最慢,
  • longAdder 和 longAccumulato 两者最快,性能基本相当
调用synchronized方法,执行结果为:100000000,耗时:4621
调用atomicLong方法,执行结果为:100000000,耗时:1522
调用atomicLongFieldUpdater方法,执行结果为:100000000,耗时:2611
调用longAdder方法,执行结果为:100000000,耗时:368
调用longAccumulator方法,执行结果为:100000000,耗时:333

longAdder

Striped64 内部结构

  • 内部变量 base :并发低的时候,直接累加到base上

  • 另外还有一个内部类cell ,和 cell[],并发高的时候,会把线程分散到自己的槽 cell[i] 中

longAdder 为什么快

  • AtomicLong的底层是cas,即使并发很大,一次也只能有一个线程完成cas操作,剩下的线程只能自旋等待,就会导致大量的cpu空转
  • longAdder的同样是cas
    • 但是采用了分散热点的思想,将value值分散到一个cell 数组中,不同的线程会命中数组中不同的槽,每个线程对自己槽中的值进行cas操作,这样就分摊了压力,减少了线程冲突的概率,也就减少了线程自旋等待的时间,
    • 如果要获取真正的value值时,sum方法会将所有的cell数组中的value值和base累加作为返回值
  • 所以如果并发不大,longAdder和AtomicLong差别不大,都是对一个base进行操作
  • 但是高并发下longAdder会采用空间换时间的做法,使用一个cell数组拆分value值,多个线程需要同时对value值进行操作的时候,先通过线程 id 的 hash 值映射到数组对应位置,再对该位置的值进行操作,当所有线程都执行完毕,base的值加上cell 数组中的值就是最终结果

longAdder 和 AtomicLong对比

  • AtomicLong是多个线程对单个热点值进行原子更新,是线程安全的,会损失一些性能,再高精度要求时,可以使用
  • longAdder 再高并发下,有较好的性能,对值精确度要求不高时,可以使用,每个线程都有自己的槽,各个线程一般只对自己槽中的值进行cas操作
add 方法
  • 方法入参是要增加的值

  • 刚开始 cell[] 等于null,尝试用cas操作更新base值,cas执行成功,把base值改为了期望值,本次add就结束了

  • 随着并发的升高,cas操作失败,就需要执行 longAccumulate方法,去初始化cell数组分散压力

  • 一旦 cell[] 数组初始化完成,就需要判断当前线程所在的 cell 是否为null

    • 为 null,执行 Striped64 的 longAccumulate方法,来初始化对应位置的cell

    • 不为null,就执行对应 cell 的 cas操作,

      • 执行成功就没有冲突,结束本次add操作
      • 执行失败,表示本次操作有冲突,需要执行 Striped64 的 longAccumulate方法来扩容 cell []
    public void add(long x) {
        //b获取的base值,v表示期望值 ,m为数组长度,a表示当前线程命中的数组单元格
        Cell[] as; long b, v; int m; Cell a;
        //刚开始cells等于null,执行casBase,结果取反
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            //这个boolean值代表cell数组有没有冲突
            boolean uncontended = true;
            //判断数组有没有初始化完成
            if (as == null || (m = as.length - 1) < 0 ||
                //判断线程有没有冲突,等于null说明没有冲突,getProbe()计算hash值
                (a = as[getProbe() & m]) == null ||
                //如果线程映射的对应位置不为null,就执行对应 cell 的 cas操作,执行成功返回true,取反得到false表没有冲突,结束本次add操作
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }
longAccumulate
  • 方法属于 Striped64,方法入参:

    • x 是需要增加的值,
    • longAdder 中 fn 默认是null, LongAccumulator会传递计算规则进来
    • wasUncontended 是竞争标志,只有cell[]初始化完成,且cas竞争失败从才会是false
  • getProbe()方法,获取线程的hash值,如果返回0,会重新计算一个hash值,重新计算后,认为线程本次操作没有竞争关系,把竞争标志改为 true ,也就是不存在冲突

  • advanceProbe(h),重置当前线程的hash值,让线程再次参与竞争

  • longAccumulate 方法分为三大部分

    • 数组没有初始化

      • 如果数组没有初始化,就需要加锁去初始cell数组
        • 这里没有使用synchronized加锁,而是使用内部变量cellsBusy,0表示无锁状态,1表示被其他线程持有了锁
        • 如果是无锁状态,就会使用cas操作把该值更新为1,更新成功代表加锁成功,去完成数组的初始化,cell数组,默认长度为2,同时初始化当前线程hash对应数组位置的cell对象,数组初始化完成后释放锁
    • 数组正在初始化,也计是其他未拿到锁的线程,作为兜底方案,会再这个分支把值直接累加到base上

    • 数组初始化完成

      • 如果数组初始化完成,但是线程对应位置的cell对象为null,就需要初始化cell对象

        • 初始化cell对象,同样依靠内部变量cellsBusy,0表示无锁状态,1表示被其他线程持有了锁,值为0就使用cas把值更新为1,代表加锁
        • cell对象初始化完成后,释放锁
      • 如果线程竞争标志为false 存在冲突,就把竞争标识改为 true,然后重置当前线程的hash值,重新计算线程的槽位,让线程重新循环参与竞争

      • 如果通过cas操作重新竞争成功,就跳出循环

      • 如果数组的长度 n大于当前cpu的核数,就不可扩容,然后重置当前线程的hash值,让线程重新循环参与竞争

      • 如果cell[] 需要扩容,同样需要拿到cas锁,新数组的长度是原数组的两倍,把原本的cell拷贝到新数组,数组的引用指向新数组后,释放锁

	//x 是需要增加的值,fn 默认是null, wasUncontended 是竞争标志,只有cell[]初始化完成,且cas竞争失败从才会是false
	final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean  wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            //数组初始化完成
            if ((as = cells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                //重制竞争标志
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                //重新竞争cas操作
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                //重置hash值
                h = advanceProbe(h);
            }
            //数组没有初始化
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        //如果cell为null,初始化cell数组,默认长度为2
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            //数组正在初始化,作为一种兜底,把值直接累加到base上
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }

sum方法

  • 将所有cell数组中的value值和base累加作为返回值
  • 但是再sum执行时,并没有限制对base和cell的更新,所以longAdder 不是强一致性的,而是保证最终一致性
    public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
文章来源:https://blog.csdn.net/persistence_PSH/article/details/135374118
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。