JUC03共享模型

发布时间:2024年01月16日

网课 : 黑马JUC
笔记 : www.zgtsky.top

内存

Monitor 主要关注的是访问共享变量时,保证临界区代码的原子性。这一章学习共享变量在多线程间的【可见性】问题与多条指令执行时的【有序性】问题

Java 内存模型

JMM 即 Java Memory Model,它从java层面定义了主存、工作内存抽象概念,底层对应着 CPU 寄存器、缓存、硬件内存、CPU 指令优化等。JMM 体现在以下几个方面

  1. 原子性 - 保证指令不会受到线程上下文切换的影响
  2. 可见性 - 保证指令不会受 cpu 缓存的影响
  3. 有序性 - 保证指令不会受 cpu 指令并行优化的影响

可见性

退不出的循环

先来看一个现象,main 线程对 run 变量的修改对于 t 线程不可见,导致了 t 线程无法停止:

public class Test1 {
    static boolean run = true;
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(()->{
            while(run){
                // ....
//                System.out.println(2323);  如果加上这个代码就会停下来 里面带有synchronized锁
            }
        });
        t.start();
        utils.sleep(1);
        System.out.println(3434);   
        run = false; // 线程t不会如预想的停下来
    }
}

为什么呢?分析一下:

  1. 初始状态, t 线程刚开始从主内存读取了 run 的值到工作内存。

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  2. 因为t1线程频繁地从主存中读取run的值,JIT即时编译器会将run的值缓存至自己工作内存中的高速缓存中,减少对主存中run的访问以提高效率

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  3. 1 秒之后,main 线程修改了 run 的值,并同步至主存,而 t 是从自己工作内存中的高速缓存中读取这个变量的值,结果永远是旧值

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

解决方法

volatile(表示易变关键字的意思),它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存

使用synchronized关键字也有相同的效果!在Java内存模型中,synchronized规定,线程在加锁时, 先清空工作内存→在主内存中拷贝最新变量的副本到工作内存 →执行完代码→将更改后的共享变量的值刷新到主内存中→释放互斥锁

可见性 vs 原子性

前面例子体现的实际就是可见性,它保证的是在多个线程之间一个线程对 volatile 变量的修改对另一个线程可见, 而不能保证原子性,仅用在一个写线程,多个读线程的情况。 上例从字节码理解是这样的:

getstatic run // 线程 t 获取 run true
getstatic run // 线程 t 获取 run true
getstatic run // 线程 t 获取 run true
getstatic run // 线程 t 获取 run true
putstatic run // 线程 main 修改 run 为 false, 仅此一次
getstatic run // 线程 t 获取 run false 

比较一下之前我们将线程安全时举的例子:两个线程一个 i++ 一个 i-- ,只能保证看到最新值,不能解决指令交错

// 假设i的初始值为0
getstatic i // 线程2-获取静态变量i的值 线程内i=0
getstatic i // 线程1-获取静态变量i的值 线程内i=0
iconst_1 // 线程1-准备常量1
iadd // 线程1-自增 线程内i=1
putstatic i // 线程1-将修改后的值存入静态变量i 静态变量i=1
iconst_1 // 线程2-准备常量1
isub // 线程2-自减 线程内i=-1
putstatic i // 线程2-将修改后的值存入静态变量i 静态变量i=-1 

注意 :synchronized 语句块既可以保证代码块的原子性,也同时保证代码块内变量的可见性。但缺点是synchronized 是属于重量级操作,性能相对更低。
如果在前面示例的死循环中加入 System.out.println() 会发现即使不加 volatile 修饰符,线程 t 也能正确看到对 run 变量的修改了,因为println方法里面有synchronized修饰。

模式之两阶段终止

使用volatile关键字来实现两阶段终止模式

import lombok.extern.slf4j.Slf4j;

@Slf4j
/**
 * 使用volatile设置打断标记,实现两阶段终止
 */
public class Test3 {
    public static void main(String[] args) throws InterruptedException {
        TwoParseTermination twoParseTermination = new TwoParseTermination();
        twoParseTermination.start();
        Thread.sleep(3000);  // 让监控线程执行一会儿
        twoParseTermination.stop(); // 停止监控线程

    }
}


@Slf4j
class TwoParseTermination{
    
    Thread thread ;
    
    // 设置打断标记为volatile,解决主线程修改对thread线程的可见性问题
    private volatile boolean stop = false;
    
    
    public void start(){

        thread = new Thread(()->{
            while(true){
                if (stop){
                    log.debug("线程结束。。正在料理后事中");
                    break;
                }
                try {
                    Thread.sleep(500);
                    log.debug("正在执行监控的功能");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        thread.start();
    }
    
    public void stop(){
        this.stop = true;
        // 这里打断是为了防止如果线程在执行Thread.sleep(500);时要等待一定时间才停止
        thread.interrupt();
    }
}

模式之 Balking

  1. 定义:Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回。有点类似于单例。
import lombok.extern.slf4j.Slf4j;

@Slf4j
/**
 * 使用volatile设置打断标记,实现两阶段终止,并且设置只可以启动一次,即Balking 模式
 * 可以使用double-checked locking 模式进行过加强
 */
public class Test4 {
    public static void main(String[] args) throws InterruptedException {
        TwoParseTermination2 twoParseTermination = new TwoParseTermination2();
        twoParseTermination.start();
        twoParseTermination.start();
        Thread.sleep(3000);  // 让监控线程执行一会儿
        twoParseTermination.stop(); // 停止监控线程
    }
    
    
}


@Slf4j
class TwoParseTermination2{
    Thread thread ;
    
    // 设置打断标记为volatile,解决可见性问题
    private volatile boolean stop = false;
    
    private volatile boolean starting = false;
    
    public void start(){
        // 如果不加锁,多个线程同时进入if进行判断的话就会出问题,所以要加锁!synchronized也保证了可见性,所以这里的starting没有使用volatile
        synchronized (this){
            if (starting){
                return;
            }
            starting = true; // 如果只在这里使用starting 就无需添加volatile,但在下面非synchronized块中也要使用star
        }
        
        thread = new Thread(()->{
            while(true){
                if (stop){
                    log.debug("线程结束。。正在料理后事中");
                    break;
                }
                try {
                    Thread.sleep(500);
                    log.debug("正在执行监控的功能");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            starting = false;
        });
        stop = false;
        thread.start();
    }
    
    public void stop(){
        this.stop = true;
        // 这里打断是为了防止如果线程在执行Thread.sleep(500);时,要等待一定时间才停止
        thread.interrupt();
    }

}

有序性

诡异的结果

int num = 0;

// volatile 修饰的变量,可以禁用指令重排 volatile boolean ready = false; 可以防止变量之前的代码被重排序
boolean ready = false; 
// 线程1 执行此方法
public void actor1(I_Result r) {
 if(ready) {
 	r.r1 = num + num;
 } 
 else {
 	r.r1 = 1;
 }
}
// 线程2 执行此方法
public void actor2(I_Result r) {
 num = 2;
 ready = true;
}

分别执行上面两个线程

I_Result 是一个对象,有一个属性 r1 用来保存结果,问可能的结果有几种?
情况1:线程1 先执行,这时 ready = false,所以进入 else 分支结果为 1
情况2:线程2 先执行 num = 2,但没来得及执行 ready = true,线程1 执行,还是进入 else 分支,结果为1
情况3:线程2 执行到 ready = true,线程1 执行,这回进入 if 分支,结果为 4(因为 num 已经执行过了)

但我告诉你,结果还有可能是 0 ,信不信吧!这种情况下是:线程2 执行 ready = true,切换到线程1,进入 if 分支,相加为 0,再切回线程2 执行 num = 2。

这种现象叫做指令重排,是 JIT 编译器在运行时的一些优化,这个现象需要通过大量测试才能复现,可以使用jcstress工具进行测试。上面仅是从代码层面体现出了有序性问题,下面在讲到 double-checked locking 问题时还会从java字节码的层面了解有序性的问题。

重排序也需要遵守一定规则:

  1. 重排序操作不会对存在数据依赖关系的操作进行重排序。比如:a=1;b=a; 这个指令序列,由于第二个操作依赖于第一个操作,所以在编译时和处理器运行时这两个操作不会被重排序。
  2. 重排序是为了优化性能,但是不管怎么重排序,单线程下程序的执行结果不能被改变。比如:a=1;b=2;c=a+b这三个操作,第一步(a=1)和第二步(b=2)由于不存在数据依赖关系,所以可能会发生重排序,但是c=a+b这个操作是不会被重排序的,因为需要保证最终的结果一定是c=a+b=3。

重排序在单线程模式下是一定会保证最终结果的正确性,但是在多线程环境下,问题就出来了。解决方法:volatile 修饰的变量,可以禁用指令重排

注意:使用synchronized并不能解决有序性问题,但是如果是该变量整个都在synchronized代码块的保护范围内,那么变量就不会被多个线程同时操作,也不用考虑有序性问题!在这种情况下相当于解决了重排序问题!参考double-checked locking 问题里的代码,第一个代码片段中的instance变量都在synchronized代码块中,第二个代码片段中instance不全在synchronized中所以产生了问题。

volatile 原理

volatile 的底层实现原理是内存屏障,Memory Barrier(Memory Fence)

  1. 对 volatile 变量的写指令后会加入写屏障
  2. 对 volatile 变量的读指令前会加入读屏障
如何保证可见性
  1. 写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中
public void actor2(I_Result r) {
   num = 2;
   ready = true; // ready是被volatile修饰的 ,赋值带写屏障
   // 写屏障
}

  1. 而读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据

    public void actor1(I_Result r) {
     // 读屏障
     //  ready是被volatile修饰的 ,读取值带读屏障
     if(ready) {
     	r.r1 = num + num;
     } else {
     	r.r1 = 1;
     }
    }
    

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

如何保证有序性
  1. 写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后

    public void actor2(I_Result r) {
     num = 2;
     ready = true; //  ready是被volatile修饰的 , 赋值带写屏障
     // 写屏障
    }
    
  2. 读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前

    public void actor1(I_Result r) {
     // 读屏障
     //  ready是被volatile修饰的 ,读取值带读屏障
     if(ready) {
     	r.r1 = num + num;
     } else {
     	r.r1 = 1;
     }
    }
    

    外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

还是那句话,不能解决指令交错:

  1. 写屏障仅仅是保证之后的读能够读到最新的结果,但不能保证其它线程的读跑到它前面去
  2. 而有序性的保证也只是保证了本线程内相关代码不被重排序

1594698671628

double-checked locking 问题

以著名的 double-checked locking 单例模式为例,这是volatile最常使用的地方。

//最开始的单例模式是这样的
    public final class Singleton {
        private Singleton() { }
        private static Singleton INSTANCE = null;
        public static Singleton getInstance() {
        // 首次访问会同步,而之后的使用不用进入synchronized
        synchronized(Singleton.class) {
        	if (INSTANCE == null) { // t1
        		INSTANCE = new Singleton();
            }
        }
            return INSTANCE;
        }
    }
//但是上面的代码块的效率是有问题的,因为即使已经产生了单实例之后,之后调用了getInstance()方法之后还是会加锁,这会严重影响性能!因此就有了模式如下double-checked lockin:
    public final class Singleton {
        private Singleton() { }
        private static Singleton INSTANCE = null;
        public static Singleton getInstance() {
            if(INSTANCE == null) { // t2
                // 首次访问会同步,而之后的使用没有 synchronized
                synchronized(Singleton.class) {
                    if (INSTANCE == null) { // t1
                        INSTANCE = new Singleton();
                    }
                }
            }
            return INSTANCE;
        }
    }
//但是上面的if(INSTANCE == null)判断代码没有在同步代码块synchronized中,不能享有synchronized保证的原子性,可见性。所以

以上的实现特点是:

  1. 懒惰实例化
  2. 首次使用 getInstance() 才使用 synchronized 加锁,后续使用时无需加锁
  3. 有隐含的,但很关键的一点:第一个 if 使用了 INSTANCE 变量,是在同步块之外

但在多线程环境下,上面的代码是有问题的,getInstance 方法对应的字节码为:

0: getstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton;
3: ifnonnull 37
// ldc是获得类对象
6: ldc #3 // class cn/itcast/n5/Singleton
// 复制操作数栈栈顶的值放入栈顶, 将类对象的引用地址复制了一份
8: dup
// 操作数栈栈顶的值弹出,即将对象的引用地址存到局部变量表中
// 将类对象的引用地址存储了一份,是为了将来解锁用
9: astore_0
10: monitorenter
11: getstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton;
14: ifnonnull 27
// 新建一个实例
17: new #3 // class cn/itcast/n5/Singleton
// 复制了一个实例的引用
20: dup
// 通过这个复制的引用调用它的构造方法
21: invokespecial #4 // Method "<init>":()V
// 最开始的这个引用用来进行赋值操作
24: putstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton;
27: aload_0
28: monitorexit
29: goto 37
32: astore_1
33: aload_0
34: monitorexit
35: aload_1
36: athrow
37: getstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton;
40: areturn

其中

  1. 17 表示创建对象,将对象引用入栈 // new Singleton
  2. 20 表示复制一份对象引用 // 复制了引用地址
  3. 21 表示利用一个对象引用,调用构造方法 // 根据复 制的引用地址调用构造方法
  4. 24 表示利用一个对象引用,赋值给 static INSTANCE

也许 jvm 会优化为:先执行 24,再执行 21。如果两个线程 t1,t2 按如下时间序列执行:

1594701748458

关键在于 0: getstatic 这行代码在 monitor 控制之外,它就像之前举例中不守规则的人,可以越过 monitor 读取
INSTANCE 变量的值
这时 t1 还未完全将构造方法执行完毕,如果在构造方法中要执行很多初始化操作,那么 t2 拿到的是将是一个未初
始化完毕的单例
对 INSTANCE 使用 volatile 修饰即可,可以禁用指令重排,但要注意在 JDK 5 以上的版本的 volatile 才会真正有效

double-checked locking 解决

加volatile就行了

    public final class Singleton {
        private Singleton() { }
        private static volatile Singleton INSTANCE = null;
        public static Singleton getInstance() {
            // 实例没创建,才会进入内部的 synchronized代码块
            if (INSTANCE == null) {
                synchronized (Singleton.class) { // t2
                    // 也许有其它线程已经创建实例,所以再判断一次
                    if (INSTANCE == null) { // t1
                        INSTANCE = new Singleton();
                    }
                }
            }
            return INSTANCE;
        }
    }

字节码上看不出来 volatile 指令的效果

// -------------------------------------> 加入对 INSTANCE 变量的读屏障
0: getstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton;
3: ifnonnull 37
6: ldc #3 // class cn/itcast/n5/Singleton
8: dup
9: astore_0
10: monitorenter -----------------------> 保证原子性、可见性
11: getstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton;
14: ifnonnull 27
17: new #3 // class cn/itcast/n5/Singleton
20: dup
21: invokespecial #4 // Method "<init>":()V
24: putstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton;
// -------------------------------------> 加入对 INSTANCE 变量的写屏障
27: aload_0
28: monitorexit ------------------------> 保证原子性、可见性
29: goto 37
32: astore_1
33: aload_0
34: monitorexit
35: aload_1
36: athrow
37: getstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton;
40: areturn

如上面的注释内容所示,读写 volatile 变量操作(即getstatic操作和putstatic操作)时会加入内存屏障(Memory Barrier(Memory Fence)),保证下面两点:

  1. 可见性
    1. 写屏障(sfence)保证在该屏障之前的 t1 对共享变量的改动,都同步到主存当中
    2. 而读屏障(lfence)保证在该屏障之后 t2 对共享变量的读取,加载的是主存中最新数据
  2. 有序性
    1. 写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后
    2. 读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前
  3. 更底层是读写变量时使用 lock 指令来多核 CPU 之间的可见性与有序性

1594703228878

happens-before

下面说的变量都是指成员变量或静态成员变量

  1. 线程解锁 m 之前对变量的写,对于接下来对 m 加锁的其它线程对该变量的读可见
            static int x;
            static Object m = new Object();
            new Thread(()->{
                synchronized(m) {
                    x = 10;
                }
            },"t1").start();
            new Thread(()->{
                synchronized(m) {
                    System.out.println(x);
                }
            },"t2").start();
             
  1. 线程对 volatile 变量的写,对接下来其它线程对该变量的读可见
        volatile static int x;
        new Thread(()->{
         x = 10;
        },"t1").start();
        new Thread(()->{
         System.out.println(x);
        },"t2").start();
  1. 线程 start 前对变量的写,对该线程开始后对该变量的读可见
        static int x;
        x = 10;
        new Thread(()->{
         System.out.println(x);
        },"t2").start();
  1. 线程结束前对变量的写,对其它线程得知它结束后的读可见(比如其它线程调用 t1.isAlive() 或 t1.join()等待它结束)
        static int x;
        Thread t1 = new Thread(()->{
         x = 10;
        },"t1");
        t1.start();
        t1.join();
        System.out.println(x);
  1. 线程 t1 打断 t2(interrupt)前对变量的写,对于其他线程得知 t2 被打断后对变量的读可见(通过
    t2.interrupted 或 t2.isInterrupted)
            
            static int x;
            public static void main(String[] args) {
                Thread t2 = new Thread(()->{
                    while(true) {
                        if(Thread.currentThread().isInterrupted()) {
                            System.out.println(x);
                            break;
                        }
                    }
                },"t2");
                t2.start();
                new Thread(()->{
                    sleep(1);
                    x = 10;
                    t2.interrupt();
                },"t1").start();
                while(!t2.isInterrupted()) {
                    Thread.yield();
                }
                System.out.println(x);
           }
           
  1. 对变量默认值(0,false,null)的写,对其它线程对该变量的读可见

  2. 具有传递性,如果 x hb-> y 并且 y hb-> z 那么有 x hb-> z ,配合 volatile 的防指令重排,有下面的例子

             volatile static int x;
                  static int y;
                  new Thread(()->{
                       y = 10;
                       x = 20;
                  },"t1").start();
                  new Thread(()->{
                      // x=20 对 t2 可见, 同时 y=10 也对 t2 可见
                      System.out.println(x);
                   },"t2").start();

总结

volatile主要用在一个线程改多个线程读时的来保证可见性,和double-checked locking模式中保证synchronized代码块外的共享变量的重排序问题

习题

balking 模式习题

希望 doInit() 方法仅被调用一次,下面的实现是否有问题,为什么?

public class TestVolatile {
    volatile boolean initialized = false;
    void init() {
        if (initialized) {
            return;
        }
        doInit();
        initialized = true;
    }
    private void doInit() {
    }
} 
线程安全单例习题

单例模式有很多实现方法,饿汉、懒汉、静态内部类、枚举类,试分析每种实现下获取单例对象(即调用
getInstance)时的线程安全,并思考注释中的问题
饿汉式:类加载就会导致该单实例对象被创建
懒汉式:类加载不会导致该单实例对象被创建,而是首次使用该对象时才会创建

实现1: 饿汉式

// 问题1:为什么加 final,防止子类继承后更改
// 问题2:如果实现了序列化接口, 还要做什么来防止反序列化破坏单例,如果进行反序列化的时候会生成新的对象,这样跟单例模式生成的对象是不同的。要解决直接加上readResolve()方法就行了,如下所示
public final class Singleton implements Serializable {
    // 问题3:为什么设置为私有? 放弃其它类中使用new生成新的实例,是否能防止反射创建新的实例?不能。
    private Singleton() {}
    // 问题4:这样初始化是否能保证单例对象创建时的线程安全?没有,这是类变量,是jvm在类加载阶段就进行了初始化,jvm保证了此操作的线程安全性
    private static final Singleton INSTANCE = new Singleton();
    // 问题5:为什么提供静态方法而不是直接将 INSTANCE 设置为 public, 说出你知道的理由。
    //1.提供更好的封装性;2.提供范型的支持
    public static Singleton getInstance() {
        return INSTANCE;
    }
    public Object readResolve() {
        return INSTANCE;
    }
}

实现2: 饿汉式

// 问题1:枚举单例是如何限制实例个数的:创建枚举类的时候就已经定义好了,每个枚举常量其实就是枚举类的一个静态成员变量
// 问题2:枚举单例在创建时是否有并发问题:没有,这是静态成员变量
// 问题3:枚举单例能否被反射破坏单例:不能
// 问题4:枚举单例能否被反序列化破坏单例:枚举类默认实现了序列化接口,枚举类已经考虑到此问题,无需担心破坏单例
// 问题5:枚举单例属于懒汉式还是饿汉式:饿汉式
// 问题6:枚举单例如果希望加入一些单例创建时的初始化逻辑该如何做:加构造方法就行了
enum Singleton {
 INSTANCE;
}

实现3:懒汉式

public final class Singleton {
    private Singleton() { }
    private static Singleton INSTANCE = null;
    // 分析这里的线程安全, 并说明有什么缺点:synchronized加载静态方法上,可以保证线程安全。缺点就是锁的范围过大
    public static synchronized Singleton getInstance() {
        if( INSTANCE != null ){
            return INSTANCE;
        }
        INSTANCE = new Singleton();
        return INSTANCE;
    }
}

实现4:DCL 懒汉式

public final class Singleton {
    private Singleton() { }
    // 问题1:解释为什么要加 volatile ?为了防止重排序问题
    private static volatile Singleton INSTANCE = null;

    // 问题2:对比实现3, 说出这样做的意义:提高了效率
    public static Singleton getInstance() {
        if (INSTANCE != null) {
            return INSTANCE;
        }
        synchronized (Singleton.class) {
            // 问题3:为什么还要在这里加为空判断, 之前不是判断过了吗?这是为了第一次判断时的并发问题。
            if (INSTANCE != null) { // t2
                return INSTANCE;
            }
            INSTANCE = new Singleton();
            return INSTANCE;
        }
    }
}

实现5:

public final class Singleton {
    private Singleton() { }
    // 问题1:属于懒汉式还是饿汉式:懒汉式,这是一个静态内部类。类加载本身就是懒惰的,在没有调用getInstance方法时是没有执行LazyHolder内部类的类加载操作的。
    private static class LazyHolder {
        static final Singleton INSTANCE = new Singleton();
    }
    // 问题2:在创建时是否有并发问题,这是线程安全的,类加载时,jvm保证类加载操作的线程安全
    public static Singleton getInstance() {
        return LazyHolder.INSTANCE;
    }
}

本章小结

本章重点讲解了 JMM 中的

  1. 可见性 - 由 JVM 缓存优化引起
  2. 有序性 - 由 JVM 指令重排序优化引起
  3. happens-before 规则
  4. 原理方面
    1. volatile
  5. 模式方面
    1. 两阶段终止模式的 volatile 改进
    2. 同步模式之 balking

程即monitor是阻塞式的悲观锁实现并发控制,这章我们将通过非阻塞式的乐观锁的来实现并发控制

无锁

问题提出

有如下需求,保证account.withdraw取款方法的线程安全

public class Test5 {

    public static void main(String[] args) {
        Account.demo(new AccountUnsafe(10000));
    }
}

class AccountUnsafe implements Account {
    private Integer balance;
    public AccountUnsafe(Integer balance) {
        this.balance = balance;
    }
    @Override
    public Integer getBalance() {
        return balance;
    }
    @Override
    
    public void  withdraw(Integer amount) {
        // 通过这里加锁就可以实现线程安全,不加就会导致结果异常
        synchronized (this){
            balance -= amount;
        }
    }
}


interface Account {
    // 获取余额
    Integer getBalance();
    // 取款
    void withdraw(Integer amount);
    /**
     * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
     * 如果初始余额为 10000 那么正确的结果应当是 0
     */
    static void demo(Account account) {
        List<Thread> ts = new ArrayList<>();
        long start = System.nanoTime();
        for (int i = 0; i < 1000; i++) {
            ts.add(new Thread(() -> {
                account.withdraw(10);
            }));
        }
        ts.forEach(Thread::start);
        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.nanoTime();
        System.out.println(account.getBalance()
                + " cost: " + (end-start)/1000_000 + " ms");
    }
}

解决思路-无锁

上面的代码中可以使用synchronized加锁操作来实现线程安全,但是synchronized加锁操作太耗费资源,这里我们使用无锁来解决此问题:

class AccountSafe implements Account{

    AtomicInteger atomicInteger ;
    
    public AccountSafe(Integer balance){
        this.atomicInteger =  new AtomicInteger(balance);
    }
    
    @Override
    public Integer getBalance() {
        return atomicInteger.get();
    }

    @Override
    public void withdraw(Integer amount) {
        // 核心代码
        while (true){
            int pre = getBalance();
            int next = pre - amount;
            if (atomicInteger.compareAndSet(pre,next)){
                break;
            }
        }
        // 可以简化为下面的方法
        // balance.addAndGet(-1 * amount);
    }
}

CAS 与 volatile

cas

前面看到的AtomicInteger的解决方法,内部并没有用锁来保护共享变量的线程安全。那么它是如何实现的呢?

    @Override
    public void withdraw(Integer amount) {
        // 核心代码
        // 需要不断尝试,直到成功为止
        while (true){
            // 比如拿到了旧值 1000
            int pre = getBalance();
            // 在这个基础上 1000-10 = 990
            int next = pre - amount;
            /*
             compareAndSet 正是做这个检查,在 set 前,先比较 prev 与当前值
             - 不一致了,next 作废,返回 false 表示失败
             比如,别的线程已经做了减法,当前值已经被减成了 990
             那么本线程的这次 990 就作废了,进入 while 下次循环重试
             - 一致,以 next 设置为新值,返回 true 表示成功
			 */
            if (atomicInteger.compareAndSet(pre,next)){
                break;
            }
        }
    }

其中的关键是 compareAndSet,它的简称就是 CAS (也有 Compare And Swap 的说法),它必须是原子操作。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

volatile

在上面代码中的AtomicInteger,保存值的value属性使用了volatile 。获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰。

它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取
它的值,线程操作 volatile 变量都是直接操作主存。即一个线程对 volatile 变量的修改,对另一个线程可见。

再提一嘴
volatile 仅仅保证了共享变量的可见性,让其它线程能够看到最新值,但不能解决指令交错问题(不能保证原
子性)

CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果

为什么无锁效率高

  1. 无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。打个比喻:线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火,等被唤醒又得重新打火、启动、加速… 恢复到高速运行,代价比较大
  2. 但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。

CAS 的特点

结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。

  1. CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗。
  2. synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。
  3. CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思
    1. 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
    2. 但如果竞争激烈(写操作多),可以想到重试必然频繁发生,反而效率会受影响

原子整数

java.util.concurrent.atomic并发包提供了一些并发工具类,这里把它分成五类:

  1. 使用原子的方式更新基本类型

    • AtomicInteger:整型原子类
    • AtomicLong:长整型原子类
    • AtomicBoolean :布尔型原子类

    上面三个类提供的方法几乎相同,所以我们将以 AtomicInteger 为例子来介绍。

  2. 原子引用

  3. 原子数组

  4. 字段更新器

  5. 原子累加器

下面先讨论原子整数类,以 AtomicInteger 为例讨论它的api接口:通过观察源码可以发现,AtomicInteger 内部都是通过cas的原理来实现的!!

    public static void main(String[] args) {
        AtomicInteger i = new AtomicInteger(0);
        // 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++
        System.out.println(i.getAndIncrement());
        // 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
        System.out.println(i.incrementAndGet());
        // 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
        System.out.println(i.decrementAndGet());
        // 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
        System.out.println(i.getAndDecrement());
        // 获取并加值(i = 0, 结果 i = 5, 返回 0)
        System.out.println(i.getAndAdd(5));
        // 加值并获取(i = 5, 结果 i = 0, 返回 0)
        System.out.println(i.addAndGet(-5));
        // 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)
        // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
        System.out.println(i.getAndUpdate(p -> p - 2));
        // 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
        // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
        System.out.println(i.updateAndGet(p -> p + 2));
        // 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
        // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
        // getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
        // getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
        System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
        // 计算并获取(i = 10, p 为 i 的当前值, x 为参数1值, 结果 i = 0, 返回 0)
        // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
        System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));
    }

原子引用

为什么需要原子引用类型?保证引用类型的共享变量是线程安全的(确保这个原子引用没有引用过别人)。

基本类型原子类只能更新一个变量,如果需要原子更新多个变量,需要使用引用类型原子类。

  • AtomicReference:引用类型原子类
  • AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
  • AtomicMarkableReference :原子更新带有标记的引用类型。该类将 boolean 标记与引用关联起来,也可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。

使用原子引用实现BigDecimal存取款的线程安全:

下面这个是不安全的实现过程:

class DecimalAccountUnsafe implements DecimalAccount {
    BigDecimal balance;
    public DecimalAccountUnsafe(BigDecimal balance) {
        this.balance = balance;
    }
    @Override
    public BigDecimal getBalance() {
        return balance;
    }
    @Override
    public void withdraw(BigDecimal amount) {
        BigDecimal balance = this.getBalance();
        this.balance = balance.subtract(amount);
    }
}

解决代码如下:在AtomicReference类中,存在一个value类型的变量,保存对BigDecimal对象的引用。

class DecimalAccountCas implements DecimalAccount{

    //private BigDecimal balance;
    private AtomicReference<BigDecimal> balance ;

    public DecimalAccountCas(BigDecimal balance) {
        this.balance = new AtomicReference<>(balance);
    }

    @Override
    public BigDecimal getBalance() {
        return balance.get();
    }

    @Override
    public void withdraw(BigDecimal amount) {
        while(true){
            BigDecimal pre = balance.get();
            // 注意:这里的balance返回的是一个新的对象,即 pre!=next
            BigDecimal next = pre.subtract(amount);
            if (balance.compareAndSet(pre,next)){
                break;
            }
        }
    }
}

ABA 问题及解决

ABA 问题: 如下程序所示,虽然再other方法中存在两个线程对共享变量进行了修改,但是修改之后又变成了原值,main线程中对此是不可见得,这种操作这对业务代码并无影响。

    static AtomicReference<String> ref = new AtomicReference<>("A");
    public static void main(String[] args) throws InterruptedException {
        log.debug("main start...");
        // 获取值 A
        // 这个共享变量被它线程修改
        String prev = ref.get();
        other();
        utils.sleep(1);
        // 尝试改为 C
        log.debug("change A->C {}", ref.compareAndSet(prev, "C"));
    }
    private static void other() {
        new Thread(() -> {
            log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B"));
        }, "t1").start();
        utils.sleep(1);
        new Thread(() -> {
            // 注意:如果这里使用  log.debug("change B->A {}", ref.compareAndSet(ref.get(), new String("A")));
            // 那么此实验中的 log.debug("change A->C {}", ref.compareAndSet(prev, "C"));
            // 打印的就是false, 因为new String("A") 返回的对象的引用和"A"返回的对象的引用时不同的!
            log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A"));
        }, "t2").start();
    }

主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又改回 A 的情况,如果主线程希望:只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号。使用AtomicStampedReference来解决。

AtomicStampedReference

解决ABA问题

static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A",0);
    public static void main(String[] args) throws InterruptedException {
        log.debug("main start...");
        // 获取值 A 
        int stamp = ref.getStamp();
        log.info("{}",stamp);
        String prev = ref.getReference();
        other();
        utils.sleep(1);
        // 尝试改为 C
        log.debug("change A->C {}", ref.compareAndSet(prev, "C",stamp,stamp+1));
    }
    
    private static void other() {
        new Thread(() -> {
            int stamp = ref.getStamp();
            log.info("{}",stamp);
            log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B",stamp,stamp+1));
        }, "t1").start();
        utils.sleep(1);
        new Thread(() -> {
            int stamp = ref.getStamp();
            log.info("{}",stamp);
            log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A",stamp,stamp+1));
        }, "t2").start();
    }

AtomicMarkableReference

AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如:A -> B -> A ->C,通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了AtomicMarkableReference

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

原子数组

使用原子的方式更新数组里的某个元素

  • AtomicIntegerArray:整形数组原子类
  • AtomicLongArray:长整形数组原子类
  • AtomicReferenceArray :引用类型数组原子类

上面三个类提供的方法几乎相同,所以我们这里以 AtomicIntegerArray 为例子来介绍。实例代码:

我们将使用函数式编程来实现,先看看一些函数式编程的接口的javadoc文档

Represents a supplier of results.
表示supplier的结果。
There is no requirement that a new or distinct result be returned each time the supplier is invoked.
不要求每次调用供应商时都返回一个新的或不同的结果。
This is a functional interface whose functional method is get().
这是一个函数接口,其函数方法是get()。
public interface Supplier<T> {
    /**
     * Gets a result.
     * @return a result
     */
    T get();
}

Represents a function that accepts one argument and produces a result.
表示接受一个参数并生成结果的函数。
This is a functional interface whose functional method is apply(Object).
这是一个函数接口,其函数方法是apply(Object)。
public interface Function<T, R> {
/**
* Applies this function to the given argument.
* @param t the function argument
* @return the function result
*/	
  R apply(T t);
  //....
  }
  
  Represents an operation that accepts two input arguments and returns no result. This is the two-arity specialization of Consumer. Unlike most other functional interfaces, BiConsumer is expected to operate via side-effects.
表示接受两个输入参数且不返回结果的操作。这就是Consumer的二元参数版本。与大多数其他功能性接口不同,BiConsumer期望执行带有副作用的操作。
This is a functional interface whose functional method is accept(Object, Object).
这是一个函数接口,其函数方法是accept(ObjectObject)。
public interface BiConsumer<T, U> {
   void accept(T t, U u);
     //....
  }



Represents an operation that accepts a single input argument and returns no result. Unlike most other functional interfaces, Consumer is expected to operate via side-effects.
表示接受单个输入参数但不返回结果的操作。与大多数其他功能接口不同,消费者期望执行带有副作用的操作。 
public interface Consumer<T> {
void accept(T t);
   //....
  }
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class Test11 {

    public static void main(String[] args) {
        Test11.demo(
                ()->new AtomicIntegerArray(10),
                (array)-> array.length(),
                (array,index)-> array.getAndIncrement(index),
                (array)->System.out.println(array)
        );

        Test11.demo(
                ()->new int[10],
                (array)-> array.length,
                (array,index)-> array[index]++,
                (array)->System.out.println(Arrays.toString(array))
        );
    }

    /**
     参数1,提供数组、可以是线程不安全数组或线程安全数组
     参数2,获取数组长度的方法
     参数3,自增方法,传 array, index两个参数   array为数组,index为数组元素每次自增的元素的下标
     参数4,打印数组的方法
     */
    // supplier 提供者 无中生有 ()->结果
    // function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果
    // consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)->
    public static <T> void demo(
            Supplier<T> arraySupplier,
            Function<T,Integer> lengthFun,
            BiConsumer<T,Integer> putConsumer,
            Consumer<T> printConsumer
    ){
        List<Thread> ts = new ArrayList<>();
        T array = arraySupplier.get();
        Integer length = lengthFun.apply(array);
        for (int i = 0;i<length;i++){
            ts.add(new Thread(()->{
                for (int j=0;j<10000;j++){
                    putConsumer.accept(array,j%length);
                }
            }));
        }
        ts.forEach(t->t.start());
        ts.forEach(t->{
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        printConsumer.accept(array);
    }    
}

字段更新器

AtomicReferenceFieldUpdater // 域 字段 ,AtomicIntegerFieldUpdater,AtomicLongFieldUpdater

注意:利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常

Exception in thread "main" java.lang.IllegalArgumentException: Must be volatile type
package com.concurrent.test2;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;


/**
 * 原子字段AtomicReferenceFieldUpdater 的使用
 */
public class Test12 {

    private volatile int field;
    public static void main(String[] args) {
        AtomicReferenceFieldUpdater fieldUpdater =
                AtomicReferenceFieldUpdater.newUpdater(Testx.class, String.class,"field");
        Testx testx = new Testx();

        fieldUpdater.compareAndSet(testx, "3434", "李华");
        // 修改成功 field = "李华"
        System.out.println(testx.field);
        // 修改失败 field = "李华"
        fieldUpdater.compareAndSet(testx, null, "zhang华");
        System.out.println(testx.field);



    }
}

class Testx{
    volatile String field;

    @Override
    public String toString() {
        return "Testx{" +
                "field='" + field + '\'' +
                '}';
    }
}

原子累加器

累加器性能比较

LongAdder累加器的使用

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            demo(() -> new LongAdder(), adder -> adder.increment());
        }
        for (int i = 0; i < 5; i++) {
            demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
        }

    }
    
    private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
        T adder = adderSupplier.get();
        long start = System.nanoTime();
        List<Thread> ts = new ArrayList<>();
        // 4 个线程,每人累加 50 万
        for (int i = 0; i < 40; i++) {
            ts.add(new Thread(() -> {
                for (int j = 0; j < 500000; j++) {
                    action.accept(adder);
                }
            }));
        }
        ts.forEach(t -> t.start());
        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.nanoTime();
        System.out.println(adder + " cost:" + (end - start)/1000_000);
    }

性能提升的原因很简单,就是在有竞争时,设置多个累加单元(但不会超过cpu的核心数),Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。

源码之 LongAdder (还没搞太清楚)

LongAdder 类有几个关键域

// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;
// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;
// 在 cells 创建或扩容时, 置为 1, 表示加锁
transient volatile int cellsBusy;
cas 锁

使用cas实现一个自旋锁

// 不要用于生产实践!!!
public class LockCas {
    private AtomicInteger state = new AtomicInteger(0);
    public void lock() {
        while (true) {
            if (state.compareAndSet(0, 1)) {
                break;
            }
        }
    }
    public void unlock() {
        log.debug("unlock...");
        state.set(0);
    }
}

测试

        LockCas lock = new LockCas();
        new Thread(() -> {
            log.debug("begin...");
            lock.lock();
            try {
                log.debug("lock...");
                sleep(1);
            } finally {
                lock.unlock();
            }
        }).start();
        new Thread(() -> {
            log.debug("begin...");
            lock.lock();
            try {
                log.debug("lock...");
            } finally {
                lock.unlock();
            }
        }).start();
原理之伪共享

其中 Cell 即为累加单元

// 防止缓存行伪共享
@sun.misc.Contended
static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    // 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值
    final boolean cas(long prev, long next) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
    }
    // 省略不重要代码
}

下面讨论@sun.misc.Contended注解的重要意义

得从缓存说起,缓存与内存的速度比较

1594821128208

因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。缓存离cpu越近速度越快。
而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long),缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中,CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效。

1594821188387

因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因此缓存行可以存下 2 个的 Cell 对象。这样问题来了:
Core-0 要修改 Cell[0],Core-1 要修改 Cell[1]

无论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要累加
Cell[0]=6001, Cell[1]=8000 ,这时会让 Core-1 的缓存行失效,@sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效

1594821225708

再来看看LongAdder类的累加increment()方法中又主要调用下面的方法

  public void add(long x) {
        // as 为累加单元数组
        // b 为基础值
        // x 为累加值
        Cell[] as; long b, v; int m; Cell a;
        // 进入 if 的两个条件
        // 1. as 有值, 表示已经发生过竞争, 进入 if
        // 2. cas 给 base 累加时失败了, 表示 base 发生了竞争, 进入 if
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            // uncontended 表示 cell 没有竞争
            boolean uncontended = true;
            if (
                // as 还没有创建
                    as == null || (m = as.length - 1) < 0 ||
                            // 当前线程对应的 cell 还没有被创建,a为当线程的cell
                            (a = as[getProbe() & m]) == null ||
       // 给当前线程的 cell 累加失败 uncontended=false ( a 为当前线程的 cell )
                            !(uncontended = a.cas(v = a.value, v + x))
            ) {
                // 进入 cell 数组创建、cell 创建的流程
                longAccumulate(x, null, uncontended);
            }
        }
    }
add 方法分析

add 流程图

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        // 当前线程还没有对应的 cell, 需要随机生成一个 h 值用来将当前线程绑定到 cell
        if ((h = getProbe()) == 0) {
            // 初始化 probe
            ThreadLocalRandom.current();
            // h 对应新的 probe 值, 用来对应 cell
            h = getProbe();
            wasUncontended = true;
        }
        // collide 为 true 表示需要扩容
        boolean collide = false;
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            // 已经有了 cells
            if ((as = cells) != null && (n = as.length) > 0) {
                // 但是还没有当前线程对应的 cell
                if ((a = as[(n - 1) & h]) == null) {
                    // 为 cellsBusy 加锁, 创建 cell, cell 的初始累加值为 x
                    // 成功则 break, 否则继续 continue 循环
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    // 判断槽位确实是空的
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                }
                // 有竞争, 改变线程对应的 cell 来重试 cas
                else if (!wasUncontended)
                    wasUncontended = true;
                    // cas 尝试累加, fn 配合 LongAccumulator 不为 null, 配合 LongAdder 为 null
                else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
                    break;
                    // 如果 cells 长度已经超过了最大长度, 或者已经扩容, 改变线程对应的 cell 来重试 cas
                else if (n >= NCPU || cells != as)
                    collide = false;
                    // 确保 collide 为 false 进入此分支, 就不会进入下面的 else if 进行扩容了
                else if (!collide)
                    collide = true;
                    // 加锁
                else if (cellsBusy == 0 && casCellsBusy()) {
                    // 加锁成功, 扩容
                    continue;
                }
                // 改变线程对应的 cell
                h = advanceProbe(h);
            }
            // 还没有 cells, cells==as是指没有其它线程修改cells,as和cells引用相同的对象,使用casCellsBusy()尝试给 cellsBusy 加锁
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                // 加锁成功, 初始化 cells, 最开始长度为 2, 并填充一个 cell
                // 成功则 break;
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            // 上两种情况失败, 尝试给 base 使用casBase累加
            else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
                break;
        }
    }

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

上图中的第一个else if 中代码的逻辑,这是cells未创建时的处理逻辑。

1594825327345

上图中的if 中代码的逻辑,里面包含线程对应的cell已经创建好和没创建好的两种情况。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

线程对应的cell还没创建好,则执行的是第一个红框里的代码,逻辑如下

1594826082009

线程对应的cell已经创建好,则执行的是第二个红框里的代码,逻辑如下

1594826936970

sum 方法分析

获取最终结果通过 sum 方法,将各个累加单元的值加起来就得到了总的结果。

    public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

Unsafe

概述

Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得。LockSupport的park方法,cas相关的方法底层都是通过Unsafe类来实现的。

  
static Unsafe unsafe;
    static {
        try {
            // Unsafe 使用了单例模式,unsafe对象是类中的一个私有的变量
            Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            unsafe = (Unsafe) theUnsafe.get(null);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new Error(e);
        }
    }
    static Unsafe getUnsafe() {
        return unsafe;
    }

Unsafe CAS 操作

  1. 使用Unsafe 进行cas操作:

    package com.concurrent.test2;
    
    import sun.misc.Unsafe;
    
    import java.lang.reflect.Field;
    
    public class Test15 {
        public static void main(String[] args) throws NoSuchFieldException {
    	
            Unsafe unsafe = Test14.getUnsafe(); // 上面获取unsafe的类方法
            Field id = Student.class.getDeclaredField("id");
            Field name = Student.class.getDeclaredField("name");
            // 1.获得成员变量的偏移量
            long idOffset = Test14.unsafe.objectFieldOffset(id);
            long nameOffset = Test14.unsafe.objectFieldOffset(name);
            Student student = new Student();
            // 2.使用 cas 方法替换成员变量的值
            Test14.unsafe.compareAndSwapInt(student, idOffset, 0, 20); // 返回 true
            Test14.unsafe.compareAndSwapObject(student, nameOffset, null, "张三"); // 返回 true
            
            //3.验证
            System.out.println(student);
        }
    }
    
    
    class Student {
        volatile int id;
        volatile String name;
    
        public int getId() {
            return id;
        }
    
        public void setId(int id) {
            this.id = id;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        @Override
        public String toString() {
            return "Student{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    '}';
        }
    }
    
    
  2. 使用自定义的 AtomicData 实现之前线程安全的原子整数 ,并用之前的取款实例来进行验证

package com.concurrent.test2;

import sun.misc.Unsafe;

public class Test16 implements Account{

    public static void main(String[] args) {
        
        Account.demo(new Test16(10000));
    }
    
    
    private volatile int value;

    private static final Unsafe unsafe;
    
    private static long valueOffset;

    public Test16(int value) {
        this.value = value;
    }

    static {
        unsafe = Test14.getUnsafe();
        try {
            valueOffset = unsafe.objectFieldOffset(Test16.class.getDeclaredField("value"));
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        }
    }
    
    public int getValue(){
        return this.value;
    }
    
    public void decrement(int amount){
        while (true){
            int pre = getValue();
            int next = pre - amount;
            if (unsafe.compareAndSwapInt(this,valueOffset,pre,next)){
                break;
            }
        }
    }

    @Override
    public Integer getBalance() {
        return getValue();
    }

    @Override
    public void withdraw(Integer amount) {
        decrement(amount);
    }
}

总结

  1. CAS 与 volatile
  2. juc包下API
    1. 原子整数
    2. 原子引用
    3. 原子数组
    4. 字段更新器
    5. 原子累加器
  3. Unsafe
  4. 原理方面
    1. LongAdder 源码
    2. 伪共享

不可变

日期转换的问题

问题提出,下面的代码在运行时,由于 SimpleDateFormat 不是线程安全的,有很大几率出现 java.lang.NumberFormatException 或者出现不正确的日期解析结果。

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    log.debug("{}", sdf.parse("1951-04-21"));
                } catch (Exception e) {
                    log.error("{}", e);
                }
            }).start();
        }

思路 - 不可变对象

如果一个对象在不能够修改其内部状态(属性),那么它就是线程安全的,因为不存在并发修改啊!这样的对象在
Java 中有很多,例如在 Java 8 后,提供了一个新的日期格式化类DateTimeFormatter:

        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                LocalDate date = dtf.parse("2018-10-01", LocalDate::from);
                log.debug("{}", date);
            }).start();
        }

不可变设计

  1. 更多不可变类的知识,可参考这这里
  2. final类的知识,参考这里

另一个大家更为熟悉的 String 类也是不可变的,以它为例,说明一下不可变类设计的要素

public final class String
    implements java.io.Serializable, Comparable<String>, CharSequence {
    /** The value is used for character storage. */
    private final char value[];
    /** Cache the hash code for the string */
    private int hash; // Default to 0
    // ...
}

final 的使用

发现该类、类中所有属性都是 final 的,属性用 final 修饰保证了该属性是只读的,不能修改,类用 final 修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性。

保护性拷贝

但有同学会说,使用字符串时,也有一些跟修改相关的方法啊,比如 substring 等,那么下面就看一看这些方法是
如何实现的,就以 substring 为例:

    public String substring(int beginIndex, int endIndex) {
        if (beginIndex < 0) {
            throw new StringIndexOutOfBoundsException(beginIndex);
        }
        if (endIndex > value.length) {
            throw new StringIndexOutOfBoundsException(endIndex);
        }
        int subLen = endIndex - beginIndex;
        if (subLen < 0) {
            throw new StringIndexOutOfBoundsException(subLen);
        }
        // 上面是一些校验,下面才是真正的创建新的String对象
        return ((beginIndex == 0) && (endIndex == value.length)) ? this
                : new String(value, beginIndex, subLen);
    }

发现其内部是调用 String 的构造方法创建了一个新字符串,再进入这个构造看看,是否对 final char[] value 做出了修改:结果发现也没有,构造新字符串对象时,会生成新的 char[] value,对内容进行复制。这种通过创建副本对象来避免共享的手段称之为【保护性拷贝(defensive copy)】

    public String(char value[], int offset, int count) {
        if (offset < 0) {
            throw new StringIndexOutOfBoundsException(offset);
        }
        if (count <= 0) {
            if (count < 0) {
                throw new StringIndexOutOfBoundsException(count);
            }
            if (offset <= value.length) {
                this.value = "".value;
                return;
            }
        }
        // Note: offset or count might be near -1>>>1.
        if (offset > value.length - count) {
            throw new StringIndexOutOfBoundsException(offset + count);
        }
        // 上面是一些安全性的校验,下面是给String对象的value赋值,新创建了一个数组来保存String对象的值
        this.value = Arrays.copyOfRange(value, offset, offset+count);
    }

模式之享元

  1. 简介定义英文名称:Flyweight pattern. 当需要重用数量有限的同一类对象时

  2. 体现

    1. 在JDK中 Boolean,Byte,Short,Integer,Long,Character 等包装类提供了 valueOf 方法,例如 Long 的valueOf 会缓存 -128~127 之间的 Long 对象,在这个范围之间会重用对象,大于这个范围,才会新建 Long 对象:

      public static Long valueOf(long l) {
       final int offset = 128;
       if (l >= -128 && l <= 127) { // will cache
       return LongCache.cache[(int)l + offset];
       }
       return new Long(l);
      }
      
      

      注意:
      Byte, Short, Long 缓存的范围都是 -128~127
      Character 缓存的范围是 0~127
      Integer的默认范围是 -128~127,最小值不能变,但最大值可以通过调整虚拟机参数 "-Djava.lang.Integer.IntegerCache.high "来改变
      Boolean 缓存了 TRUE 和 FALSE

    2. String 串池

    3. BigDecimal BigInteger

  3. diy:例如:一个线上商城应用,QPS 达到数千,如果每次都重新创建和关闭数据库连接,性能会受到极大影响。 这时预先创建好一批连接,放入连接池。一次请求到达后,从连接池获取连接,使用完毕后再还回连接池,这样既节约了连接的创建和关闭时间,也实现了连接的重用,不至于让庞大的连接数压垮数据库。

package com.concurrent.test2;


import com.concurrent.test.Test30;
import com.concurrent.test.utils;
import com.sun.xml.internal.ws.util.Pool;
import lombok.extern.slf4j.Slf4j;

import java.sql.*;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;


@Slf4j
/**
 * 以下实现没有考虑:
 * 连接的动态增长与收缩
 * 连接保活(可用性检测)
 * 等待超时处理
 * 分布式 hash
 * 对于关系型数据库,有比较成熟的连接池实现,例如c3p0, druid等 对于更通用的对象池,可以考虑使用apache
 * commons pool,例如redis连接池可以参考jedis中关于连接池的实现
 */
public class Test17 {
    // 1. 连接池大小
    private final int poolSize;
    // 2. 连接对象数组
    private Connection[] connections;
    // 3. 连接状态数组 0 表示空闲, 1 表示繁忙
    private AtomicIntegerArray states;

    public static void main(String[] args) {
        Test17 pools = new Test17(2);
        for (int i=0;i<5;i++){
            new Thread(()->{
                Connection borrow = pools.borrow();
                utils.sleep(1);
                pools.free(borrow);
            },"xiancheng " + i).start();
        }
    }


    // 4. 构造方法初始化
    public Test17(int poolSize){
        this.poolSize = poolSize;
        this.connections = new Connection[poolSize];
        this.states = new AtomicIntegerArray(new int[poolSize]);
        for (int i=0;i<poolSize;i++){
            connections[i] = new MockConnection("连接"+(i+1));
        }
    }

    // 5. 借连接
    public Connection borrow(){
        while (true){
            for (int i=0;i<poolSize;i++){
                // 获取空闲的连接
               if (states.get(i)==0){
                   // 使用cas
                   if (states.compareAndSet(i,0,1)){
                       log.info("使用了连接{}",connections[i]);
                       return connections[i];
                   }
               }
            }

            // 没有空闲连接的时候则使用synchronized进行等待,不适用cas的原因是这个等待的时间可能比较长,使用cas比较耗cpu性能
            synchronized (this){
                try {
                    log.info("正在等待");
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }



    }


    // 6. 归还连接
    public void free(Connection connection){
        for (int i=0;i<poolSize;i++){
            if (connections[i] == connection){
                states.set(i,0);
                // 上面是没必要加锁的,因为不存在线程竞争,只有占有连接的线程才能归还连接
                synchronized (this){
                    log.info("释放了连接 {}",connection);
                    notifyAll();
                }
                break;
            }
        }
    }
}

class MockConnection implements Connection{

    public MockConnection(String name){

    }

    @Override
    public Statement createStatement() throws SQLException {
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        return null;
    }

    @Override
    public CallableStatement prepareCall(String sql) throws SQLException {
        return null;
    }

    @Override
    public String nativeSQL(String sql) throws SQLException {
        return null;
    }

    @Override
    public void setAutoCommit(boolean autoCommit) throws SQLException {

    }

    @Override
    public boolean getAutoCommit() throws SQLException {
        return false;
    }

    @Override
    public void commit() throws SQLException {

    }

    @Override
    public void rollback() throws SQLException {

    }

    @Override
    public void close() throws SQLException {

    }

    @Override
    public boolean isClosed() throws SQLException {
        return false;
    }

    @Override
    public DatabaseMetaData getMetaData() throws SQLException {
        return null;
    }

    @Override
    public void setReadOnly(boolean readOnly) throws SQLException {

    }

    @Override
    public boolean isReadOnly() throws SQLException {
        return false;
    }

    @Override
    public void setCatalog(String catalog) throws SQLException {

    }

    @Override
    public String getCatalog() throws SQLException {
        return null;
    }

    @Override
    public void setTransactionIsolation(int level) throws SQLException {

    }

    @Override
    public int getTransactionIsolation() throws SQLException {
        return 0;
    }

    @Override
    public SQLWarning getWarnings() throws SQLException {
        return null;
    }

    @Override
    public void clearWarnings() throws SQLException {

    }

    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        return null;
    }

    @Override
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        return null;
    }

    @Override
    public Map<String, Class<?>> getTypeMap() throws SQLException {
        return null;
    }

    @Override
    public void setTypeMap(Map<String, Class<?>> map) throws SQLException {

    }

    @Override
    public void setHoldability(int holdability) throws SQLException {

    }

    @Override
    public int getHoldability() throws SQLException {
        return 0;
    }

    @Override
    public Savepoint setSavepoint() throws SQLException {
        return null;
    }

    @Override
    public Savepoint setSavepoint(String name) throws SQLException {
        return null;
    }

    @Override
    public void rollback(Savepoint savepoint) throws SQLException {

    }

    @Override
    public void releaseSavepoint(Savepoint savepoint) throws SQLException {

    }

    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return null;
    }

    @Override
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
        return null;
    }

    @Override
    public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
        return null;
    }

    @Override
    public Clob createClob() throws SQLException {
        return null;
    }

    @Override
    public Blob createBlob() throws SQLException {
        return null;
    }

    @Override
    public NClob createNClob() throws SQLException {
        return null;
    }

    @Override
    public SQLXML createSQLXML() throws SQLException {
        return null;
    }

    @Override
    public boolean isValid(int timeout) throws SQLException {
        return false;
    }

    @Override
    public void setClientInfo(String name, String value) throws SQLClientInfoException {

    }

    @Override
    public void setClientInfo(Properties properties) throws SQLClientInfoException {

    }

    @Override
    public String getClientInfo(String name) throws SQLException {
        return null;
    }

    @Override
    public Properties getClientInfo() throws SQLException {
        return null;
    }

    @Override
    public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
        return null;
    }

    @Override
    public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
        return null;
    }

    @Override
    public void setSchema(String schema) throws SQLException {

    }

    @Override
    public String getSchema() throws SQLException {
        return null;
    }

    @Override
    public void abort(Executor executor) throws SQLException {

    }

    @Override
    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {

    }

    @Override
    public int getNetworkTimeout() throws SQLException {
        return 0;
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        return null;
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return false;
    }
}

final的原理

  1. 设置 final 变量的原理

    1. 理解了 volatile 原理,再对比 final 的实现就比较简单了

      public class TestFinal {final int a=20;}
      

      字节码

      0: aload_0
      1: invokespecial #1 // Method java/lang/Object."<init>":()V
      4: aload_0
      5: bipush 20
      7: putfield #2 // Field a:I
       <-- 写屏障
      10: return
      
      
    2. final变量的赋值操作都必须在定义时或者构造器中进行初始化赋值,并发现 final 变量的赋值也会通过 putfield 指令来完成,同样在这条指令之后也会加入写屏障,保证在其它线程读到它的值时不会出现为 0 的情况。

  2. 获取 final 变量的原理:从字节码的层面去理解 视频

本章小结

  1. 不可变类使用
  2. 不可变类设计
  3. 原理方面:final
  4. 模式方面
    1. 享元模式-> 设置线程池
文章来源:https://blog.csdn.net/qq_44802369/article/details/135623947
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。