Java:多线程和JUC详解

发布时间:2024年01月06日

文章目录

多线程 (Multithreading): 这是一种编程技术,允许程序同时执行多个线程。每个线程都是一个独立的执行单元,它们可以并行或并发执行。

JUC (Java Util Concurrent): 这是Java提供的用于并发编程的工具包。JUC包含了一系列的工具类、线程池、原子变量类等,用于简化并发编程,提高性能,以及更好地处理并发和同步问题。一些JUC中常用的类包括ExecutorServiceThreadPoolExecutorLockSemaphore等。

多线程是一种编程模型,而JUC是Java提供的一组工具和类,用于更方便、更安全地进行并发编程。在实际开发中,JUC常用于解决多线程编程中的共享资源管理、同步和线程安全等问题。多线程和JUC区别如下:

  • 多线程是JUC使用和依赖的基础,提供了通过Thread和Runnable来并行执行任务的能力,UC中的同步组件如锁等都需要在多线程环境中运行。
  • JUC建立在多线程的基础上,利用多线程能够并发运行,提供了更高级更先进的同步组件和数据结构来更好地管理线程并发。
  • 但是多线程和JUC具体实现上是互相独立的。例如Thread类不属于JUC,JUC中的同步类如ReentrantLock也不属于标准的java.lang.Thread包。
  • 从概念角度,多线程提供的是一个能力 - 并发执行,而JUC提供的是一个更高层的解决方案 - 更好的同步规则。

未说明的代码基于JDK11编写。

1、实现多线程

1.1 多线程概念

是指从软件或者硬件上实现多个线程并发执行的技术。具有多线程能力的计算机因有硬件支持而能够在同一时间执行多个线程,提升性能。

1.2 并发和并行

  • 并行:在同一时刻,有多个指令在多个CPU上同时执行。

  • 并发:在同一时刻,有多个指令在单个CPU上交替执行。

1.3 进程和线程

  • 进程:是正在运行的程序

    独立性:进程是一个能独立运行的基本单位,同时也是系统分配资源和调度的独立单位;
    动态性:进程的实质是程序的一次执行过程,进程是动态产生,动态消亡的;
    并发性:任何进程都可以同其他进程一起并发执行。

  • 线程:是进程中的单个顺序控制流,是一条执行路径

    单线程:一个进程如果只有一条执行路径,则称为单线程程序。
    多线程:一个进程如果有多条执行路径,则称为多线程程序。

1.4 多线程实现方式

1.4.1 继承Thread类

(1)方法介绍

方法名说明
void run()在线程开启后,此方法将被调用执行
void start()使此线程开始执行,Java虚拟机会调用run方法()

(2)实现步骤

  • 定义一个类MyThread继承Thread类
  • 在MyThread类中重写run()方法
  • 创建MyThread类的对象
  • 启动线程

(3)代码演示

public class MyThread extends Thread{
    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            System.out.println(getName()+": Hello,World");
        }
    }
}
public class MyThreadDemo {
    public static void main(String[] args) {
        MyThread t1 = new MyThread();
        MyThread t2 = new MyThread();
        t1.setName("线程1");
        t2.setName("线程2");

        // 启动线程
        t1.start();
        t2.start();
    }
}

image-20231228143401945

(4)两个小问题

  • 为什么要重写run()方法?

    因为run()是用来封装被线程执行的代码

  • run()方法和start()方法的区别?

    run():封装线程执行的代码,直接调用,相当于普通方法的调用

    start():启动线程;然后由JVM调用此线程的run()方法

1.4.2 实现Runnable接口

(1)Thread构造方法

方法名说明
Thread(Runnable target)分配一个新的Thread对象
Thread(Runnable target, String name)分配一个新的Thread对象

(2)实现步骤

  • 定义一个类MyRunnable实现Runnable接口
  • 在MyRunnable类中重写run()方法
  • 创建MyRunnable类的对象
  • 创建Thread类的对象,把MyRunnable对象作为构造方法的参数
  • 启动线程

(3)代码演示

public class MyRunnable implements Runnable{

    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            //获取到当前线程的对象
            /*Thread t = Thread.currentThread();
            System.out.println(t.getName() + "HelloWorld!");*/
            System.out.println(Thread.currentThread().getName() + ":HelloWorld!");
        }
    }
}
public class MyRunnableDemo {
    public static void main(String[] args) {
        //创建MyThread的对象
        //表示多线程要执行的任务
        MyRunnable myThread = new MyRunnable();

        //创建Thread类的对象,把MyRunnable对象作为构造方法的参数
        /*Thread t1 = new Thread(myThread);
        Thread t2 = new Thread(myThread);
        //给线程设置名字
        t1.setName("线程1");
        t2.setName("线程2");*/


        // Thread(Runnable target, String name)方法
        Thread t1 = new Thread(myThread,"线程1");
        Thread t2 = new Thread(myThread,"线程2");

        //开启线程
        t1.start();
        t2.start();
    }
}

image-20231228144830477

1.4.3 实现Callable接口

(1)方法介绍

方法名说明
V call()计算结果,如果无法计算结果,则抛出一个异常
FutureTask(Callable callable)创建一个 FutureTask,一旦运行就执行给定的 Callable
V get()如有必要,等待计算完成,然后获取其结果

(2)实现步骤

  • 定义一个类MyCallable实现Callable接口
  • 在MyCallable类中重写call()方法
  • 创建MyCallable类的对象
  • 创建Future的实现类FutureTask对象,把MyCallable对象作为构造方法的参数
  • 创建Thread类的对象,把FutureTask对象作为构造方法的参数
  • 启动线程
  • 再调用get方法,就可以获取线程结束之后的结果。

(3)代码演示

多线程的第三种实现方式:
    特点:可以获取到多线程运行的结果
    1. 创建一个类MyCallable实现Callable接口
    2. 重写call (是有返回值的,表示多线程运行的结果)
    3. 创建MyCallable的对象(表示多线程要执行的任务)
    4. 创建FutureTask的对象(作用管理多线程运行的结果)
    5. 创建Thread类的对象,并启动(表示线程)
public class MyCallable implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        //求1~100之间的和
        int sum = 0;
        for (int i = 1; i <= 100; i++) {
            sum = sum + i;
        }
        return sum;
    }
}
public class MyCallableDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //创建MyCallable的对象(表示多线程要执行的任务)
        MyCallable myCallable = new MyCallable();
        //创建FutureTask的对象(作用管理多线程运行的结果)
        FutureTask<Integer> ft = new FutureTask<>(myCallable);
        //创建线程的对象
        Thread t1 = new Thread(ft);
        //启动线程
        t1.start();

        //获取多线程运行的结果
        Integer result = ft.get();
        System.out.println(result);
    }
}

image-20231228150236733

1.4.4 三种实现方式的对比
  • 实现Runnable、Callable接口
    • 好处: 扩展性强,实现该接口的同时还可以继承其他的类
    • 缺点: 编程相对复杂,不能直接使用Thread类中的方法
  • 继承Thread类
    • 好处: 编程比较简单,可以直接使用Thread类中的方法
    • 缺点: 可以扩展性较差,不能再继承其他的类

1.5 设置和获取线程名称

(1)方法介绍

方法名说明
void setName(String name)将此线程的名称更改为等于参数name
String getName()返回此线程的名称
Thread currentThread()返回对当前正在执行的线程对象的引用

(2)代码演示

String getName()                    返回此线程的名称

void setName(String name)           设置线程的名字(构造方法也可以设置名字)
细节:
    1、如果没有给线程设置名字,线程也是有默认的名字的
            格式:Thread-X(X序号,从0开始的)
    2、如果要给线程设置名字,可以用set方法进行设置,也可以构造方法设置

static Thread currentThread()       获取当前线程的对象
细节:
    当JVM虚拟机启动之后,会自动的启动多条线程
    其中有一条线程就叫做main线程
    他的作用就是去调用main方法,并执行里面的代码
    在以前,我们写的所有的代码,其实都是运行在main线程当中
public class MyThread extends Thread{
    // 父类Thread的构造方法
    public MyThread() {
    }

    // 父类Thread的构造方法
    public MyThread(String name) {
        super(name);
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(getName() + ":" + i);
        }
    }
}
public class ThreadDemo {
    public static void main(String[] args){
        //1.创建线程的对象
        /*MyThread my1 = new MyThread();
        MyThread my2 = new MyThread();

        //void setName(String name):设置线程名称
        my1.setName("高铁");
        my2.setName("飞机");*/

        //使用构造方法Thread(String name)
        MyThread my1 = new MyThread("高铁");
        MyThread my2 = new MyThread("飞机");

        my1.start();
        my2.start();

        //static Thread currentThread() 返回对当前正在执行的线程对象的引用
        System.out.println(Thread.currentThread().getName());
    }
}

image-20231228152239127

1.6 线程休眠

(1)相关方法

方法名说明
static void sleep(long millis)使当前正在执行的线程停留(暂停执行)指定的毫秒数

(2)代码演示

static void sleep(long time)        让线程休眠指定的时间,单位为毫秒
细节:
    1、哪条线程执行到这个方法,那么哪条线程就会在这里停留对应的时间
    2、方法的参数:就表示睡眠的时间,单位毫秒
    3、当时间到了之后,线程会自动的醒来,继续执行下面的其他代码
public class MyRunnable implements Runnable{
    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "---" + i);
        }
    }
}
public class MyRunnableDemo {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("睡觉前");
        Thread.sleep(1000);
        System.out.println("睡醒了");

        MyRunnable mr = new MyRunnable();

        Thread t1 = new Thread(mr);
        Thread t2 = new Thread(mr);

        t1.start();
        t2.start();
    }
}

image-20231228153307245

1.7 线程优先级

(1)线程调度

  • 两种调度方式

    • 分时调度模型:所有线程轮流使用 CPU 的使用权,平均分配每个线程占用 CPU 的时间片
    • 抢占式调度模型:优先让优先级高的线程使用 CPU,如果线程的优先级相同,那么会随机选择一个,优先级高的线程获取的 CPU 时间片相对多一些。Java使用的是抢占式调度模型
  • 随机性

    假如计算机只有一个 CPU,那么 CPU 在某一个时刻只能执行一条指令,线程只有得到CPU时间片,也就是使用权,才可以执行指令。所以说多线程程序的执行是有随机性,因为谁抢到CPU的使用权是不一定的

(2)优先级相关方法

方法名说明
final int getPriority()返回此线程的优先级
final void setPriority(int newPriority)更改此线程的优先级线程默认优先级是5;线程优先级的范围是:1-10

(3)代码演示

public class MyCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        for (int i = 0; i < 10; i++) {
            System.out.println(Thread.currentThread().getName() + "---" + i);
        }
        return "线程执行完毕了";
    }
}
public class MyCallableDemo {
    public static void main(String[] args) {
        //优先级: 1 - 10 默认值:5
        MyCallable mc = new MyCallable();

        FutureTask<String> ft = new FutureTask<>(mc);

        Thread t1 = new Thread(ft);
        t1.setName("飞机");
        t1.setPriority(10);
        //System.out.println(t1.getPriority());//5
        t1.start();

        MyCallable mc2 = new MyCallable();

        FutureTask<String> ft2 = new FutureTask<>(mc2);

        Thread t2 = new Thread(ft2);
        t2.setName("坦克");
        t2.setPriority(1);
        //System.out.println(t2.getPriority());//5
        t2.start();
    }
}

image-20231228154516799

1.8 守护线程

(1)相关方法

方法名说明
void setDaemon(boolean on)将此线程标记为守护线程,当运行的线程都是守护线程时,Java虚拟机将退出

(2)代码演示

final void setDaemon(boolean on)    设置为守护线程
细节:
    当其他的非守护线程执行完毕之后,守护线程会陆续结束
通俗易懂:
    当女神线程结束了,那么备胎也没有存在的必要了
public class MyThread01 extends Thread{
    @Override
    public void run() {
        for (int i = 1; i <= 10; i++) {
            System.out.println(getName() + ":" + i);
        }
    }
}
public class MyThread02 extends Thread{
    @Override
    public void run() {
        for (int i = 1; i <= 100; i++) {
            System.out.println(getName() + ":" + i);
        }
    }
}
public class ThreadDemo {
    public static void main(String[] args) {
        MyThread01 t1 = new MyThread01();
        MyThread02 t2 = new MyThread02();

        t1.setName("女神");
        t2.setName("备胎");

        //把第二个线程设置为守护线程(备胎线程)
        t2.setDaemon(true);

        t1.start();
        t2.start();
    }
}

image-20231228155437393

1.9 礼让线程

(1)相关方法

public static void yield()      出让线程/礼让线程

(2)代码演示

public class MyThread extends Thread{
    @Override
    public void run() { //"飞机"  "坦克"
        for (int i = 1; i <= 5; i++) {
            System.out.println(getName() + ": " + i);
            //表示出让当前CPU的执行权
            Thread.yield();
        }
    }
}
public class ThreadDemo {
    public static void main(String[] args) {
       /*
            public static void yield()      出让线程/礼让线程
       */
        MyThread t1 = new MyThread();
        MyThread t2 = new MyThread();

        t1.setName("飞机");
        t2.setName("坦克");

        t1.start();
        t2.start();
    }
}

image-20231228160256349

1.10 插队线程

(1)相关方法

public final void join()  插入线程/插队线程

(2)代码演示

public class MyThread extends Thread{
    @Override
    public void run() {
        for (int i = 1; i <= 5; i++) {
            System.out.println(getName() + ":" + i);
        }
    }
}
public class ThreadDemo {
    public static void main(String[] args) throws InterruptedException {
        MyThread t = new MyThread();
        t.setName("土豆");
        t.start();

        //表示把t这个线程,插入到当前线程之前。
        //t:土豆
        //当前线程: main线程
        t.join();

        //执行在main线程当中的
        for (int i = 1; i <=5; i++) {
            System.out.println("main线程" + i);
        }
    }
}

image-20231228161535032

2、线程同步

2.1 线程安全问题演示

(1)案例需求

某电影院目前正在上映国产大片,共有100张票,而它有3个窗口卖票,请设计一个程序模拟该电影院卖票

(2)实现步骤

  • 定义一个类SellTicket实现Runnable接口,里面定义一个成员变量:private int tickets = 100;

  • 在SellTicket类中重写run()方法实现卖票,代码步骤如下

  • 判断票数大于0,就卖票,并告知是哪个窗口卖的

  • 卖了票之后,总票数要减1

  • 票卖没了,线程停止

  • 定义一个测试类SellTicketDemo,里面有main方法,代码步骤如下

  • 创建SellTicket类的对象

  • 创建三个Thread类的对象,把SellTicket对象作为构造方法的参数,并给出对应的窗口名称

  • 启动线程

(3)代码实现

public class SellTicket implements Runnable {
    private int ticket = 100;
    //在SellTicket类中重写run()方法实现卖票,代码步骤如下
    @Override
    public void run() {
        while (true) {
            if(ticket <= 0){
                //卖完了
                break;
            }else{
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                ticket--;
                System.out.println(Thread.currentThread().getName() + "在卖票,还剩下" + ticket + "张票");
            }
        }
    }
}
public class SellTicketDemo {
    public static void main(String[] args) {
        //创建SellTicket类的对象
        SellTicket st = new SellTicket();

        //创建三个Thread类的对象,把SellTicket对象作为构造方法的参数,并给出对应的窗口名称
        Thread t1 = new Thread(st,"窗口1");
        Thread t2 = new Thread(st,"窗口2");
        Thread t3 = new Thread(st,"窗口3");

        //启动线程
        t1.start();
        t2.start();
        t3.start();
    }
}

image-20231228162830425

2.2 卖票案例的问题

  • 卖票出现了问题

    • 相同的票出现了多次

    • 出现了负数的票

  • 问题产生原因:线程执行的随机性导致的,可能在卖票过程中丢失cpu的执行权,导致出现问题

2.3 同步代码块解决线程安全问题

(1)安全问题出现的条件

  • 是多线程环境

  • 有共享数据

  • 有多条语句操作共享数据

(2)如何解决多线程安全问题呢?

  • 基本思想:让程序没有安全问题的环境

(3)怎么实现呢?

  • 把多条语句操作共享数据的代码给锁起来,让任意时刻只能有一个线程执行即可

  • Java提供了同步代码块的方式来解决

(4)同步代码块格式:

synchronized(任意对象) { 	// 下面的代码使用本类的class标识
	多条语句操作共享数据的代码 
}

synchronized(任意对象):就相当于给代码加锁了,任意对象就可以看成是一把锁

(5)同步的好处和弊端

  • 好处:解决了多线程的数据安全问题

  • 弊端:当线程很多时,因为每个线程都会去判断同步上的锁,这是很耗费资源的,无形中会降低程序的运行效率

(6)代码演示

public class MyThread extends Thread{
    //表示这个类所有的对象,都共享ticket数据
    static int ticket = 0;//0 ~ 99

    @Override
    public void run() {
        while (true) {
            synchronized (MyThread.class) {
                //同步代码块
                if (ticket < 100) {
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    ticket++;
                    System.out.println(getName() + "正在卖第" + ticket + "张票!");
                } else {
                    break;
                }
            }
        }
    }
}
public class ThreadDemo {
    public static void main(String[] args) {
        //创建线程对象
        MyThread t1 = new MyThread();
        MyThread t2 = new MyThread();
        MyThread t3 = new MyThread();

        //起名字
        t1.setName("窗口1");
        t2.setName("窗口2");
        t3.setName("窗口3");

        //开启线程
        t1.start();
        t2.start();
        t3.start();
    }
}

image-20231228164429877

2.4 同步方法解决线程安全问题

(1)同步方法的格式

同步方法:就是把synchronized关键字加到方法上

修饰符 synchronized 返回值类型 方法名(方法参数) { 
	方法体;
}

同步方法的锁对象是:this

(2)静态同步方法

同步静态方法:就是把synchronized关键字加到静态方法上

修饰符 static synchronized 返回值类型 方法名(方法参数) { 
	方法体;
}

同步静态方法的锁对象是:类名.class

(3)代码演示

public class MyRunnable implements Runnable {

    int ticket = 0;

    @Override
    public void run() {
        //1.循环
        while (true) {
            //2.同步方法
            if (method()) break;
        }
    }

    //this
    private synchronized boolean method() {
        //3.判断共享数据是否到了末尾,如果到了末尾
        if (ticket == 100) {
            return true;
        } else {
            //4.判断共享数据是否到了末尾,如果没有到末尾
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            ticket++;
            System.out.println(Thread.currentThread().getName() + "在卖第" + ticket + "张票!!!");
        }
        return false;
    }
}
public class ThreadDemo {
    public static void main(String[] args) {
        MyRunnable mr = new MyRunnable();

        Thread t1 = new Thread(mr);
        Thread t2 = new Thread(mr);
        Thread t3 = new Thread(mr);

        t1.setName("窗口1");
        t2.setName("窗口2");
        t3.setName("窗口3");

        t1.start();
        t2.start();
        t3.start();
    }
}

image-20231228165757397

2.5 Lock锁解决线程安全问题

虽然我们可以理解同步代码块和同步方法的锁对象问题,但是我们并没有直接看到在哪里加上了锁,在哪里释放了锁,为了更清晰的表达如何加锁和释放锁,JDK5以后提供了一个新的锁对象Lock

Lock是接口不能直接实例化,这里采用它的实现类ReentrantLock来实例化

(1)ReentrantLock构造方法

方法名说明
ReentrantLock()创建一个ReentrantLock的实例

(2)加锁解锁方法

方法名说明
void lock()获得锁
void unlock()释放锁

(3)代码演示

public class MyThread extends Thread{

    static int ticket = 0;

    static Lock lock = new ReentrantLock();

    @Override
    public void run() {
        //1.循环
        while(true){
            //2.同步代码块
            //synchronized (MyThread.class){
            lock.lock(); //2 //3
            try {
                //3.判断
                if(ticket == 100){
                    break;
                    //4.判断
                }else{
                    Thread.sleep(10);
                    ticket++;
                    System.out.println(getName() + "在卖第" + ticket + "张票!!!");
                }
                //  }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
}
public class ThreadDemo {
    public static void main(String[] args) {
        MyThread t1 = new MyThread();
        MyThread t2 = new MyThread();
        MyThread t3 = new MyThread();

        t1.setName("窗口1");
        t2.setName("窗口2");
        t3.setName("窗口3");

        t1.start();
        t2.start();
        t3.start();
    }
}

image-20231228170320804

2.6 死锁

(1)死锁概述

线程死锁是指由于两个或者多个线程互相持有对方所需要的资源,导致这些线程处于等待状态,无法前往执行

(2)什么情况下会产生死锁

  • 资源有限
  • 同步嵌套

(3)代码演示

public class MyThread extends Thread {

  static Object objA = new Object();
  static Object objB = new Object();

  @Override
  public void run() {
      //1.循环
      while (true) {
          if ("线程A".equals(getName())) {
              synchronized (objA) {
                  System.out.println("线程A拿到了A锁,准备拿B锁");//A
                  synchronized (objB) {
                      System.out.println("线程A拿到了B锁,顺利执行完一轮");
                  }
              }
          } else if ("线程B".equals(getName())) {
              synchronized (objB) {
                  System.out.println("线程B拿到了B锁,准备拿A锁");//B
                  synchronized (objA) {
                      System.out.println("线程B拿到了A锁,顺利执行完一轮");
                  }
              }
          }
      }
  }
}
public class ThreadDemo {
    public static void main(String[] args) {

        MyThread t1 = new MyThread();
        MyThread t2 = new MyThread();

        t1.setName("线程A");
        t2.setName("线程B");

        t1.start();
        t2.start();

    }
}

image-20231228170942641

3、线程的等待唤醒机制

等待唤醒机制以生产者和消费者模式举例。等待唤醒机制可以用于实现线程的交替执行。

3.1 生产者和消费者模式概述

(1)概述

所谓生产者消费者问题,实际上主要是包含了两类线程:

  • 一类是生产者线程用于生产数据
  • 一类是消费者线程用于消费数据

为了解耦生产者和消费者的关系,通常会采用共享的数据区域,就像是一个仓库,生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为,消费者只需要从共享数据区中去获取数据,并不需要关心生产者的行为。

(2)Object类的等待和唤醒方法

方法名说明
void wait()导致当前线程等待,直到另一个线程调用该对象的 notify()方法或 notifyAll()方法
void notify()唤醒正在等待对象监视器的单个线程
void notifyAll()唤醒正在等待对象监视器的所有线程

3.2 生产者和消费者案例

(1)案例需求

  • 桌子类(Desk):定义表示包子数量的变量,定义锁对象变量,定义标记桌子上有无包子的变量

  • 生产者类(Cooker):实现Runnable接口,重写 run() 方法,设置线程任务

    1.判断是否有包子,决定当前线程是否执行

    2.如果有包子,就进入等待状态,如果没有包子,继续执行,生产包子

    3.生产包子之后,更新桌子上包子状态,唤醒消费者消费包子

  • 消费者类(Foodie):实现Runnable接口,重写run()方法,设置线程任务

    1.判断是否有包子,决定当前线程是否执行

    2.如果没有包子,就进入等待状态,如果有包子,就消费包子

    3.消费包子后,更新桌子上包子状态,唤醒生产者生产包子

  • 测试类(Demo):里面有main方法,main方法中的代码步骤如下

    创建生产者线程和消费者线程对象

    分别开启两个线程

(2)代码实现

生产者

public class Cooker extends Thread {
    /**
     * 生产者步骤:
     *   1,判断桌子上是否有汉堡包,如果有就等待,如果没有才生产。
     *   2,把汉堡包放在桌子上。
     *   3,叫醒等待的消费者开吃。
     */
    @Override
    public void run() {
        while(true){
            synchronized (Desk.lock){
                if(Desk.count == 0){
                    break;
                }else{
                    if(!Desk.flag){ //判断桌子上是否有食物
                        //如果没有食物,就生产食物
                        System.out.println("厨师正在生产汉堡包");
                        Desk.flag = true;   //修改桌子上的食物状态
                        Desk.lock.notifyAll();  //叫醒等待的消费者开吃
                    }else{
                        //如果有食物,就等待
                        try {
                            Desk.lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
}

消费者

public class Foodie extends Thread {
    @Override
    public void run() {
        /**
         * 1,判断桌子上是否有汉堡包。
         * 2,如果没有就等待。
         * 3,如果有就开吃
         * 4,吃完之后,桌子上的汉堡包就没有了,叫醒等待的生产者继续生产,汉堡包的总数量减一
         *
         * 套路:
         * 1. while(true)死循环
         * 2. synchronized 锁,锁对象要唯一
         * 3. 判断,共享数据是否结束. 结束
         * 4. 判断,共享数据是否结束. 没有结束
         */
        while(true){
            synchronized (Desk.lock){
                if(Desk.count == 0){
                    break;
                }else{
                    //先判断桌子上是否有食物
                    if(Desk.flag){
                        //有
                        System.out.println("吃货在吃汉堡包");
                        Desk.flag = false;  //修改桌子的状态
                        Desk.lock.notifyAll();  //没有食物之后,唤醒厨师继续做
                        Desk.count--;   // 食物数量-1
                    }else{
                        //没有就等待
                        //使用什么对象当做锁,那么就必须用这个对象去调用等待和唤醒的方法.
                        try {
                            Desk.lock.wait();   // 让当前线程跟锁进行绑定
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
}

桌子类

public class Desk {
    /**
     * 定义一个标记
     * true 就表示桌子上有汉堡包的,此时允许消费者执行
     * false 就表示桌子上没有汉堡包的,此时允许生产者执行
     */
    public static boolean flag = false;

    //汉堡包的总数量
    public static int count = 5;

    //锁对象
    public static final Object lock = new Object();
}

测试类

public class Demo {
    public static void main(String[] args) {
        Foodie f = new Foodie();
        Cooker c = new Cooker();
        f.start();
        c.start();
    }
}
image-20231228215914591

3.3 生产者和消费者案例优化

(1)需求

  • 将Desk类中的变量,采用面向对象的方式封装起来
  • 生产者和消费者类中构造方法接收Desk类对象,之后在run方法中进行使用
  • 创建生产者和消费者线程对象,构造方法中传入Desk类对象
  • 开启两个线程

(2)代码实现

桌子类

public class Desk {
    /**
     * 定义一个标记
     * true 表示桌子上有汉堡包的,此时允许吃货执行
     * false 表示桌子上没有汉堡包的,此时允许厨师执行
     * public static boolean flag = false;
     */
    private boolean flag;

    //汉堡包的总数量
    //public static int count = 10;
    //以后我们在使用这种必须有默认值的变量
    // private int count = 10;
    private int count;

    //锁对象
    //public static final Object lock = new Object();
    private final Object lock = new Object();

    public Desk() {
        // 在空参内部调用带参,对成员变量进行赋值,之后就可以直接使用成员变量了
        this(false,5);
    }

    public Desk(boolean flag, int count) {
        this.flag = flag;
        this.count = count;
    }

    public boolean isFlag() {
        return flag;
    }

    public void setFlag(boolean flag) {
        this.flag = flag;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    public Object getLock() {
        return lock;
    }

    @Override
    public String toString() {
        return "Desk{" +
                "flag=" + flag +
                ", count=" + count +
                ", lock=" + lock +
                '}';
    }
}

生产者

public class Cooker extends Thread {

    private Desk desk;

    public Cooker(Desk desk) {
        this.desk = desk;
    }

    /**
     * 生产者步骤:
     *  1,判断桌子上是否有汉堡包,如果有就等待,如果没有才生产。
     *  2,把汉堡包放在桌子上。
     *  3,叫醒等待的消费者开吃。
     */
    @Override
    public void run() {
        while(true){
            synchronized (desk.getLock()){
                if(desk.getCount() == 0){
                    break;
                }else{
                    //System.out.println("验证一下是否执行了");
                    if(!desk.isFlag()){
                        //生产
                        System.out.println("厨师正在生产汉堡包");
                        desk.setFlag(true);
                        desk.getLock().notifyAll();
                    }else{
                        try {
                            desk.getLock().wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
}

消费者

public class Foodie extends Thread {
    private Desk desk;

    public Foodie(Desk desk) {
        this.desk = desk;
    }

    @Override
    public void run() {
        /**
         * 1,判断桌子上是否有汉堡包。
         * 2,如果没有就等待。
         * 3,如果有就开吃
         * 4,吃完之后,桌子上的汉堡包就没有了,叫醒等待的生产者继续生产,汉堡包的总数量减一
         *
         * 套路:
         * 1. while(true)死循环
         * 2. synchronized 锁,锁对象要唯一
         * 3. 判断,共享数据是否结束. 结束
         * 4. 判断,共享数据是否结束. 没有结束
         */
        while(true){
            synchronized (desk.getLock()){
                if(desk.getCount() == 0){
                    break;
                }else{
                    //System.out.println("验证一下是否执行了");
                    if(desk.isFlag()){
                        //有
                        System.out.println("吃货在吃汉堡包");
                        desk.setFlag(false);
                        desk.getLock().notifyAll();
                        desk.setCount(desk.getCount() - 1);
                    }else{
                        //没有就等待
                        //使用什么对象当做锁,那么就必须用这个对象去调用等待和唤醒的方法.
                        try {
                            desk.getLock().wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
}

测试类

public class Demo {
    public static void main(String[] args) {
        Desk desk = new Desk();
        Foodie f = new Foodie(desk);
        Cooker c = new Cooker(desk);
        f.start();
        c.start();
    }
}

image-20231228220547407

3.4 阻塞队列基本使用

(1)阻塞队列继承结构

06_阻塞队列继承结构

(2)常见BlockingQueue:

ArrayBlockingQueue: 底层是数组,有界

LinkedBlockingQueue: 底层是链表,无界.但不是真正的无界,最大为int的最大值

(3)BlockingQueue的核心方法:

put(anObject): 将参数放入队列,如果放不进去会阻塞

take(): 取出第一个数据,取不到会阻塞

(4)代码示例

public class BlockingQueueTest {
    public static void main(String[] args) throws Exception {
        // 创建阻塞队列的对象,容量为 1
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(1);

        // 存储元素
        arrayBlockingQueue.put("汉堡包");

        // 取元素
        System.out.println(arrayBlockingQueue.take());
        // 队列中的元素被上一行代码取出来了,所以执行到这里时,由于取不到元素,线程会被阻塞
        System.out.println(arrayBlockingQueue.take()); 
        System.out.println("程序结束了");
    }
}
image-20231228221516909

3.5 阻塞队列实现等待唤醒机制

(1)案例需求

  • 生产者类(Cooker):实现Runnable接口,重写run()方法,设置线程任务

    1.构造方法中接收一个阻塞队列对象

    2.在run方法中循环向阻塞队列中添加包子

    3.打印添加结果

  • 消费者类(Foodie):实现Runnable接口,重写run()方法,设置线程任务

    1.构造方法中接收一个阻塞队列对象

    2.在run方法中循环获取阻塞队列中的包子

    3.打印获取结果

  • 测试类(Demo):里面有main方法,main方法中的代码步骤如下

    创建阻塞队列对象

    创建生产者线程和消费者线程对象,构造方法中传入阻塞队列对象

    分别开启两个线程

(2)代码实现

需求:利用阻塞队列完成生产者和消费者(等待唤醒机制)的代码
细节:生产者和消费者必须使用同一个阻塞队列

下面的测试代码一直在执行,没有停下来的原因是:

  1. Cooker线程和Foodie线程都定义在while(true)循环里,也就是说它们会一直重复执行run()方法中的操作。
  2. Cooker线程不断向阻塞队列bd中put汉堡包。
  3. Foodie线程不断从队列中take汉堡包。
  4. 阻塞队列的容量定义为1,也就是说队列一次只能装一个汉堡包。
  5. Cooker放入一个汉堡包后,Foodie会马上take出来,导致队列空了。这时候Cooker再put,Foodie再take,就形成了一个永久循环。
  6. 因为Cooker和Foodie都在while(true)循环里执行,所以这个逻辑的确是一直在运行,没有停下来的条件。

生产者

public class Cooker extends Thread {
    private ArrayBlockingQueue<String> bd;

    public Cooker(ArrayBlockingQueue<String> bd) {
        this.bd = bd;
    }
    @Override
    public void run() {
        while (true) {
            //不断的把汉堡包放到阻塞队列当中
            try {
                bd.put("汉堡包");	// 这里的put方法底层有锁
                System.out.println("厨师放入一个汉堡包");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

消费者

public class Foodie extends Thread {
    private ArrayBlockingQueue<String> bd;
    public Foodie(ArrayBlockingQueue<String> bd) {
        this.bd = bd;
    }
    @Override
    public void run() {
        while (true) {
            //不断从阻塞队列中获取汉堡包
            try {
                String take = bd.take();	// 这里的take方法底层有锁
                System.out.println("吃货将" + take + "拿出来吃了");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

测试类

public class Demo {
    public static void main(String[] args) {
        //1.创建阻塞队列的对象,ArrayBlockingQueue是有界队列,要指定容量,这里指定容量是1
        ArrayBlockingQueue<String> bd = new ArrayBlockingQueue<>(1);
        //2.创建线程的对象,并把阻塞队列传递过去
        Foodie f = new Foodie(bd);
        Cooker c = new Cooker(bd);
        //3.开启线程
        f.start();
        c.start();
    }
}

因为输出语句写在了锁的外面(put方法和take方法底层有锁),所以控制台的数据看起来是错乱的(出现了连续打印同一个语句的情况),但其实数据在内部是没有问题的,就是生产一个消费一个。这种情况并不会对数据的安全造成影响,因为并没有对共享数据进行修改,仅仅是打印的语句看着不规范而已。

image-20231228224020163

4、线程的状态

4.1 状态介绍

当线程被创建并启动以后,它既不是一启动就进入了执行状态,也不是一直处于执行状态。线程对象在不同的时期有不同的状态。那么Java中的线程存在哪几种状态呢?Java中的线程

状态被定义在了java.lang.Thread.State枚举类中,State枚举类的源码如下:

public class Thread {
    
    public enum State {
    
        /* 新建 */
        NEW , 

        /* 可运行状态 */
        RUNNABLE , 

        /* 阻塞状态 */
        BLOCKED , 

        /* 无限等待状态 */
        WAITING , 

        /* 计时等待 */
        TIMED_WAITING , 

        /* 终止 */
        TERMINATED;
    
	}
    
    // 获取当前线程的状态
    public State getState() {
        return jdk.internal.misc.VM.toThreadState(threadStatus);
    }
    
}

通过源码我们可以看到Java中的线程存在6种状态,每种线程状态的含义如下

线程状态具体含义
NEW-新建一个尚未启动的线程的状态。也称之为初始状态、开始状态。线程刚被创建,但是并未启动。还没调用start方法。MyThread t = new MyThread()只有线程象,没有线程特征。
RUNNABLE-就绪当调用线程对象的start方法时,线程对象进入了RUNNABLE状态。那么此时才是真正的在JVM进程中创建了一个线程,线程一经启动并不是立即得到执行,线程的运行与否要听令与CPU的调度,那么我们把这个中间状态称之为可执行状态(RUNNABLE)也就是说它具备执行的资格,但是并没有真正的执行起来而是在等待CPU的度。
BLOCKED-阻塞当一个线程试图获取一个对象锁,而该对象锁被其他的线程持有,则该线程进入Blocked状态;当该线程持有锁时,该线程将变成Runnable状态。
WAITING-等待一个正在等待的线程的状态。也称之为等待状态。造成线程等待的原因有两种,分别是调用Object.wait()、join()方法。处于等待状态的线程,正在等待其他线程去执行一个特定的操作。例如:因为wait()而等待的线程正在等待另一个线程去调用notify()或notifyAll();一个因为join()而等待的线程正在等待另一个线程结束。
TIMED_WAITING-计时等待一个在限定时间内等待的线程的状态。也称之为限时等待状态。造成线程限时等待状态的原因有三种,分别是:Thread.sleep(long),Object.wait(long)、join(long)。
TERMINATED-死亡一个完全运行完成的线程的状态。也称之为终止状态、结束状态

各个状态的转换,如下图所示:

image-20231229081738103

4.2 案例演示

为了验证上面论述的状态即状态转换的正确性,也为了加深对线程状态转换的理解,下面通过三个案例演示线程间中的状态转换。

4.2.1 案例一

本案例主要演示TIME_WAITING的状态转换

需求:编写一段代码,依次显示一个线程的这些状态:NEW -> RUNNABLE -> TIME_WAITING -> RUNNABLE -> TERMINATED

为了简化我们的开发,本次我们使用匿名内部类结合lambda表达式的方式使用多线程。

代码实现

public class ThreadStateDemo01 {
    public static void main(String[] args) throws InterruptedException {

        //定义一个内部线程
        Thread thread = new Thread(() -> {
            System.out.println("2.执行thread.start()之后,线程的状态:" + Thread.currentThread().getState());
            try {
                //休眠100毫秒
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("4.执行Thread.sleep(long)完成之后,线程的状态:" + Thread.currentThread().getState());
        });

        //获取start()之前的状态
        System.out.println("1.通过new初始化一个线程,但是还没有start()之前,线程的状态:" + thread.getState());

        //启动线程
        thread.start();

        //休眠50毫秒
        Thread.sleep(50);
        /**
         * 因为thread1需要休眠100毫秒,所以在第50毫秒,thread处于sleep状态
         * 用main线程来获取thread1线程的状态,因为thread1线程睡眠时间较长
         * 所以当main线程执行的时候,thread1线程还没有睡醒,还处于计时等待状态
         */
        System.out.println("3.执行Thread.sleep(long)时,线程的状态:" + thread.getState());

        // thread1和main线程主动休眠150毫秒,所以在第150毫秒,thread早已执行完毕
        Thread.sleep(100);

        System.out.println("5.线程执行完毕之后,线程的状态:" + thread.getState() + "\n");
    }
}

image-20231229082628705

4.2.2 案例二

本案例主要演示WAITING的状态转换

需求:编写一段代码,依次显示一个线程的这些状态:NEW -> RUNNABLE -> WAITING -> RUNNABLE -> TERMINATED

public class ThreadStateDemo02 {
    public static void main(String[] args) throws InterruptedException {

        //定义一个对象,用来加锁和解锁
        Object obj = new Object();

        //定义一个内部线程
        Thread thread1 = new Thread(() -> {
            System.out.println("2.执行thread.start()之后,线程的状态:" + Thread.currentThread().getState());
            synchronized (obj) {
                try {

                    //thread1需要休眠100毫秒
                    Thread.sleep(100);

                    //thread1100毫秒之后,通过wait()方法释放obj对象是锁
                    obj.wait();

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("4.被object.notify()方法唤醒之后,线程的状态:" + Thread.currentThread().getState());
        });

        //获取start()之前的状态
        System.out.println("1.通过new初始化一个线程,但是还没有start()之前,线程的状态:" + thread1.getState());

        //启动线程
        thread1.start();

        //main线程休眠150毫秒
        Thread.sleep(150);

        //因为thread1在第100毫秒进入wait等待状态,所以第150秒肯定可以获取其状态
        System.out.println("3.执行object.wait()时,线程的状态:" + thread1.getState());

        //声明另一个线程进行解锁
        new Thread(() -> {
            synchronized (obj) {
                //唤醒等待的线程
                obj.notify();
            }
        }).start();

        //main线程休眠10毫秒等待thread1线程能够苏醒
        Thread.sleep(10);

        //获取thread1运行结束之后的状态
        System.out.println("5.线程执行完毕之后,线程的状态:" + thread1.getState() + "\n");
    }
}

image-20231229082830171

4.2.3 案例三

本案例主要演示BLOCKED的状态转换

需求:编写一段代码,依次显示一个线程的这些状态:NEW -> RUNNABLE -> BLOCKED -> RUNNABLE -> TERMINATED

public class ThreadStateDemo03 {
    public static void main(String[] args) throws InterruptedException {

        //定义一个对象,用来加锁和解锁
        Object obj2 = new Object();

        //定义一个线程,先抢占了obj2对象的锁
        new Thread(() -> {
            synchronized (obj2) {
                try {
                    Thread.sleep(100);         //第一个线程要持有锁100毫秒
                    obj2.wait();              //然后通过wait()方法进行等待状态,并释放obj2的对象锁
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        //定义目标线程,获取等待获取obj2的锁
        Thread thread = new Thread(() -> {
            System.out.println("2.执行thread.start()之后,线程的状态:" + Thread.currentThread().getState());
            synchronized (obj2) {
                try {
                    Thread.sleep(100);      //thread3要持有对象锁100毫秒
                    obj2.notify();         //然后通过notify()方法唤醒所有在ojb2上等待的线程继续执行后续操作
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("4.阻塞结束后,线程的状态:" + Thread.currentThread().getState());
        });

        //获取start()之前的状态
        System.out.println("1.通过new初始化一个线程,但是还没有thread.start()之前,线程的状态:" + thread.getState());

        //启动线程
        thread.start();

        //先等100毫秒
        Thread.sleep(50);

        //第一个线程释放锁至少需要100毫秒,所以在第50毫秒时,thread正在因等待obj的对象锁而阻塞
        System.out.println("3.因为等待锁而阻塞时,线程的状态:" + thread.getState());

        //再等300毫秒
        Thread.sleep(300);

        //两个线程的执行时间加上之前等待的50毫秒总共是250毫秒,所以第300毫秒,所有的线程都已经执行完毕
        System.out.println("5.线程执行完毕之后,线程的状态:" + thread.getState());
    }
}

image-20231229083835971

5、多线程综合练习

以下练习的代码基于JDK17

5.1 练习1-售票

需求:一共有10张电影票,可以在两个窗口领取,假设每次领取的时间为100毫秒。请用多线程模拟卖票过程并打印剩余电影票的数量。

代码示例:

package com.ya.mythreadtest.test1;

public class MyThread extends Thread {

  //第一种方式实现多线程,测试类中MyThread会创建多次,所以需要加static
  static int ticket = 10;

  @Override
  public void run() {
    //1.循环
    while (true) {
      //2.同步代码块
      synchronized (MyThread.class) {
        //3.判断共享数据(已经到末尾)
        if (ticket == 0) {
          break;
        } else {
          //4.判断共享数据(没有到末尾)
          try {
            Thread.sleep(100);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
          ticket--;
          System.out.println(getName() + "在卖票,还剩下" + ticket + "张票!!!");
        }
      }
    }
  }
}
package com.ya.mythreadtest.test1;

public class Test {
  public static void main(String[] args) {
    //创建线程对象
    MyThread t1 = new MyThread();
    MyThread t2 = new MyThread();

    //给线程设置名字
    t1.setName("窗口1");
    t2.setName("窗口2");

    //开启线程
    t1.start();
    t2.start();
  }
}

image-20231229091958907

5.2 练习2-赠送礼物

需求:有10份礼品,两人同时发送,当剩下的礼品小于2份的时候则不再送出。利用多线程模拟该过程并将线程的名字和礼物的剩余数量打印出来。

package com.ya.mythreadtest.test2;

import test1.com.ya.mythreadtest.MyThread;

public class MyRunable implements Runnable {

  //第二种方式实现多线程,测试类中MyRunable只创建一次,所以不需要加static
  int count = 10;

  @Override
  public void run() {
    //1.循环
    while (true) {
      //2.同步代码块
      synchronized (MyThread.class) {
        //3.判断共享数据(已经到末尾)
        if (count < 2) {
          System.out.println("礼物还剩下" + count + "不再赠送");
          break;
        } else {
          //4.判断共享数据(没有到末尾)
          count--;
          System.out.println(Thread.currentThread().getName() + "在赠送礼物,还剩下" + count + "个礼物!!!");
        }
      }
    }
  }
}
package com.ya.mythreadtest.test2;

public class Test {
  public static void main(String[] args) {
    //创建参数对象
    MyRunable mr = new MyRunable();

    //创建线程对象
    Thread t1 = new Thread(mr, "窗口1");
    Thread t2 = new Thread(mr, "窗口2");

    //启动线程
    t1.start();
    t2.start();
  }
}

image-20231229092211493

5.3 练习3-打印数字

需求:同时开启两个线程,共同获取1-10之间的所有数字。将输出所有的奇数。

package com.ya.mythreadtest.test3;

import test1.com.ya.mythreadtest.MyThread;

public class MyRunable implements Runnable {

  //第二种方式实现多线程,测试类中MyRunable只创建一次,所以不需要加static
  int number = 1;

  @Override
  public void run() {
    //1.循环
    while (true) {
      //2.同步代码块
      synchronized (MyThread.class) {
        //3.判断共享数据(已经到末尾)
        if (number > 10) {
          break;
        } else {
          //4.判断共享数据(没有到末尾)
          if (number % 2 == 1) {
            System.out.println(Thread.currentThread().getName() + "打印数字" + number);
          }
          number++;
        }
      }
    }
  }
}
package com.ya.mythreadtest.test3;

public class Test {
  public static void main(String[] args) {

    //创建参数对象
    MyRunable mr = new MyRunable();

    //创建线程对象
    Thread t1 = new Thread(mr, "线程A");
    Thread t2 = new Thread(mr, "线程B");

    //启动线程
    t1.start();
    t2.start();
  }
}

image-20231229092336956

5.4 练习4-抢红包

需求:抢红包也用到了多线程。假设:100块,分成了3个包,现在有5个人去抢。其中,红包是共享数据。5个人是5条线程。打印结果如下:

XXX抢到了XXX元
XXX抢到了XXX元
XXX抢到了XXX元
XXX没抢到
XXX没抢到

解决方案一:

package com.ya.mythreadtest.test4case1;

import java.util.Random;

public class MyThread extends Thread {

  //共享数据
  //100块,分成了3个包
  static double money = 100;
  static int count = 3;

  //最小的中奖金额
  static final double MIN = 0.01;

  @Override
  public void run() {
    //同步代码块
    synchronized (MyThread.class) {
      if (count == 0) {
        //判断,共享数据是否到了末尾(已经到末尾)
        System.out.println(getName() + "没有抢到红包!");
      } else {
        //判断,共享数据是否到了末尾(没有到末尾)
        //定义一个变量,表示中奖的金额
        double prize = 0;
        if (count == 1) {
          //表示此时是最后一个红包
          //就无需随机,剩余所有的钱都是中奖金额
          prize = money;
        } else {
          //表示第一次,第二次(随机)
          Random r = new Random();
          //100 元   3个包
          //第一个红包:99.98
          //100 - (3-1) * 0.01
          double bounds = money - (count - 1) * MIN;
          prize = r.nextDouble();
          if (prize < MIN) {
            prize = MIN;
          }
        }
        //从money当中,去掉当前中奖的金额
        money = money - prize;
        //红包的个数-1
        count--;
        //本次红包的信息进行打印
        System.out.println(getName() + "抢到了" + prize + "元");
      }
    }
  }
}
package com.ya.mythreadtest.test4case1;

public class Test {
  public static void main(String[] args) {
    //创建线程的对象
    MyThread t1 = new MyThread();
    MyThread t2 = new MyThread();
    MyThread t3 = new MyThread();
    MyThread t4 = new MyThread();
    MyThread t5 = new MyThread();

    //给线程设置名字
    t1.setName("小A");
    t2.setName("小QQ");
    t3.setName("小哈哈");
    t4.setName("小诗诗");
    t5.setName("小丹丹");

    //启动线程
    t1.start();
    t2.start();
    t3.start();
    t4.start();
    t5.start();
  }
}

image-20231229092450877

解决方案二:

package com.ya.mythreadtest.test4case2;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Random;

public class MyThread extends Thread {

  //总金额
  static BigDecimal money = BigDecimal.valueOf(100.0);
  //个数
  static int count = 3;
  //最小抽奖金额
  static final BigDecimal MIN = BigDecimal.valueOf(0.01);

  @Override
  public void run() {
    synchronized (MyThread.class) {
      if (count == 0) {
        System.out.println(getName() + "没有抢到红包!");
      } else {
        //中奖金额
        BigDecimal prize;
        if (count == 1) {
          prize = money;
        } else {
          //获取抽奖范围
          double bounds = money.subtract(BigDecimal.valueOf(count - 1).multiply(MIN)).doubleValue();
          Random r = new Random();
          //抽奖金额
          prize = BigDecimal.valueOf(r.nextDouble());
        }
        //设置抽中红包,小数点保留两位,四舍五入
        prize = prize.setScale(2, RoundingMode.HALF_UP);
        //在总金额中去掉对应的钱
        money = money.subtract(prize);
        //红包少了一个
        count--;
        //输出红包信息
        System.out.println(getName() + "抽中了" + prize + "元");
      }
    }
  }
}
package com.ya.mythreadtest.test4case2;

public class Test {
  public static void main(String[] args) {
    MyThread t1 = new MyThread();
    MyThread t2 = new MyThread();
    MyThread t3 = new MyThread();
    MyThread t4 = new MyThread();
    MyThread t5 = new MyThread();

    t1.setName("小A");
    t2.setName("小QQ");
    t3.setName("小哈哈");
    t4.setName("小诗诗");
    t5.setName("小丹丹");

    t1.start();
    t2.start();
    t3.start();
    t4.start();
    t5.start();
  }
}

image-20231229092542487

5.5 练习5-抽奖箱

需求:有一个抽奖池,该抽奖池中存放了奖励的金额,该抽奖池中的奖项为

{10,5,20,50,100,200,500,800,2,80,300,700};

创建两个抽奖箱(线程)设置线程名称分别为“抽奖箱1”,“抽奖箱2” 。随机从抽奖池中获取奖项元素并打印在控制台上, 每次抽出一个奖项就打印一个(随机),格式如下:

抽奖箱1 又产生了一个 10 元大奖
抽奖箱1 又产生了一个 100 元大奖
抽奖箱1 又产生了一个 200 元大奖
抽奖箱1 又产生了一个 800 元大奖
抽奖箱2 又产生了一个 700 元大奖
.....
package com.ya.mythreadtest.test5;

import java.util.ArrayList;
import java.util.Collections;

public class MyThread extends Thread {

  ArrayList<Integer> list;

  public MyThread(ArrayList<Integer> list) {
    this.list = list;
  }

  @Override
  public void run() {
    //1.循环
    //2.同步代码块
    //3.判断
    //4.判断

    while (true) {
      synchronized (MyThread.class) {
        if (list.size() == 0) {
          break;
        } else {
          //继续抽奖
          Collections.shuffle(list);
          int prize = list.remove(0);
          System.out.println(getName() + "又产生了一个" + prize + "元大奖");
        }
      }
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }

    }
  }
}
package com.ya.mythreadtest.test5;

import java.util.ArrayList;
import java.util.Collections;

public class Test {
  public static void main(String[] args) {
    //创建奖池
    ArrayList<Integer> list = new ArrayList<>();
    Collections.addAll(list, 10, 5, 20, 50, 100, 200, 500, 800, 2, 80, 300, 700);

    //创建线程
    MyThread t1 = new MyThread(list);
    MyThread t2 = new MyThread(list);

    //设置名字
    t1.setName("抽奖箱1");
    t2.setName("抽奖箱2");

    //启动线程
    t1.start();
    t2.start();
  }
}

image-20231229092911709

5.6 练习6-多线程统计并求最大值

在上一题基础上继续完成如下需求:

  • 每次抽的过程中,不打印,抽完时一次性打印(随机)
    • 在此次抽奖过程中,抽奖箱1总共产生了6个奖项。分别为:10,20,100,500,2,300最高奖项为300元,总计额为932元
  • 在此次抽奖过程中,抽奖箱2总共产生了6个奖项。分别为:5,50,200,800,80,700最高奖项为800元,总计额为1835元

解决方案一:

package com.ya.mythreadtest.test6case1;

import java.util.ArrayList;
import java.util.Collections;

public class MyThread extends Thread {

  ArrayList<Integer> list;

  public MyThread(ArrayList<Integer> list) {
    this.list = list;
  }

  //线程一
  static ArrayList<Integer> list1 = new ArrayList<>();
  //线程二
  static ArrayList<Integer> list2 = new ArrayList<>();

  @Override
  public void run() {
    while (true) {
      synchronized (MyThread.class) {
        if (list.size() == 0) {
          if ("抽奖箱1".equals(getName())) {
            System.out.println("抽奖箱1总共产生了" + list1.size() + "个奖项,分别为:" + list1 +
                    ",最高奖项为" + Collections.max(list1) +
                    ",总计额为" + list1.stream().reduce(0, Integer::sum));
          } else {
            System.out.println("抽奖箱2总共产生了" + list2.size() + "个奖项,分别为:" + list2 +
                    ",最高奖项为" + Collections.max(list2) +
                    ",总计额为" + list2.stream().reduce(0, Integer::sum));
          }
          break;
        } else {
          //继续抽奖
          Collections.shuffle(list);
          int prize = list.remove(0);
          if ("抽奖箱1".equals(getName())) {
            list1.add(prize);
          } else {
            list2.add(prize);
          }
        }
      }
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}
package com.ya.mythreadtest.test6case1;

import java.util.ArrayList;
import java.util.Collections;

public class Test {
  public static void main(String[] args) {
    //创建奖池
    ArrayList<Integer> list = new ArrayList<>();
    Collections.addAll(list, 10, 5, 20, 50, 100, 200, 500, 800, 2, 80, 300, 700);

    //创建线程
    MyThread t1 = new MyThread(list);
    MyThread t2 = new MyThread(list);

    //设置名字
    t1.setName("抽奖箱1");
    t2.setName("抽奖箱2");

    //启动线程
    t1.start();
    t2.start();
  }
}

image-20231229100740552

解决方案二:

package com.ya.mythreadtest.test6case2;

import java.util.ArrayList;
import java.util.Collections;

public class MyThread extends Thread {

  ArrayList<Integer> list;

  public MyThread(ArrayList<Integer> list) {
    this.list = list;
  }

  @Override
  public void run() {
    ArrayList<Integer> boxList = new ArrayList<>();//1 //2
    while (true) {
      synchronized (MyThread.class) {
        if (list.size() == 0) {
          System.out.println("抽奖箱" + getName() + "总共产生了" + boxList.size() + "个奖项,分别为:" + boxList +
                  ",最高奖项为" + Collections.max(boxList) +
                  ",总计额为" + boxList.stream().reduce(0, Integer::sum));
          break;
        } else {
          //继续抽奖
          Collections.shuffle(list);
          int prize = list.remove(0);
          boxList.add(prize);
        }
      }
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}
package com.ya.mythreadtest.test6case2;

import java.util.ArrayList;
import java.util.Collections;

public class Test {
  public static void main(String[] args) {
    //创建奖池
    ArrayList<Integer> list = new ArrayList<>();
    Collections.addAll(list, 10, 5, 20, 50, 100, 200, 500, 800, 2, 80, 300, 700);
    //创建线程
    MyThread t1 = new MyThread(list);
    MyThread t2 = new MyThread(list);

    //设置名字
    t1.setName("抽奖箱1");
    t2.setName("抽奖箱2");

    //启动线程
    t1.start();
    t2.start();
  }
}

image-20231229100843949

5.7 练习7-多线程之间的比较

在上一题基础上继续完成如下需求:

在此次抽奖过程中,抽奖箱1总共产生了6个奖项,分别为:10,20,100,500,2,300

最高奖项为300元,总计额为932元

在此次抽奖过程中,抽奖箱2总共产生了6个奖项,分别为:5,50,200,800,80,700

最高奖项为800元,总计额为1835元

在此次抽奖过程中,抽奖箱2中产生了最大奖项,该奖项金额为800元

以上打印效果只是数据模拟,实际代码运行的效果会有差异。

package com.ya.mythreadtest.test7;

import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.Callable;

public class MyCallable implements Callable<Integer> {

  ArrayList<Integer> list;

  public MyCallable(ArrayList<Integer> list) {
    this.list = list;
  }

  @Override
  public Integer call() throws Exception {
    ArrayList<Integer> boxList = new ArrayList<>();//1 //2
    while (true) {
      synchronized (MyCallable.class) {
        if (list.size() == 0) {
          System.out.println("在此次抽奖过程中," + Thread.currentThread().getName() + "总共产生了6个奖项,分别为:" + boxList);
          break;
        } else {
          //继续抽奖
          Collections.shuffle(list);
          int prize = list.remove(0);
          boxList.add(prize);
        }
      }
      Thread.sleep(10);
    }
    //把集合中的最大值返回
    if (boxList.size() == 0) {
      return null;
    } else {
      return Collections.max(boxList);
    }
  }
}
package com.ya.mythreadtest.test7;

import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class Test {
  public static void main(String[] args) throws ExecutionException, InterruptedException {

    //创建奖池
    ArrayList<Integer> list = new ArrayList<>();
    Collections.addAll(list, 10, 5, 20, 50, 100, 200, 500, 800, 2, 80, 300, 700);

    //创建多线程要运行的参数对象
    MyCallable mc = new MyCallable(list);

    //创建多线程运行结果的管理者对象
    //线程一
    FutureTask<Integer> ft1 = new FutureTask<>(mc);
    //线程二
    FutureTask<Integer> ft2 = new FutureTask<>(mc);

    //创建线程对象
    Thread t1 = new Thread(ft1);
    Thread t2 = new Thread(ft2);

    //设置名字
    t1.setName("抽奖箱1");
    t2.setName("抽奖箱2");

    //开启线程
    t1.start();
    t2.start();

    Integer max1 = ft1.get();
    Integer max2 = ft2.get();

    System.out.println("抽奖箱1的最高奖项:" + max1);
    System.out.println("抽奖箱2的最高奖项:" + max2);

    //在此次抽奖过程中,抽奖箱2中产生了最大奖项,该奖项金额为800元
    if (max1 == null) {
      System.out.println("在此次抽奖过程中,抽奖箱2中产生了最大奖项,该奖项金额为" + max2 + "元");
    } else if (max2 == null) {
      System.out.println("在此次抽奖过程中,抽奖箱1中产生了最大奖项,该奖项金额为" + max1 + "元");
    } else if (max1 > max2) {
      System.out.println("在此次抽奖过程中,抽奖箱1中产生了最大奖项,该奖项金额为" + max1 + "元");
    } else if (max1 < max2) {
      System.out.println("在此次抽奖过程中,抽奖箱2中产生了最大奖项,该奖项金额为" + max2 + "元");
    } else {
      System.out.println("两者的最大奖项是一样的");
    }
  }
}

image-20231229101032006

6、线程池

6.1 线程池基本原理

线程池可以看做成一个池子,在该池子中存储很多个线程。

线程池存在的意义:系统创建一个线程的成本是比较高的,因为它涉及到与操作系统交互,当程序中需要创建大量生存期很短暂的线程时,频繁的创建和销毁线程对系统的资源消耗有可能大于业务处理逻辑。

针对这一种情况,为了提高性能,就可以采用线程池。线程池在启动的时,会创建大量空闲线程,当我们向线程池提交任务的时,线程池就会启动一个线程来执行该任务。

等待任务执行完毕以后,线程并不会死亡,而是再次返回到线程池中称为空闲状态,等待下一次任务的执行。

线程池的设计思路 :

  • 准备一个任务容器;
  • 一次性启动多个(2个)消费者线程;
  • 刚开始任务容器是空的,所以线程都在wait;
  • 直到一个外部线程向这个任务容器中扔了一个"任务",就会有一个消费者线程被唤醒;
  • 这个消费者线程取出"任务",并且执行这个任务,执行完毕后,继续等待下一次任务的到来。

可以使用Executors中所提供的静态方法来创建线程池

static ExecutorService newCachedThreadPool()   创建一个默认的线程池
static newFixedThreadPool(int nThreads)	    创建一个指定最多线程数量的线程池

6.2 Executors创建默认线程池

JDK对线程池也进行了相关的实现,在真实企业开发中我们也很少去自定义线程池,而是使用JDK中自带的线程池。

static ExecutorService newCachedThreadPool()   创建一个默认的线程池,默认线程数量int的范围

代码实现:

线程池中线程要执行的任务

public class MyRunnable implements Runnable{
    @Override
    public void run() {
        for (int i = 1; i <= 5; i++) {
            System.out.println(Thread.currentThread().getName() + "---" + i);
        }
    }
}
public class MyThreadPoolDemo01 {
    public static void main(String[] args) throws InterruptedException {
        //1.获取线程池对象
        ExecutorService pool1 = Executors.newCachedThreadPool();
        //2.提交任务
        pool1.submit(new MyRunnable());
        pool1.submit(new MyRunnable());
        pool1.submit(new MyRunnable());

        //3.销毁线程池,线程池一般不销毁
        //pool1.shutdown();
    }
}

image-20231229104659926

6.3 Executors创建指定上限的线程池

使用Executors中所提供的静态方法来创建线程池

static ExecutorService newFixedThreadPool(int nThreads) : 创建一个指定最多线程数量的线程池

代码实现 :

public class MyRunnable implements Runnable{
    @Override
    public void run() {
        for (int i = 1; i <= 5; i++) {
            System.out.println(Thread.currentThread().getName() + "---" + i);
        }
    }
}
public class MyThreadPoolDemo02 {
    public static void main(String[] args) throws InterruptedException {
        //1.获取线程池对象
        ExecutorService pool1 = Executors.newFixedThreadPool(2);
        //2.提交任务
        pool1.submit(new MyRunnable());
        pool1.submit(new MyRunnable());
        pool1.submit(new MyRunnable());

        //3.销毁线程池,线程池一般不销毁
        //pool1.shutdown();
    }
}

image-20231229104926790

6.4 自定义线程池

创建线程池对象 :

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(核心线程数量,最大线程数量,空闲线程最大存活时间,任务队列,创建线程工厂,任务的拒绝策略);

代码实现 :

package com.ya.mythreadpool.definedthreadpool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 自定义线程池
 * @author yagote   create:2023/12/29 11:03
 */
public class DefinedThreadPool {
    public static void main(String[] args){

    /*
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor
        (核心线程数量,最大线程数量,空闲线程最大存活时间,任务队列,创建线程工厂,任务的拒绝策略);

        参数一:核心线程数量              不能小于0
        参数二:最大线程数                不能小于0,最大数量 >= 核心线程数量
        参数三:空闲线程最大存活时间       不能小于0
        参数四:时间单位                  用TimeUnit指定
        参数五:任务队列                  不能为null
        参数六:创建线程工厂              不能为null
        参数七:任务的拒绝策略             不能为null
    */

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3,      //核心线程数量,能小于0
                6,      //最大线程数,不能小于0,最大数量 >= 核心线程数量
                60,     //空闲线程最大存活时间
                TimeUnit.SECONDS,//时间单位
                new ArrayBlockingQueue<>(3),        //任务队列
                Executors.defaultThreadFactory(),           //创建线程工厂
                new ThreadPoolExecutor.AbortPolicy()        //任务的拒绝策略
        );

        // 之后的代码和默认的线程池一样,向线程池池中提交任务即可
    }
}

6.5 线程池参数详解

1591165506516

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
    
corePoolSize:   核心线程的最大值,不能小于0(临时线程=maximumPoolSize-corePoolSize)
maximumPoolSize:最大线程数,不能小于等于0,maximumPoolSize >= corePoolSize
keepAliveTime:  空闲线程最大存活时间,不能小于0
unit:           时间单位
workQueue:      任务队列,不能为null
threadFactory:  创建线程工厂,不能为null      
handler:        任务的拒绝策略,不能为null  

临时线程=maximumPoolSize-corePoolSize

(1)提交3个任务,创建3条线程去处理,核心线程刚好能够完全处理

image-20231229113129267

(2)提交5个任务

由于核心线程为3,所以有两个线程需要进入队列中等待,队列还没满(这里队列容量为3),临时线程还不能起作用。

image-20231229113217905

(3)提交8个任务

核心线程都在忙,而且队列中已经排满了,才会创建临时线程,所以先提交的任务不一定先执行,这里的任务7和任务8比任务4、5、6先执行

image-20231229113517483

(4)提交10个任务

核心线程、队列长度、临时线程都满了之后就会触发拒绝策略

image-20231229113646584

6.6 线程池任务拒绝策略

image-20231229111014407

image-20231229111216919

RejectedExecutionHandler是jdk提供的一个任务拒绝策略接口,它下面存在4个子类。

ThreadPoolExecutor.AbortPolicy: 		   丢弃任务并抛出RejectedExecutionException异常,是默认的策略。
ThreadPoolExecutor.DiscardPolicy: 		  丢弃任务,但是不抛出异常 这是不推荐的做法。
ThreadPoolExecutor.DiscardOldestPolicy:    抛弃队列中等待最久的任务 然后把当前任务加入队列中。
ThreadPoolExecutor.CallerRunsPolicy:        调用任务的run()方法绕过线程池直接执行。

注:明确线程池对多可执行的任务数 = 队列容量 + 最大线程数

6.6.1 AbortPolicy

案例演示1:演示ThreadPoolExecutor.AbortPolicy任务处理策略

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常,是默认的策略。
public class ThreadPoolRejected01 {
    public static void main(String[] args) {

        // 核心线程数量为1 , 最大线程池数量为3, 任务容器的容量为1 ,空闲线程的最大存在时间为20s
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1 , 3 , 20 , TimeUnit.SECONDS ,
                new ArrayBlockingQueue<>(1) , Executors.defaultThreadFactory() , new ThreadPoolExecutor.AbortPolicy()) ;

        // 提交5个任务,而该线程池最多可以处理4个任务,当我们使用AbortPolicy这个任务处理策略的时候,就会抛出异常
        for(int x = 1 ; x <= 5 ; x++) {
            int finalX = x;
            threadPoolExecutor.submit(() -> {
                System.out.println(Thread.currentThread().getName() + "---->> 执行了任务:"+ finalX);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        }
    }
}
  • 第一个任务被核心线程执行。
  • 第二个任务被放入队列等待,第一个任务完成后,核心线程去执行第二个任务
  • 第三、第四个任务创建两个新线程分别执行(因为此时核心线程数1和队列容器均已满,但线程池数量未达最大值,所以创建新的线程来执行第三个任务)。
  • 第五个任务由于线程池已经达到最大线程数,且队列也满了,因此触发 AbortPolicy,抛出 RejectedExecutionException 异常。

image-20231229124402735

控制台报错,仅仅执行了4个任务,最后一个任务被丢弃了

6.6.2 DiscardPolicy

案例演示2:演示ThreadPoolExecutor.DiscardPolicy任务处理策略

ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常 这是不推荐的做法。
public class ThreadPoolRejected02 {
    public static void main(String[] args) {
        // 核心线程数量为1 , 最大线程池数量为3, 队列的容量为1 ,空闲线程的最大存在时间为20s
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1 , 3 , 20 , TimeUnit.SECONDS ,
                new ArrayBlockingQueue<>(1) , Executors.defaultThreadFactory() , new ThreadPoolExecutor.DiscardPolicy()) ;

        // 提交5个任务,而该线程池最多可以处理4个任务,当我们使用DiscardPolicy这个任务处理策略的时候,控制台不会报错
        for(int x = 0 ; x < 5 ; x++) {
            threadPoolExecutor.submit(() -> {
                System.out.println(Thread.currentThread().getName() + "---->> 执行了任务");
            });
        }
    }
}

image-20231229135231687

控制台没有报错,仅仅执行了4个任务,有一个任务被丢弃了

6.6.3 DiscardOldestPolicy

案例演示3:演示ThreadPoolExecutor.DiscardOldestPolicy任务处理策略

ThreadPoolExecutor.DiscardOldestPolicy:抛弃队列中等待最久的任务 然后把当前任务加入队列中。
public class ThreadPoolRejected03 {
    public static void main(String[] args) {
        // 核心线程数量为1 , 最大线程池数量为3, 任务容器的容量为1 ,空闲线程的最大存在时间为20s
        ThreadPoolExecutor threadPoolExecutor;
        threadPoolExecutor = new ThreadPoolExecutor(1 , 3 , 20 , TimeUnit.SECONDS ,
                new ArrayBlockingQueue<>(1) , Executors.defaultThreadFactory() , new ThreadPoolExecutor.DiscardOldestPolicy());
        // 提交5个任务
        for(int x = 0 ; x < 5 ; x++) {
            // 定义一个变量,来指定指定当前执行的任务;这个变量需要被final修饰
            final int y = x ;
            threadPoolExecutor.submit(() -> {
                System.out.println(Thread.currentThread().getName() + "---->> 执行了任务" + y);
            });
        }
    }
}

image-20231229135635669

由于任务1在线程池中等待时间最长,因此任务1被丢弃。

6.6.4 CallerRunsPolicy

案例演示4:演示ThreadPoolExecutor.CallerRunsPolicy任务处理策略

ThreadPoolExecutor.CallerRunsPolicy:调用任务的run()方法绕过线程池直接执行。
public class ThreadPoolRejected04 {
    public static void main(String[] args) {
        // 核心线程数量为1 , 最大线程池数量为3, 任务容器的容量为1 ,空闲线程的最大存在时间为20s
        ThreadPoolExecutor threadPoolExecutor;
        threadPoolExecutor = new ThreadPoolExecutor(1 , 3 , 20 , TimeUnit.SECONDS ,
                new ArrayBlockingQueue<>(1) , Executors.defaultThreadFactory() , new ThreadPoolExecutor.CallerRunsPolicy());

        // 提交5个任务
        for(int x = 0 ; x < 5 ; x++) {
            threadPoolExecutor.submit(() -> {
                System.out.println(Thread.currentThread().getName() + "---->> 执行了任务");
            });
        }
    }
}

image-20231229135816277

通过控制台的输出,我们可以看到次策略没有通过线程池中的线程执行任务,而是直接调用任务的run()方法绕过线程池直接执行。


下面内容选择性查看。


7、volatile关键字

7.1 看程序说结果

分析如下程序,说出在控制台的输出结果。

Thread的子类

public class VolatileThread extends Thread {

    // 定义成员变量
    private boolean flag = false ;
    public boolean isFlag() { return flag;}

    @Override
    public void run() {

        // 线程休眠1秒
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 将flag的值更改为true
        this.flag = true ;
        System.out.println("flag=" + flag);

    }
}

测试类

public class VolatileThreadDemo01 {
    
    public static void main(String[] args) {

        // 创建VolatileThread线程对象
        VolatileThread volatileThread = new VolatileThread() ;
        volatileThread.start();

        // 在main线程中获取开启的线程中flag的值
        while(true) {
            System.out.println("main线程中获取开启的线程中flag的值为" + volatileThread.isFlag());
        }
        
    }
}

控制台输出结果,前面是false,过了一段时间之后就变成了true。

image-20231229144851843

在volatileThread线程的run方法中,线程休眠1s,休眠一秒以后那么flag的值应该为true,此时我们在主线程中不停的获取flag的值,发现前面释放false,后面是true信息。那么这是为什么呢?要想知道原因,就需要学习一下JMM。

7.2 JMM

**JMM(Java Memory Model)**Java内存模型,是java虚拟机规范中所定义的一种内存模型。

Java内存模型描述了Java程序中各种变量(线程共享变量)的访问规则,以及在JVM中将变量存储到内存和从内存中读取变量这样的底层细节。

特点:

  • 所有的共享变量都存储于主内存(计算机的RAM)这里所说的变量指的是实例变量和类变量(静态变量)。不包含局部变量,因为局部变量是线程私有的,因此不存在竞争问题。
  • 每一个线程还存在自己的工作内存,线程的工作内存,保留了被线程使用的变量的工作副本。
  • 线程对变量的所有的操作(读,写)都必须在工作内存中完成,而不能直接读写主内存中的变量,不同线程之间也不能直接访问对方工作内存中的变量,线程间变量的值的传递需要通过主内存完成。

1571743818653

7.3 问题分析

了解了一下JMM,那么接下来我们就来分析一下上述程序产生问题的原因。

1571744627663

产生问题的流程分析:

  1. VolatileThread线程从主内存读取到数据放入其对应的工作内存

  2. 将flag的值更改为true,但是这个时候flag的值还没有回写主内存

  3. 此时main线程读取到了flag的值并将其放入到自己的工作内存中,此时flag的值为false

  4. VolatileThread线程将flag的值写回到主内存,但是main函数里面的while(true)调用的是系统比较底层的代码,速度快,快到没有时间再去读取主内存中的值,所以while(true)

    读取到的值一直是false。(如果有一个时刻main线程从主内存中读取到了flag的最新值,那么if语句就可以执行,main线程何时从主内存中读取最新的值,我们无法控制)

我们可以让主线程执行慢一点,执行慢一点以后,在某一个时刻,可能就会读取到主内存中最新的flag的值,那么if语句就可以进行执行。

测试类

public class VolatileThreadDemo02 {

    public static void main(String[] args) throws InterruptedException {

        // 创建VolatileThread线程对象
        VolatileThread volatileThread = new VolatileThread() ;
        volatileThread.start();

        // main方法
        while(true) {
            if(volatileThread.isFlag()) {
                System.out.println("执行了======");
            }
            // 让线程休眠100毫秒
            TimeUnit.MILLISECONDS.sleep(100);
        }
    }
}

image-20231229145100374

此时我们可以看到if语句已经执行了。当然我们在真实开发中可能不能使用这种方式来处理这个问题,那么这个问题应该怎么处理呢?我们就需要学习下一小节的内容。

7.4 问题处理

7.4.1 加锁

第一种处理方案,我们可以通过加锁的方式进行处理。

测试类

public class VolatileThreadDemo03 {

    public static void main(String[] args) throws InterruptedException {

        // 创建VolatileThread线程对象
        VolatileThread volatileThread = new VolatileThread() ;
        volatileThread.start();

        // main方法
        while(true) {

            // 加锁进行问题处理
            synchronized (volatileThread) {
                if(volatileThread.isFlag()) {
                    System.out.println("执行了======");
                }
            }
        }
    }
}

控制台输出结果

flag=true
执行了======
执行了======
执行了======
....

对上述代码加锁完毕以后,某一个线程支持该程序的过程如下:

a.线程获得锁
b.清空工作内存
c.从主内存拷贝共享变量最新的值到工作内存成为副本
d.执行代码
e.将修改后的副本的值刷新回主内存中
f.线程释放锁

7.4.2 volatile关键字

第二种处理方案,可以通过volatile关键字来修饰flag变量。

public class VolatileThread extends Thread {

    // 定义成员变量
    private volatile boolean flag = false ;
    public boolean isFlag() { return flag;}

    @Override
    public void run() {

        // 线程休眠1秒
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 将flag的值更改为true
        this.flag = true ;
        System.out.println("flag=" + flag);

    }
}
//--------------------------------更新之后的案例-------------------------------------------
public class VolatileTest extends Thread{
    boolean flag = false;
    int i = 0;

    public void run() {
        while (!flag) {
            i++;
        }
        System.out.println("stope" + i);
    }

    public static void main(String[] args) throws Exception {
        VolatileTest vt = new VolatileTest();
        vt.start();

        Thread.sleep(10);
        vt.flag = true;

    }
}

控制台输出结果

flag=true
执行了======
执行了======
执行了======
....

工作原理说明

1571746088704

执行流程分析

  1. VolatileThread线程从主内存读取到数据放入其对应的工作内存
  2. 将flag的值更改为true,但是这个时候flag的值还没有回写主内存
  3. 此时main线程读取到了flag的值并将其放入到自己的工作内存中,此时flag的值为false
  4. VolatileThread线程将flag的值写到主内存
  5. main线程工作内存中的flag变量副本失效
  6. main线程再次使用flag时,main线程会从主内存读取最新的值,放入到工作内存中,然后在进行使用

7.5 volatile原子性测试

volatile在多线程环境下只保证了共享变量在多个线程间的可见性,但是不保证原子性。下面做个测试进行验证。

线程类

public class VolatileAtomicThread implements Runnable {

    // 定义一个int类型的变量,并且使用volatile修饰
    private volatile int count = 0 ;

    @Override
    public void run() {
        
        // 对该变量进行++操作,100次
        for(int x = 0 ; x < 100 ; x++) {
            count++ ;					
            System.out.println("count =========>>>> " + count);
        }
    }
}

测试类

public class VolatileAtomicThreadDemo {

    public static void main(String[] args) {

        // 创建VolatileAtomicThread对象
        VolatileAtomicThread volatileAtomicThread = new VolatileAtomicThread() ;

        // 开启100个线程对count进行++操作
        for(int x = 0 ; x < 100 ; x++) {
            new Thread(volatileAtomicThread).start();
        }
    }
}

控制台输出结果(需要运行多次)

image-20231229152632297

通过控制台结果的输出,我们可以看到程序还是会出现问题。因此也就证明volatile关键字是不保证原子性的。

7.6 volatile使用场景

7.6.1 状态标志

比如现在存在一个线程不断向控制台输出一段话"Hello,World,Hello,World,Hello,World",当这个线程执行5秒以后,将该线程结束。

实现思路:定义一个boolean类型的变量,这个变量就相当于一个标志。当这个变量的值为true的时候,线程一直执行,10秒以后我们把这个变量的值更改为false,此时结束该线程的执行。

为了保证一个线程对这个变量的修改,另外一个线程立马可以看到,这个变量就需要通过volatile关键字进行修饰。

线程类

public class VolatileUseThread implements Runnable {

    // 定义标志变量
    private volatile boolean shutdown = false ;

    @Override
    public void run() {

        while(!shutdown) {
            System.out.println("Hello,World,Hello,World,Hello,World");
        }

    }

    // 关闭线程
    public void shutdown() {
        this.shutdown = true ;
    }

}

测试类

public class VolatileUseThreadDemo01 {

    public static void main(String[] args) throws InterruptedException {

        // 创建线程任务类对象
        VolatileUseThread volatileUseThread = new VolatileUseThread() ;

        // 创建线程对象
        Thread thread = new Thread(volatileUseThread);

        // 启动线程
        thread.start();

        // 主线程休眠
        TimeUnit.SECONDS.sleep(5);

        // 关闭线程
        volatileUseThread.shutdown();
    }
}

观察控制台输出,volatileUseThread线程执行5秒以后程序结束。

7.6.2 独立观察
AI养猪。。。。
设备区测量温度
当温度高了。。。需要给猪开空调。。。加冰棍。。。加喝的水。。。

volatile的另一种简单使用场景是:定期"发布"观察结果供程序内部使用。

例如,假设有一种环境传感器能够感觉环境温度。一个后台线程可能会每隔几秒读取一次该传感器数据,并更新包含这个volatile变量的值。然后,其他线程可以读取这个变量,从而随时能够看到最新的温度值。这种使用就是多个线程操作共享变量,但是是有一个线程对其进行写操作,其他的线程都是读。

我们可以设计一个程序,模拟上面的温度传感器案例。

实现步说明

  1. 定义一个温度传感器(TemperatureSensor)的类,在该类中定义两个成员变量temperature(温度值),type(传感器的类型),temperature变量需要被volatile修饰

  2. 定义一个读取温度传感器的线程的任务类(ReadTemperatureRunnable),该类需要定义一个TemperatureSensor类型的成员变量(该线程需要读取温度传感器的数据)

  3. 定义一个定时采集温度的线程任务类(GatherTemperatureRunnable),该类需要定义一个TemperatureSensor类型的成员变量(该线程需要将读到的温度设置给传感器)

  4. 创建测试类(TemperatureSensorDemo)

    1. 创建TemperatureSensor对象
    2. 创建ReadTemperatureRunnable类对象,把TemperatureSensor作为构造方法的参数传递过来
    3. 创建GatherTemperatureRunnable类对象,把TemperatureSensor作为构造方法的参数传递过来
    4. 创建2个Thread对象,并启动,把第二步所创建的对象作为构造方法参数传递过来,这两个线程负责读取TemperatureSensor中的温度数据
    5. 创建1个Thread对象,并启动,把第三步所创建的对象作为构造方法参数传递过来,这个线程负责读取定时采集数据中的温度数据

TemperatureSensor类

public class TemperatureSensor {        // 温度传感器类

    private volatile int temperature ;  // 温度值

    private String type ;               // 传感器的类型

    public int getTemperature() {
        return temperature;
    }

    public void setTemperature(int temperature) {
        this.temperature = temperature;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }
}

ReadTemperatureRunnable类

public class ReadTemperatureRunnable implements Runnable {

    // 温度传感器
    private TemperatureSensor temperatureSensor ;
    public ReadTemperatureRunnable(TemperatureSensor temperatureSensor) {
        this.temperatureSensor = temperatureSensor ;
    }

    @Override
    public void run() {

        // 不断的读取温度传感器中的数据
        while(true) {

            // 读取数据
            System.out.println(Thread.currentThread().getName() + "---读取到的温度数据为------>>> " + temperatureSensor.getTemperature());

            try {
                // 让线程休眠100毫秒,便于观察
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

GatherTemperatureRunnable类

public class GatherTemperatureRunnable implements Runnable {

    // 温度传感器
    private TemperatureSensor temperatureSensor ;
    public GatherTemperatureRunnable(TemperatureSensor temperatureSensor) {
        this.temperatureSensor = temperatureSensor ;
    }

    @Override
    public void run() {

        // 定义一个变量,表示环境初始温度
        int temperature = 23 ;

        // 不断进行数据采集
        while(true) {

            // 将采集到的数据设置给温度传感器
            System.out.println(Thread.currentThread().getName() + "-----采集到的数据为----->>> " + temperature);
            temperatureSensor.setTemperature(temperature);

            try {
                // 线程休眠2秒,模拟每隔两秒采集一次数据
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 环境温度改变
            temperature += 2 ;

        }
    }
}

测试类

public class TemperatureSensorDemo {

    public static void main(String[] args) {

        // 创建TemperatureSensor对象
        TemperatureSensor temperatureSensor = new TemperatureSensor();

        // 创建ReadTemperatureRunnable类对象
        ReadTemperatureRunnable readTemperatureRunnable = new ReadTemperatureRunnable(temperatureSensor) ;

        // 创建GatherTemperatureRunnable类对象
        GatherTemperatureRunnable gatherTemperatureRunnable = new GatherTemperatureRunnable(temperatureSensor) ;

        // 创建2个Thread对象,并启动; 这两个线程负责读取TemperatureSensor中的温度数据
        for(int x = 0 ; x < 2 ; x++) {
            new Thread(readTemperatureRunnable).start();
        }

        // 创建1个Thread对象,并启动,这个线程负责读取定时采集数据中的温度数据
        Thread gatherThread = new Thread(gatherTemperatureRunnable);
        gatherThread.setName("温度采集线程");
        gatherThread.start();
    }
}

控制台输出结果

...
温度采集线程-----采集到的数据为----->>> 23
Thread-0---读取到的温度数据为------>>> 23
...
温度采集线程-----采集到的数据为----->>> 25
Thread-1---读取到的温度数据为------>>> 25
...

通过控制台的输出,我们可以看到当温度采集线程刚采集到环境温度以后,那么此时两个温度读取线程就可以立即感知到环境温度的变化。

总结: volatile保证不同线程对共享变量操作的可见性,也就是说一个线程修改了volatile修饰的变量,当修改写回主内存时,另外一个线程立即看到最新的值。但是volatile不保证原子性(关于原子性问题,我们在下面的小节中会介绍)。

volatile与synchronized的区别:

  • volatile只能修饰实例变量和类变量,而synchronized可以修饰方法,以及代码块。
  • volatile保证数据的可见性,但是不保证原子性(多线程进行写操作,不保证线程安全);
  • 而synchronized是一种排他(互斥)的机制(因此有时我们也将synchronized这种锁称之为排他(互斥)锁),synchronized修饰的代码块,被修饰的代码块称之为同步代码块,无法被中断可以保证原子性,也可以间接的保证可见性。

8、原子性

原子性是指在一次操作或者多次操作中,要么所有的操作全部都得到了执行并且不会受到任何因素的干扰而中断,要么所有的操作都不执行,多个操作是一个不可以分割的整体。

8.1 看程序说结果

分析如下程序的执行结果

线程类

public class VolatileAtomicThread implements Runnable {

    // 定义一个int类型的变量
    private int count = 0 ;

    @Override
    public void run() {
        
        // 对该变量进行++操作,100次
        for(int x = 0 ; x < 100 ; x++) {
            count++ ;					
            System.out.println("冰淇淋的个数 =========>>>> " + count);
        } 
    }
}

测试类

public class VolatileAtomicThreadDemo {

    public static void main(String[] args) {

        // 创建VolatileAtomicThread对象
        VolatileAtomicThread volatileAtomicThread = new VolatileAtomicThread() ;

        // 开启100个线程对count进行++操作
        for(int x = 0 ; x < 100 ; x++) {
            new Thread(volatileAtomicThread).start();
        }  
    }
}

程序分析:我们在主线程中通过for循环启动了100个线程,每一个线程都会对VolatileAtomicThread类中的count加100次。那么直接结果应该是10000。但是真正的执行结果和我们分析的是否一样呢?运行程序(多运行几次),查看控制台输出结果

image-20231229153028848

通过控制台的输出,可以看到最终count的结果可能并不是10000。接下来我们就来分析一下问题产生的原因。

8.2 问题分析说明

以上问题主要是发生在count++操作上:

count++操作包含3个步骤:

  • 从主内存中读取数据到工作内存
  • 对工作内存中的数据进行++操作
  • 将工作内存中的数据写回到主内存

count++操作不是一个原子性操作,也就是说在某一个时刻对某一个操作的执行,有可能被其他的线程打断。

1571794778139

产生问题的执行流程分析:

  1. 假设此时count的值是100,线程A需要对改变量进行自增1的操作,首先它需要从主内存中读取变量count的值。由于CPU的切换关系,此时CPU的执行权被切换到了B线程。A线程就处于就绪状态,B线程处于运行状态。

  2. 线程B也需要从主内存中读取count变量的值,由于线程A没有对count值做任何修改因此此时B读取到的数据还是100

  3. 线程B工作内存中对count执行了+1操作,但是未刷新之主内存中

  4. 此时CPU的执行权切换到了A线程上,由于此时线程B没有将工作内存中的数据刷新到主内存,因此A线程工作内存中的变量值还是100,没有失效。A线程对工作内存中的数据进行了+1操作。

  5. 线程B将101写入到主内存

  6. 线程A将101写入到主内存

虽然计算了2次,但是只对A进行了1次修改。

8.5 问题处理

接下来我们就来讲解一下我们上述案例(引入原子性问题的案例)的解决方案。

8.5.1 锁机制

我们可以给count++操作添加锁,那么count++操作就是临界区中的代码,临界区中的代码一次只能被一个线程去执行,所以count++就变成了原子操作。

线程任务类

public class VolatileAtomicThread implements Runnable {

    // 定义一个int类型的变量,
    private int count = 0 ;

    // 定义一个Object类型的变量,该变量将作为同步代码块的锁
    private Object obj = new Object();

    @Override
    public void run() {
        
        // 对该变量进行++操作,100次
        for(int x = 0 ; x < 100 ; x++) {
            synchronized (obj){
                count++ ;
                System.out.println("count =========>>>> " + count);
            }
        } 
    }
}

控制台输出结果

image-20231229153350757

8.5.2 原子类
8.5.2.1 AtomicInteger

java从JDK1.5开始提供了java.util.concurrent.atomic包(简称Atomic包),这个包中的原子操作类提供了一种用法简单,性能高效,线程安全地更新一个变量的方式。因为变量的类型有很多种,所以在Atomic包里一共提供了13个类,属于4种类型的原子更新方式,分别是原子更新基本类型、原子更新数组、原子更新引用和原子更新属性(字段)。本次只讲解使用原子的方式更新基本类型,使用原子的方式更新基本类型Atomic包提供了以下3个类:

  • AtomicBoolean: 原子更新布尔类型
  • AtomicInteger: 原子更新整型
  • AtomicLong: 原子更新长整型

以上3个类提供的方法几乎一模一样,所以本节仅以AtomicInteger为例进行讲解,AtomicInteger的常用方法如下:

public AtomicInteger():	   				初始化一个默认值为0的原子型Integer
public AtomicInteger(int initialValue): 初始化一个指定值的原子型Integer

int get():   			 				 获取值
int getAndIncrement():      			 以原子方式将当前值加1,注意,这里返回的是自增前的值。
int incrementAndGet():     				 以原子方式将当前值加1,注意,这里返回的是自增后的值。
int addAndGet(int data):				 以原子方式将输入的数值与实例中的值(AtomicInteger里的value)相加,并返回结果。
int getAndSet(int value):   			 以原子方式设置为newValue的值,并返回旧值。

案例演示AtomicInteger的基本使用:

public class AtomicIntegerDemo01 {

    // 原子型Integer
    public static void main(String[] args) {

        // 构造方法
        // public AtomicInteger():初始化一个默认值为0的原子型Integer
        // AtomicInteger atomicInteger = new AtomicInteger() ;
        // System.out.println(atomicInteger);

        // public AtomicInteger(int initialValue): 初始化一个指定值的原子型Integer
        AtomicInteger atomicInteger = new AtomicInteger(5) ;
        System.out.println(atomicInteger);

        // 获取值
        System.out.println(atomicInteger.get());

        // 以原子方式将当前值加1,这里返回的是自增前的值
        System.out.println(atomicInteger.getAndIncrement());
        System.out.println(atomicInteger.get());

        // 以原子方式将当前值加1,这里返回的是自增后的值
        System.out.println(atomicInteger.incrementAndGet());

        // 以原子方式将输入的数值与实例中的值(AtomicInteger里的value)相加,并返回结果
        System.out.println(atomicInteger.addAndGet(8));

        // 以原子方式设置为newValue的值,并返回旧值
        System.out.println(atomicInteger.getAndSet(20));
        System.out.println(atomicInteger.get());
    }
}
8.5.2.2 案例改造

使用AtomicInteger对案例进行改造。

public class VolatileAtomicThread implements Runnable {

    // 定义一个int类型的变量
    private AtomicInteger atomicInteger = new AtomicInteger() ;

    @Override
    public void run() {

        // 对该变量进行++操作,100次
        for(int x = 0 ; x < 100 ; x++) {
            int i = atomicInteger.incrementAndGet();
            System.out.println("count =========>>>> " + i);
        }
    }
}

image-20231229154836711

通过控制台的执行结果,我们可以看到最终得到的结果就是10000,因此也就证明AtomicInteger所提供的方法是原子性操作方法。

8.6 AtomicInteger原理

8.6.1 原理介绍

AtomicInteger的本质:自旋锁 + CAS算法

CAS的全成是: Compare And Swap(比较再交换); 是现代CPU广泛支持的一种对内存中的共享数据进行操作的一种特殊指令。CAS可以将read-modify-write转换为原子操作,这个原子操作直接由处理器保证。CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当旧预期值A和内存值V相同时,将内存值V修改为B并返回true,否则什么都不做,并返回false。

举例说明:

  1. 在内存值V当中,存储着值为10的变量。

1571817059527

  1. 此时线程1想要把变量的值增加1。对线程1来说,旧的预期值 A = 10 ,要修改的新值 B = 11。

1571817085047

  1. 在线程1要提交更新之前,另一个线程2抢先一步,把内存值V中的变量值率先更新成了11。

1571817628904

  1. 线程1开始提交更新,首先进行A和内存值V的实际值比较(Compare),发现A不等于V的值,提交失败。

1571818176635

  1. 线程1重新获取内存值V作为当前A的值,并重新计算想要修改的新值。此时对线程1来说,A = 11,B = 12。这个重新尝试的过程被称为自旋

1571818465276

  1. 这一次比较幸运,没有其他线程改变V的值。线程1进行Compare,发现A和V的值是相等的。

1571818597998

  1. 线程1进行SWAP,把内存V的值替换为B,也就是12。

1571818747880

举例说明:这好比春节的时候抢火车票,下手快的会抢先买到票,而下手慢的可以再次尝试,直到买到票。

8.6.2 源码分析

那么接下来我们就来查看一下AtomicInteger类中incrementAndGet方法的源码。

public class AtomicInteger extends Number implements java.io.Serializable {
    
    // cas算法的实现类
    private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
    
    // 表示变量值在内存中的偏移量地址,unsafe类就是根据内存偏移量地址获取数据值。
    private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value");
    private volatile int value;
    
    // 以原子方式将当前值加1,这里返回的是自增后的值
    public final int incrementAndGet() {
        
        /* this表示当前AtomicInteger对象 ,1表示要增加的值 */
        return U.getAndAddInt(this, VALUE, 1) + 1;		// 调用Unsafe类中的getAndAddInt方法
        
    }
    
}

UnSafe类

public final class Unsafe {
    
    // Unsafe类中的getAndAddInt方法
    public final int getAndAddInt(Object o, long offset, int delta) {
        
        int v;
        
        // do...while就是自旋操作,当CAS成功以后,循环结束
        do {
            // 获取AtomicInteger类中所封装的int类型的值,就相当于旧的预期值A
            v = getIntVolatile(o, offset); 
            
            // 调用本类的weakCompareAndSetInt方法实现比较在交换; o: AtomicInteger对象, v: 相当于旧的预期值A, v + delta:新值B
        } while (!weakCompareAndSetInt(o, offset, v, v + delta));
        
        return v;
    }
    
    // Unsafe类中的weakCompareAndSetInt方法
    public final boolean weakCompareAndSetInt(Object o, long offset, int expected, int x) {
        return compareAndSetInt(o, offset, expected, x);
    }

    // 本地方法,调用CPU指令实现CAS
    public final native boolean compareAndSetInt(Object o, long offset, int expected, int x);
    
}

8.7 CAS与Synchronized

CAS和Synchronized都可以保证多线程环境下共享数据的安全性。那么他们两者有什么区别?

Synchronized是从悲观的角度出发:总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁(共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程)。因此Synchronized我们也将其称之为悲观锁。jdk中的ReentrantLock也是一种悲观锁。

CAS是从乐观的角度出发:总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据。CAS这种机制我们也可以将其称之为乐观锁

9、并发工具类

在JDK的并发包里提供了几个非常有用的并发容器和并发工具类。供我们在多线程开发中进行使用。

9.1 ConcurrentHashMap

9.1.1 概述以及基本使用

在集合类中HashMap是比较常用的集合对象,但是HashMap是线程不安全的(多线程环境下可能会存在问题)。为了保证数据的安全性我们可以使用Hashtable,但是Hashtable的效率低下。

基于以上两个原因我们可以使用JDK1.5以后所提供的ConcurrentHashMap。

案例1:演示HashMap线程不安全

实现步骤

  1. 创建一个HashMap集合对象
  2. 创建两个线程对象,第一个线程对象向集合中添加元素(1-24),第二个线程对象向集合中添加元素(25-50);
  3. 主线程休眠1秒,以便让其他两个线程将数据填装完毕
  4. 从集合中找出键和值不相同的数据

测试类

public class HashMapDemo01 {

    public static void main(String[] args) {

        // 创建一个HashMap集合对象
        HashMap<String , String> hashMap = new HashMap<String , String>() ;

        // 创建两个线程对象,我们本次使用匿名内部类的方式去常见线程对象
        Thread t1 = new Thread() {

            @Override
            public void run() {

                // 第一个线程对象向集合中添加元素(1-24)
                for(int x = 1 ; x < 25 ; x++) {
                    hashMap.put(String.valueOf(x) , String.valueOf(x)) ;
                }

            }

        };

        // 线程t2
        Thread t2 = new Thread() {

            @Override
            public void run() {

                // 第二个线程对象向集合中添加元素(25-50)
                for(int x = 25 ; x < 51 ; x++) {
                    hashMap.put(String.valueOf(x) , String.valueOf(x)) ;
                }

            }

        };

        // 启动线程
        t1.start();
        t2.start();

        System.out.println("----------------------------------------------------------");

        try {

            // 主线程休眠2s,以便让其他两个线程将数据填装完毕
            TimeUnit.SECONDS.sleep(2);

        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 从集合中找出键和值不相同的数据
        for(int x = 1 ; x < 51 ; x++) {

            // HashMap中的键就是当前循环变量的x这个数据的字符串表现形式 , 根据键找到值,然后在进行判断
            if( !String.valueOf(x).equals( hashMap.get(String.valueOf(x)) ) ) {
                System.out.println(String.valueOf(x) + ":" + hashMap.get(String.valueOf(x)));
            }
        }
    }
}

控制台输出结果

----------------------------------------------------------
5:null

通过控制台的输出结果,我们可以看到在多线程操作HashMap的时候,可能会出现线程安全问题。

注1:需要多次运行才可以看到具体的效果; 可以使用循环将代码进行改造,以便让问题方便的暴露出来!

案例2:演示Hashtable线程安全

测试类

public class HashtableDemo01 {

    public static void main(String[] args) {

        // 创建一个Hashtable集合对象
        Hashtable<String , String> hashtable = new Hashtable<String , String>() ;

        // 创建两个线程对象,我们本次使用匿名内部类的方式去常见线程对象
        Thread t1 = new Thread() {

            @Override
            public void run() {

                // 第一个线程对象向集合中添加元素(1-24)
                for(int x = 1 ; x < 25 ; x++) {
                    hashtable.put(String.valueOf(x) , String.valueOf(x)) ;
                }

            }

        };

        // 线程t2
        Thread t2 = new Thread() {

            @Override
            public void run() {

                // 第二个线程对象向集合中添加元素(25-50)
                for(int x = 25 ; x < 51 ; x++) {
                    hashtable.put(String.valueOf(x) , String.valueOf(x)) ;
                }

            }

        };

        // 启动线程
        t1.start();
        t2.start();

        System.out.println("----------------------------------------------------------");

        try {

            // 主线程休眠2s,以便让其他两个线程将数据填装完毕
            TimeUnit.SECONDS.sleep(2);

        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 从集合中找出键和值不相同的数据
        for(int x = 1 ; x < 51 ; x++) {

            // Hashtable中的键就是当前循环变量的x这个数据的字符串表现形式 , 根据键找到值,然后在进行判断
            if( !String.valueOf(x).equals( hashtable.get(String.valueOf(x)) ) ) {
                System.out.println(String.valueOf(x) + ":" + hashtable.get(String.valueOf(x)));
            }
        } 
    }
}

不论该程序运行多少次,都不会产生数据问题。因此也就证明Hashtable是线程安全的。

Hashtable保证线程安全的原理

查看Hashtable的源码

public class Hashtable<K,V> extends Dictionary<K,V> implements Map<K,V>, Cloneable, java.io.Serializable {
    
    // Entry数组,一个Entry就相当于一个元素
    private transient Entry<?,?>[] table;
    
    // Entry类的定义
    private static class Entry<K,V> implements Map.Entry<K,V> {
        final int hash;		// 当前key的hash码值
        final K key;		// 键
        V value;			// 值
        Entry<K,V> next;	// 下一个节点
    }
    
    // 存储数据
    public synchronized V put(K key, V value){...}
    
    // 获取数据
    public synchronized V get(Object key){...}
    
    // 获取长度
    public synchronized int size(){...}
    
    ...
    
}

对应的结构如下图所示

1571905221097

Hashtable保证线程安全性的是使用方法全局锁进行实现的。在线程竞争激烈的情况下HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同步方法时,会进入阻塞状态。如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法来获取元素,所以竞争越激烈效率越低。

案例3:演示ConcurrentHashMap线程安全

测试类

public class ConcurrentHashMapDemo01 {

    public static void main(String[] args) {

        // 创建一个ConcurrentHashMap集合对象
        ConcurrentHashMap<String , String> concurrentHashMap = new ConcurrentHashMap<String , String>() ;

        // 创建两个线程对象,我们本次使用匿名内部类的方式去常见线程对象
        Thread t1 = new Thread() {

            @Override
            public void run() {

                // 第一个线程对象向集合中添加元素(1-24)
                for(int x = 1 ; x < 25 ; x++) {
                    concurrentHashMap.put(String.valueOf(x) , String.valueOf(x)) ;
                }

            }

        };

        // 线程t2
        Thread t2 = new Thread() {

            @Override
            public void run() {

                // 第二个线程对象向集合中添加元素(25-50)
                for(int x = 25 ; x < 51 ; x++) {
                    concurrentHashMap.put(String.valueOf(x) , String.valueOf(x)) ;
                }

            }

        };

        // 启动线程
        t1.start();
        t2.start();

        System.out.println("----------------------------------------------------------");

        try {

            // 主线程休眠2s,以便让其他两个线程将数据填装完毕
            TimeUnit.SECONDS.sleep(2);

        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 从集合中找出键和值不相同的数据
        for(int x = 1 ; x < 51 ; x++) {

            // concurrentHashMap中的键就是当前循环变量的x这个数据的字符串表现形式 , 根据键找到值,然后在进行判断
            if( !String.valueOf(x).equals( concurrentHashMap.get(String.valueOf(x)) ) ) {
                System.out.println(String.valueOf(x) + ":" + concurrentHashMap.get(String.valueOf(x)));
            }
        }
    }
}

不论该程序运行多少次,都不会产生数据问题。因此也就证明ConcurrentHashMap是线程安全的。

9.1.2 源码分析

由于ConcurrentHashMap在jdk1.7和jdk1.8的时候实现原理不太相同,因此需要分别来讲解一下两个不同版本的实现原理。

9.1.2.1 jdk1.7版本

ConcurrentHashMap中的重要成员变量

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {
    
    /**
     * Segment翻译中文为"段" , 段数组对象
     */
    final Segment<K,V>[] segments;
    
    // Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色,将一个大的table分割成多个小的table进行加锁。
    static final class Segment<K,V> extends ReentrantLock implements Serializable {
        
        transient volatile int count;    			// Segment中元素的数量,由volatile修饰,支持内存可见性;
        transient int modCount;			 			// 对table的大小造成影响的操作的数量(比如put或者remove操作);
        transient int threshold;		 			// 扩容阈值;
        transient volatile HashEntry<K,V>[] table;  // 链表数组,数组中的每一个元素代表了一个链表的头部;
        final float loadFactor;			 			// 负载因子 
        
    }
    
    // Segment中的元素是以HashEntry的形式存放在数组中的,其结构与普通HashMap的HashEntry基本一致,不同的是Segment的HashEntry,其value由		     // volatile修饰,以支持内存可见性,即写操作对其他读线程即时可见。
    static final class HashEntry<K,V> {
        final int hash;					// 当前节点key对应的哈希码值
        final K key;					// 存储键
        volatile V value;				// 存储值
        volatile HashEntry<K,V> next;	// 下一个节点
    }
    
}

对应的结构如下图所示

1571880094854

简单来讲,就是ConcurrentHashMap比HashMap多了一次hash过程,第1次hash定位到Segment,第2次hash定位到HashEntry,然后链表搜索找到指定节点。在进行写操作时,只需锁住写元素所在的Segment即可(这种锁被称为分段锁),其他Segment无需加锁,从而产生锁竞争的概率大大减小,提高了并发读写的效率。该种实现方式的缺点是hash过程比普通的HashMap要长(因为需要进行两次hash操作)。

ConcurrentHashMap的put方法源码分析

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { 
    
    public V put(K key, V value) {
        
        // 定义一个Segment对象
        Segment<K,V> s;
        
        // 如果value的值为空,那么抛出异常
        if (value == null) throw new NullPointerException();
        
        // hash函数获取key的hashCode,然后做了一些处理
        int hash = hash(key);
        
        // 通过key的hashCode定位segment
        int j = (hash >>> segmentShift) & segmentMask;
        
        // 对定位的Segment进行判断,如果Segment为空,调用ensureSegment进行初始化操作(第一次hash定位)
        if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null) 
            s = ensureSegment(j);
        
        // 调用Segment对象的put方法添加元素
        return s.put(key, hash, value, false);
    }
    
    // Segment是一种可ReentrantLock,在ConcurrentHashMap里扮演锁的角色,将一个大的table分割成多个小的table进行加锁。
    static final class Segment<K,V> extends ReentrantLock implements Serializable {
        
        // 添加元素
        final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            
            // 尝试对该段进行加锁,如果加锁失败,则调用scanAndLockForPut方法;在该方法中就要进行再次尝试或者进行自旋等待
            HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                
                // 获取HashEntry数组对象
                HashEntry<K,V>[] tab = table;
                
                // 根据key的hashCode值计算索引(第二次hash定位)
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) 
                    
                    // 若不为null
                    if (e != null) {
                        K k;
                        
                        // 判读当前节点的key是否和链表头节点的key相同(依赖于hashCode方法和equals方法) 
                        // 如果相同,值进行更新
                        if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        
                        e = e.next;
                    } else {  // 若头结点为null
                        
                        // 将新节点添加到链表中
                        if (node != null) 
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        
                        // 如果超过阈值,则进行rehash操作
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        } 	       
    }    
}

注:源代码进行简单讲解即可(核心:进行了两次哈希定位以及加锁过程)

9.1.2.2 jdk1.8版本

在JDK1.8中为了进一步优化ConcurrentHashMap的性能,去掉了Segment分段锁的设计。在数据结构方面,则是跟HashMap一样,使用一个哈希表table数组。(数组 + 链表 + 红黑树)

而线程安全方面是结合CAS机制 + 局部锁实现的,减低锁的粒度,提高性能。同时在HashMap的基础上,对哈希表table数组和链表节点的value,next指针等使用volatile来修饰,从而实现线程可见性。

ConcurrentHashMap中的重要成员变量

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
    
    // Node数组
    transient volatile Node<K,V>[] table;
    
    // Node类的定义
    static class Node<K,V> implements Map.Entry<K,V> { 
        
        final int hash;				// 当前key的hashCode值
        final K key;				// 键
        volatile V val;				// 值
        volatile Node<K,V> next;	// 下一个节点
        
    }
    
    // TreeNode类的定义
    static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;  // 父节点
        TreeNode<K,V> left;	   // 左子节点
        TreeNode<K,V> right;   // 右子节点
        TreeNode<K,V> prev;    // needed to unlink next upon deletion
        boolean red;		   // 节点的颜色状态
    }
    
}

对应的结构如下图

1571901607504

ConcurrentHashMap的put方法源码分析

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
    
    // 添加元素
    public V put(K key, V value) {
    	return putVal(key, value, false);
	}
    
    // putVal方法定义
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        
        // key为null直接抛出异常
        if (key == null || value == null) throw new NullPointerException();
        
        // 计算key所对应的hashCode值
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            
            // 哈希表如果不存在,那么此时初始化哈希表
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            
            // 通过hash值计算key在table表中的索引,将其值赋值给变量i,然后根据索引找到对应的Node,如果Node为null,做出处理
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                
                // 新增链表头结点,cas方式添加到哈希表table
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break;                   
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                
                // f为链表头结点,使用synchronized加锁
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                
                                // 节点已经存在,更新value即可
                                if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                
                                // 该key对应的节点不存在,则新增节点并添加到该链表的末尾
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key, value, null);
                                    break;
                                }
                                
                            }
                            
                        } else if (f instanceof TreeBin) { // 红黑树节点,则往该红黑树更新或添加该节点即可
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                
                // 判断是否需要将链表转为红黑树
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
    
    // CAS算法的核心类
    private static final sun.misc.Unsafe U;
    static {
        try {
            U = sun.misc.Unsafe.getUnsafe();
            ...
        } catch (Exception e) {
            throw new Error(e);
        }
    }
    
    // 原子获取链表节点
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    
    // CAS更新或新增链表节点
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
}

简单总结:

  1. 如果当前需要put的key对应的链表在哈希表table中还不存在,即还没添加过该key的hash值对应的链表,则调用casTabAt方法,基于CAS机制来实现添加该链表头结点到哈希表table中,避免该线程在添加该链表头结的时候,其他线程也在添加的并发问题;如果CAS失败,则进行自旋,通过继续第2步的操作;

  2. 如果需要添加的链表已经存在哈希表table中,则通过tabAt方法,基于volatile机制,获取当前最新的链表头结点f,由于f指向的是ConcurrentHashMap的哈希表table的某条链表的头结点,故虽然f是临时变量,由于是引用共享的该链表头结点,所以可以使用synchronized关键字来同步多个线程对该链表的访问。在synchronized(f)同步块里面则是与

    HashMap一样遍历该链表,如果该key对应的链表节点已经存在,则更新,否则在链表的末尾新增该key对应的链表节点。

9.2 CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作以后,再执行当前线程;比如我们在主线程需要开启2个其他线程,当其他的线程执行完毕以后我们再去执行主线程,针对这个需求我们就可以使用CountDownLatch来进行实现。

CountDownLatch中count down是倒着数数的意思;CountDownLatch是通过一个计数器来实现的,每当一个线程完成了自己的任务后,可以调用countDown()方法让计数器-1,当计数器到达0时,调用CountDownLatch的await()方法的线程阻塞状态解除,继续执行。

CountDownLatch的相关方法

public CountDownLatch(int count)						// 初始化一个指定计数器的CountDownLatch对象
public void await() throws InterruptedException			// 让当前线程等待
public void countDown()									// 计数器进行减1

案例演示:使用CountDownLatch完成上述需求(我们在主线程需要开启2个其他线程,当其他的线程执行完毕以后我们再去执行主线程)

实现思路:在main方法中创建一个CountDownLatch对象,把这个对象作为作为参数传递给其他的两个任务线程。

线程任务类1

public class CountDownLatchThread01 implements Runnable {

    // CountDownLatch类型成员变量
    private CountDownLatch countDownLatch ;
    public CountDownLatchThread01(CountDownLatch countDownLatch) {      // 构造方法的作用:接收CountDownLatch对象
        this.countDownLatch = countDownLatch ;
    }

    @Override
    public void run() {

        try {
            Thread.sleep(10000);
            System.out.println("10秒以后执行了CountDownLatchThread01......");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 调用CountDownLatch对象的countDown方法对计数器进行-1操作
        countDownLatch.countDown();

    }

}

线程任务类2

public class CountDownLatchThread02 implements Runnable {

    // CountDownLatch类型成员变量
    private CountDownLatch countDownLatch ;
    public CountDownLatchThread02(CountDownLatch countDownLatch) {      // 构造方法的作用:接收CountDownLatch对象
        this.countDownLatch = countDownLatch ;
    }

    @Override
    public void run() {

        try {
            Thread.sleep(3000);
            System.out.println("3秒以后执行了CountDownLatchThread02......");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 调用CountDownLatch对象的countDown方法对计数器进行-1操作
        countDownLatch.countDown();

    }

}

测试类

public class CountDownLatchDemo01 {

    public static void main(String[] args) {

        //  1. 创建一个CountDownLatch对象
        CountDownLatch countDownLatch = new CountDownLatch(2) ;                 // CountDownLatch中的计数器的默认值就是2

        //  2. 创建线程任务类对象,并且把这个CountDownLatch对象作为构造方法的参数进行传递
        CountDownLatchThread01 countDownLatchThread01 = new CountDownLatchThread01(countDownLatch) ;

        //  3. 创建线程任务类对象,并且把这个CountDownLatch对象作为构造方法的参数进行传递
        CountDownLatchThread02 countDownLatchThread02 = new CountDownLatchThread02(countDownLatch) ;

        //  4. 创建线程对象,并启动线程
        Thread t1 = new Thread(countDownLatchThread01);
        Thread t2 = new Thread(countDownLatchThread02);
        t1.start();
        t2.start();

        //  5. 在主线程中调用 CountDownLatch中的await让主线程处于阻塞状态
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //  6. 程序结束的输出
        System.out.println("主线程执行了.... 程序结束了......");
    }

}

控制台输出结果

3秒以后执行了CountDownLatchThread02......
10秒以后执行了CountDownLatchThread01......
主线程执行了.... 程序结束了......

CountDownLatchThread02线程先执行完毕,此时计数器-1;CountDownLatchThread01线程执行完毕,此时计数器-1;当计数器的值为0的时候,主线程阻塞状态接触,主线程向下执行。

9.3 CyclicBarrier

9.3.1 概述以及基本使用

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

例如:公司召集5名员工开会,等5名员工都到了,会议开始。我们创建5个员工线程,1个开会线程,几乎同时启动,使用CyclicBarrier保证5名员工线程全部执行后,再执行开会线程。

CyclicBarrier的相关方法

public CyclicBarrier(int parties, Runnable barrierAction)   // 用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景
public int await()											// 每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞

案例演示:模拟员工开会

实现步骤:

  1. 创建一个员工线程类(EmployeeThread),该线程类中需要定义一个CyclicBarrier类型的形式参数
  2. 创建一个开会线程类(MettingThread)
  3. 测试类
    1. 创建CyclicBarrier对象
    2. 创建5个EmployeeThread线程对象,把第一步创建的CyclicBarrier对象作为构造方法参数传递过来
    3. 启动5个员工线程

员工线程类

public class EmployeeThread extends Thread {

    // CyclicBarrier类型的成员变量
    private CyclicBarrier cyclicBarrier ;
    public EmployeeThread(CyclicBarrier cyclicBarrier) {        // 使用构造方法对CyclicBarrier进行初始化
        this.cyclicBarrier = cyclicBarrier ;
    }

    @Override
    public void run() {

        try {

            // 模拟开会人员的随机到场
            Thread.sleep((int) (Math.random() * 1000));
            System.out.println(Thread.currentThread().getName() + " 到了! ");
            cyclicBarrier.await();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

开会线程类

public class MettingThread extends Thread {

    @Override
    public void run() {
        System.out.println("好了,人都到了,开始开会......");
    }

}

测试类

public class CyclicBarrierDemo01 {

    public static void main(String[] args) {

        // 创建CyclicBarrier对象
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5 , new MettingThread()) ;

        // 创建5个EmployeeThread线程对象,把第一步创建的CyclicBarrier对象作为构造方法参数传递过来
        EmployeeThread thread1 = new EmployeeThread(cyclicBarrier) ;
        EmployeeThread thread2 = new EmployeeThread(cyclicBarrier) ;
        EmployeeThread thread3 = new EmployeeThread(cyclicBarrier) ;
        EmployeeThread thread4 = new EmployeeThread(cyclicBarrier) ;
        EmployeeThread thread5 = new EmployeeThread(cyclicBarrier) ;

        // 启动5个员工线程
        thread1.start();
        thread2.start();
        thread3.start();
        thread4.start();
        thread5.start();
    }
}
9.3.2 使用场景

使用场景:CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。

比如:现在存在两个文件,这个两个文件中存储的是某一个员工两年的工资信息(一年一个文件),现需要对这两个文件中的数据进行汇总;使用两个线程读取2个文件中的数据,当两个文件中的数据都读取完毕以后,进行数据的汇总操作。

分析:要想在两个线程读取数据完毕以后进行数据的汇总,那么我们就需要定义一个任务类(该类需要实现Runnable接口);两个线程读取完数据以后再进行数据的汇总,那么我们可以将两个线程读取到的数据先存储到一个集合中,而多线程环境下最常见的线程集合类就是ConcurrentHashMap,而这个集合需要被两个线程都可以进行使用,那么我们可以将这个集合作为我们任务类的成员变量,然后我们在这个任务类中去定义一个CyclicBarrier对象,然后在定义一个方法(count),当调用这个count方法的时候需要去开启两个线程对象,使用这两个线程对象读取数据,把读取到的数据存储到ConcurrentHashMap对象,当一个线程读取数据完毕以后,调用CyclicBarrier的awit方法(告诉CyclicBarrier我已经到达了屏障),然后在任务类的run方法对ConcurrentHashMap的数据进行汇总操作;

实现步骤:

  1. 定义一个任务类CyclicBarrierThreadUse(实现了Runnable接口)
  2. 定义成员变量:CyclicBarrier ,ConcurrentHashMap
private CyclicBarrier cyclicBarrier = new CyclicBarrier(2 , this) ;
private ConcurrentHashMap<Integer , String> concurrentHashMap = new ConcurrentHashMap<Integer , String>() ;
  1. 定义一个方法count方法,在count方法中开启两个线程对象(可以使用匿名内部类的方式实现)
  2. 在run方法中对ConcurrentHashMap中的数据进行汇总
  3. 编写测试类CyclicBarrierThreadUseDemo
  4. 创建CyclicBarrierThreadUse对象,调用count方法

任务类代代码:

public class CyclicBarrierThreadUse implements Runnable {

    // 当前我们两个线程到达了屏障点以后,我们需要立即对数据进行汇总, 因此我们需要使用第二个构造方法
    // 并且我们当前这个类就是一个任务类,因此我们可以直接传递参数this
    private CyclicBarrier cyclicBarrier = new CyclicBarrier(2 , this) ;
    private ConcurrentHashMap<Integer , String> concurrentHashMap = new ConcurrentHashMap<Integer , String>() ;  // 存储两个线程所读取的数据

    public void count() {

        // 定义一个方法count方法,在count方法中开启两个线程对象(可以使用匿名内部类的方式实现)
        // 线程1
        new Thread(new Runnable() {

            @Override
            public void run() {

                // 读取数据
                BufferedReader bufferedReader = null ;
                try {


                    bufferedReader = new BufferedReader(new FileReader("D:\\salary\\2017-salary.txt")) ;
                    String line = null ;
                    while((line = bufferedReader.readLine()) != null) {
                        concurrentHashMap.put(Integer.parseInt(line) , "") ;            // 小的问题,工资信息不能重复
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if(bufferedReader != null) {
                        try {
                            bufferedReader.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }

                // 模拟任务的执行时间
                try {
                    TimeUnit.SECONDS.sleep(5) ;
                    System.out.println(Thread.currentThread().getName() + "---------------------线程读取数据完毕....");
                    cyclicBarrier.await() ;         //通知cyclicBarrier当前线程已经到达了屏障点
                } catch (Exception e) {
                    e.printStackTrace();
                }


            }

        }).start();

        // 线程2
        new Thread(new Runnable() {

            @Override
            public void run() {

                // 读取数据
                BufferedReader bufferedReader = null ;
                try {


                    bufferedReader = new BufferedReader(new FileReader("D:\\salary\\2019-salary.txt")) ;
                    String line = null ;
                    while((line = bufferedReader.readLine()) != null) {
                        concurrentHashMap.put(Integer.parseInt(line) , "") ;            // 小的问题,工资信息不能重复
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if(bufferedReader != null) {
                        try {
                            bufferedReader.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }

                // 模拟任务的执行时间
                try {
                    TimeUnit.SECONDS.sleep(10) ;
                    System.out.println(Thread.currentThread().getName() + "---------------------线程读取数据完毕....");
                    cyclicBarrier.await() ;         //通知cyclicBarrier当前线程已经到达了屏障点
                } catch (Exception e) {
                    e.printStackTrace();
                }


            }

        }).start();


    }

    @Override
    public void run() {

        // 获取concurrentHashMap中的数据进行汇总
        Enumeration<Integer> enumeration = concurrentHashMap.keys();        // 获取concurrentHashMap中所有的键

        /**
         * 这个Enumeration的使用和我们之前所学习过的迭代器类似
         * boolean hasMoreElements(); 判断集合中是否存在下一个元素
         * E nextElement();           获取元素
         */
        int result = 0 ;
        while(enumeration.hasMoreElements()) {
            Integer integer = enumeration.nextElement();
            result += integer ;
        }

        // 输出
        System.out.println(result);

    }


}

测试类代码:

public class CyclicBarrierThreadUseDemo01 {

    public static void main(String[] args) {
		
        // 创建任务类的对象
        CyclicBarrierThreadUse cyclicBarrierThreadUse = new CyclicBarrierThreadUse();
        
        // 调用count方法进行数据汇总
        cyclicBarrierThreadUse.count();

    }

}

9.4 Semaphore

Semaphore字面意思是信号量的意思,它的作用是控制访问特定资源的线程数目。

举例:现在有一个十字路口,有多辆汽车需要进经过这个十字路口,但是我们规定同时只能有两辆汽车经过。其他汽车处于等待状态,只要某一个汽车经过了这个十字路口,其他的汽车才可以经过,但是同时只能有两个汽车经过。如何限定经过这个十字路口车辆数目呢? 我们就可以使用Semaphore。

Semaphore的常用方法

public Semaphore(int permits)						permits 表示许可线程的数量
public void acquire() throws InterruptedException	表示获取许可
public void release()								表示释放许可

案例演示:模拟汽车通过十字路口

实现步骤:

  1. 创建一个汽车的线程任务类(CarThreadRunnable),在该类中定义一个Semaphore类型的成员变量
  2. 创建测试类
    1. 创建线程任务类对象
    2. 创建5个线程对象,并启动。(5个线程对象,相当于5辆汽车)

CarThreadRunnable类

public class CarThreadRunnable implements Runnable {

    // 创建一个Semaphore对象,限制只允许2个线程获取到许可证
    private Semaphore semaphore = new Semaphore(2) ;

    @Override
    public void run() {                         // 这个run只允许2个线程同时执行

        try {

            // 获取许可证
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName() + "----->>正在经过十字路口");

            // 模拟车辆经过十字路口所需要的时间
            Random random = new Random();
            int nextInt = random.nextInt(7);
            TimeUnit.SECONDS.sleep(nextInt);

            System.out.println(Thread.currentThread().getName() + "----->>驶出十字路口");

            // 释放许可证
            semaphore.release();

        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}

测试类

public class SemaphoreDemo01 {

    public static void main(String[] args) {

        // 创建线程任务类对象
        CarThreadRunnable carThreadRunnable = new CarThreadRunnable() ;

        // 创建5个线程对象,并启动。
        for(int x = 0 ; x < 5 ; x++) {
            new Thread(carThreadRunnable).start();
        }

    }

}

控制台输出结果

Thread-0----->>正在经过十字路口
Thread-1----->>正在经过十字路口
Thread-1----->>驶出十字路口
Thread-2----->>正在经过十字路口
Thread-0----->>驶出十字路口
Thread-3----->>正在经过十字路口
Thread-2----->>驶出十字路口
Thread-4----->>正在经过十字路口
Thread-4----->>驶出十字路口
Thread-3----->>驶出十字路口

通过控制台输出,我们可以看到当某一个汽车"驶出"十字路口以后,就会有一个汽车立马驶入。

9.5 Exchanger

9.5.1 概述以及基本使用

Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。

举例:比如男女双方结婚的时候,需要进行交换结婚戒指。

Exchanger常用方法

public Exchanger()							// 构造方法
public V exchange(V x)						// 进行交换数据的方法,参数x表示本方数据 ,返回值v表示对方数据

这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。

案例演示:模拟交互结婚戒指

实现步骤:

  1. 创建一个男方的线程类(ManThread),定义一个Exchanger类型的成员变量
  2. 创建一个女方的线程类(WomanThread),定义一个Exchanger类型的成员变量
  3. 测试类
    1. 创建一个Exchanger对象
    2. 创建一个ManThread对象,把第一步所创建的Exchanger作为构造方法参数传递过来
    3. 创建一个WomanThread对象,把第一步所创建的Exchanger作为构造方法参数传递过来
    4. 启动两个线程

ManThread类

public class ManThread extends Thread {

    // 定义Exchanger类型的变量
    private Exchanger<String> exchanger ;
    private String name ;
    public ManThread(Exchange<String> exchanger , String name) {
        super(name);
        this.name = name ;
        this.exchanger = exchanger ;
    }

    @Override
    public void run() {

        try {
            String result = exchanger.exchange("钻戒");
            System.out.println(name + "---->>把钻戒给媳妇");
            System.out.println(name + "---->>得到媳妇给的" + result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}

WomanThread类

public class WomanThread extends Thread {

    // 定义Exchanger类型的变量
    private Exchanger<String> exchanger ;
    private String name ;
    public WomanThread(Exchanger<String> exchanger , String name) {
        super(name) ;
        this.name = name ;
        this.exchanger = exchanger ;
    }

    @Override
    public void run() {

        try {
            String result = exchanger.exchange("铝戒");
            System.out.println(name + "---->>把铝戒给老公");
            System.out.println(name + "---->>得到老公给的" + result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

测试类

public class ExchangerDemo01 {

    public static void main(String[] args) {

        // 创建一个Exchanger对象
        Exchanger<String> exchanger = new Exchanger<String>() ;

        // 创建一个ManThread对象
        ManThread manThread = new ManThread(exchanger , "杨过") ;

        // 创建一个WomanThread对象
        WomanThread womanThread = new WomanThread(exchanger , "小龙女") ;

        // 启动线程
        manThread.start();
        womanThread.start();

    }

}
9.5.2 使用场景

使用场景:可以做数据校对工作

比如: 现在存在一个文件,该文件中存储的是某一个员工一年的工资信息,现需要将这个员工的工资信息录入到系统中,采用AB岗两人进行录入,录入到两个文件中,系统需要加载这两个文件,并对两个文件数据进行校对,看看是否录入一致。

实现步骤:

  1. 创建一个测试类(ExchangerUseDemo)
  2. 通过匿名内部类的方法创建两个线程对象
  3. 两个线程分别读取文件中的数据,然后将数据存储到各自的集合中
  4. 当每一个线程读取完数据以后,就将数据交换给对方
  5. 然后每个线程使用对方传递过来的数据与自己所录入的数据进行比对

ExchangerUseDemo类

public class ExchangerUseDemo {

    public static void main(String[] args) {

        // 1. 创建Exchanger对象
        Exchanger<ArrayList<String>> exchanger = new Exchanger<ArrayList<String>>() ;

        // 2. 通过匿名内部类的方法创建两个线程对象
        new Thread(new Runnable() {

            @Override
            public void run() {


                try {

                    // 读取文件中的数据,然后将其存储到集合中
                    ArrayList<String> arrayList = new ArrayList<String>() ;
                    BufferedReader bufferedReader = new BufferedReader(new FileReader("D:\\salary\\2017-salary.txt")) ;
                    String line = null ;
                    while((line = bufferedReader.readLine()) != null) {
                        arrayList.add(line) ;
                    }

                    // arrayList.add("90000") ;
                    // arrayList.set(0 , "90000") ;
                    arrayList.remove(0) ;

                    // 调用Exchanger中的exchange方法完成数据的交换
                    ArrayList<String> exchange = exchanger.exchange(arrayList);

                    // 先比对长度
                    if(arrayList.size() == exchange.size()) {

                        // 然后使用对方线程所传递过来的数据和自己线程所读取到的数据进行比对
                        for(int x = 0 ; x < arrayList.size() ; x++) {

                            // 本方数据
                            String benfangElement = arrayList.get(x);

                            // 对方数据
                            String duifangElement = exchange.get(x);

                            // 比对
                            if(!benfangElement.equals(duifangElement)) {
                                System.out.println("数据存在问题.....");
                            }

                        }

                    }else  {
                        System.out.println("数据存在问题.....");
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }

            }

        }).start();

        // 线程2
        new Thread(new Runnable() {

            @Override
            public void run() {


                try {

                    // 读取文件中的数据,然后将其存储到集合中
                    ArrayList<String> arrayList = new ArrayList<String>() ;
                    BufferedReader bufferedReader = new BufferedReader(new FileReader("D:\\salary\\2017-salary.txt")) ;
                    String line = null ;
                    while((line = bufferedReader.readLine()) != null) {
                        arrayList.add(line) ;
                    }

                    // 调用Exchanger中的exchange方法完成数据的交换
                    ArrayList<String> exchange = exchanger.exchange(arrayList);

                    // 先比对长度
                    if(arrayList.size() == exchange.size()) {

                        // 然后使用对方线程所传递过来的数据和自己线程所读取到的数据进行比对
                        for(int x = 0 ; x < arrayList.size() ; x++) {

                            // 本方数据
                            String benfangElement = arrayList.get(x);

                            // 对方数据
                            String duifangElement = exchange.get(x);

                            // 比对
                            if(!benfangElement.equals(duifangElement)) {
                                System.out.println("数据存在问题.....");
                            }

                        }

                    }else  {
                        System.out.println("数据存在问题.....");
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}
文章来源:https://blog.csdn.net/m0_47114547/article/details/135429713
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。