并发编程知识点梳理

发布时间:2024年01月19日

并发编程

在了解并发编程基本知识,先了解几本书,方便学习提高,分别为:java编程思想、企业应用架构模式、并发编程实战。

多线程中的设计模式有:Future、Master-Worker、生产者-消费者。

本次课程分为以下几部分进行讲解:

  1. 基础篇,包含线程安全基础知识(synchronized关键字、volatile关键字以及他们的实际应用场景);线程之间通信(wait、notify、ThreadLocal、单例模式、多线程)
  2. 并发编程中的集合类,这部分主要包括:同步容器、并发容器的基本概念及使用方法;ConcurrentHashMap/ConcurrentSkipListMap/CopyOnWriteArrayList/CopyOnWriteArraySet的讲解及其底层实现;实战队列(Queue)和双端队列(Deque)的基本知识及应用;多线程的设计模式:Future模式、Master-Worker模式、生产者-消费者模式

第三节

所谓的多个线程多个锁,就是多个线程,每个线程都有到自己指定的锁,彼此之间没有锁冲突。获得锁之后,每个线程都可以执行synchronized方法体的内容,彼此毫无影响。具体参见示例MultiThread。代码如下所示:

public class MultiThread {

?? private int num = 0;

?? /** static */

?? public synchronized void printNum(String tag) {

?? ??? try {

?? ??? ??? if (tag.equals("a")) {

?? ??? ??? ??? num = 100;

?? ??? ??? ??? System.out.println("tag a, set num over!");

?? ??? ??? ??? Thread.sleep(1000);

?? ??? ??? } else {

?? ??? ??? ??? num = 200;

?? ??? ??? ??? System.out.println("tag b, set num over!");

?? ??? ??? }

?? ??? ??? System.out.println("tag " + tag + ", num = " + num);

?? ??? } catch (InterruptedException e) {

?? ??? ??? e.printStackTrace();

?? ??? }

?? }

?? // 注意观察run方法输出顺序

?? public static void main(String[] args) {

?? ??? // 俩个不同的对象

?? ??? final MultiThread m1 = new MultiThread();

?? ??? final MultiThread m2 = new MultiThread();

?? ??? Thread t1 = new Thread(new Runnable() {

?? ??? ??? @Override

?? ??? ??? public void run() {

?? ??? ??? ??? m1.printNum("a");

?? ??? ??? }

?? ??? });

?? ??? Thread t2 = new Thread(new Runnable() {

?? ??? ??? @Override

?? ??? ??? public void run() {

?? ??? ??? ??? m2.printNum("b");

?? ??? ??? }

?? ??? });

?? ??? t1.start();

?? ??? t2.start();

?? }

}

在java中关键字synchronized取得的锁一般都是对象锁,而不是将一段代码或方法当作锁。在示例代码的执行过程中,我们可以看到哪个线程先执行synchronized关键字修饰的方法,哪个线程就持有该方法所属对象的锁(Lock)。若存在两个对象,线程获得的就是两把不同的锁,他们彼此互不影响。不过在某些情况下,即使多个线程中每个线程都拥有自己的对象,他们还是需要竞争同一把锁,那就是在静态方法上加synchronized关键字。此时该静态方法拥有的锁是类一级别的锁(独占.class类),此时锁定的是class类,而非对象!

第四节

java中同步的核心问题是共享。如果该资源是非共享资源,就没必要对它进行同步操作了。如果该资源是共享资源,就必须对它进行同步操作。相反异步涉及的核心问题是独立,即彼此之间不受制约。这和http页面中的ajax请求一样,当访问某个页面的时候,我们可以一边浏览页面的静态内容,一边等待重要数据的加载,或者一边浏览某个控件远程加载过来的数据,一边等待另外一个控件从远程服务器加载数据。他们之间的操作互相没有干扰,可以同步进行。

既然同步的核心问题是共享,那么同步的主要目的就是保证共享资源在多线程环境中的线程安全,使其能够表现出正确的行为。共享资源要想线程安全,就必须具有两个重要的特性:原子性(同步)、可见性。参见示例:MyObject.java,具体代码如下所示:

/**

?* ========================================================================================================================================

?* 实验目的:多个线程竞争同一把锁的时候,多线程的执行情况

?* 实验环境:多线程

?* 实验材料:锁——MyObject.class,线程thread1,线程thread2

?* 实验结果:首先第一个被执行的方法的第一个输出语句打印,然后等待相应时间,该方法的第二个输出语句打印;

?* ?????????????????之后第二个被执行的方法的第一个输出语句打印,然后等待相应时间,该方法的第二个输出语句打印

?* ========================================================================================================================================

?* 对比实验:去除bespeak上的synchronized关键字,继续执行该程序

?* 对比结果:两个方法的第一个输出语句几乎同时打印,之后bespeak方法的第二个输出语句打印,因为等待时间较短,最后第一个方法的第二条输出语句打印

?* ========================================================================================================================================

?* 从两次试验结果可以看出当多个线程竞争同一把锁的时候,对于被该锁锁定的同步方法的访问是线性的,即同一时刻只能有一个线程对同步方法进行操作;但是对于

?* 与同步方法并行的非同步方法的访问是非线性的,即同一时刻可以有一个线程访问同步方法,另一个线程访问非同步方法

?*

?* @author Alon

?*

?*/

public class MyObject {

?? /**

?? ?* 打印资源

?? ?*

?? ?* @throws InterruptedException

?? ?*/

?? public static synchronized void print(String threadName) {

?? ??? System.out.println("线程" + threadName + "开始执行了!");

?? ??? try { // 让当前线程休眠8秒

?? ??? ??? Thread.sleep(1000 * 10);

?? ??? } catch (InterruptedException e) {

?? ??? ??? e.printStackTrace();

?? ??? }

?? ??? System.out.println("线程" + threadName + "执行完成!");

?? }

?? /**

?? ?* 预约

?? ?*

?? ?* @param threadName

?? ?*/

?? public static synchronized void bespeak(String threadName) {

?? ??? System.out.println("线程" + threadName + "开始预约");

?? ??? try { // 让当前线程休眠8秒

?? ??? ??? Thread.sleep(1000 * 4);

?? ??? } catch (InterruptedException e) {

?? ??? ??? e.printStackTrace();

?? ??? }

?? ??? System.out.println("线程" + threadName + "预约成功!");

?? }

?? public static void main(String[] args) throws InterruptedException {

?? ??? Thread t1 = new Thread("first") {

?? ??? ??? @Override

?? ??? ??? public void run() {

?? ??? ??? ??? MyObject.print(Thread.currentThread().getName());

?? ??? ??? }

?? ??? };

?? ??? Thread t2 = new Thread("second") {

?? ??? ??? @Override

?? ??? ??? public void run() {

?? ??? ??? ??? MyObject.bespeak(Thread.currentThread().getName());

?? ??? ??? }

?? ??? };

?? ??? t1.start();

?? ??? t2.start();

?? }

}

通过该类的执行我们可以看到:

  1. 假如t1线程先持有MyObject.class锁,那么t2线程要想调用该类的同步静态方法bespeak,就必须等待,即同步
  2. 假如t1线程先持有MyObject.class锁,那么t2线程异步的方式调用对象中的非synchronized修饰的方法bespeak
  1. 小节总结

java同步的主要目的是保护共享资源,使其表现出正确地行为,保证多个线程对共享资源的操作是安全的。当多个线程并发地访问某个共享资源时,为了保护使其表现出正确的行为,java通常会对其进行加锁操作,以限制并发访问。通俗的讲就是对多个线程进行排队操作,使其对共享资源的操作变得有序。对于java类中的同步方法和异步方法的访问,这里有以下几点需要注意:

  1. 当多个线程竞争一把锁的时候,对于同步方法的访问是线性的,即同一时刻只能有一个线程对同步方法进行访问,这时其他线程是不可以访问当前类中的同步方法的,即本同步方法或者其他同步方法。换句话说就是:只要当前的同步方法被访问,同一个类中的其他步方法就会处于禁止访问的状态
  2. 当一个线程访问某个类中的同步方法时,另外一个线程依旧可以访问该类中的非同步方法
  1. 知识拓展

数据分表,所谓数据分表就是将同一张表中的数据分拆存储到多张表中,比如:订单表由于数据量过大,当多个用户同时请求该表的数据时,数据库可能无法及时进行响应,为了提高数据库的响应速度,可以根据用户ID进行取模运算,将用户的订单数据分别保存于两张表中,这样当多个用户请求订单数据时,就会请求两张表,而非一张表。当然这个假设的前提时用户的订单量基本一致,这个前提是避免某些购物狂用户订单量是10个人订单量的总和的现象出现。当然,我们可以直接以用户表为例来说明分表案例。这里拓展介绍分表知识,只是为了说明分表与同步是两个问题、两个概念。

在mysql中数据库分表可以从不同的维度进行,这里我们只说了一种维度,即对同一张表中的数据进行水平分表。我们还可以从业务的维度对数据库进行垂直分库。

mysql主从同步,其主要作用就是一份数据多个备份,这样可以避免因主服务器宕机而造成数据丢失的现象发生。其实,这一架构的本质就是提高数据的安全性。mysql主从复制的基本原理:主从服务器之间维护一个长连接,方便数据的发送(主服务器)接收(从服务器),传递的是主服务器的数据变化日志(即binlog文件,这里的说法只是简要说明,当然真正的实现原理是复杂的)。基于这一架构,所谓同步就是主从服务器之间保持数据一致性进行的通信和从服务器生成数据这两者的总称。从外部用户的角度看主从服务器就像是一个大的存储磁盘,因为其对外表现的数据是一致的。所以上面关于同步的定义可以简化:主从服务器之间的数据传递过程就是同步(包含两部分:数据从主服务器传递到从服务器,从服务器依据主服务器传递的数据进行数据生成的操作)。接下来简单介绍mysql主从架构中主从服务器之间的数据交互,这个过程包含两种实现方式:同步传输,异步传输。所谓同步传输就是主服务器数据发生变化后,立即向从服务器发送数据变更日志,等待从服务器更新完成反馈后,才对外进行反馈(这里的对外指的是应用服务器,应用服务器接到反馈后方才进行下面的操作,比如响应用户,通知其他线程等)。所谓的异步传输就是主服务器数据更新完成后立刻对外进行反馈,然后主服务器后台异步将数据传递给从服务器,以便从服务器更新数据。这里我们可以看出同步传输和异步传输的区别:

  1. 同步传输中,主从之间对外表现成一个整体,就像是一个数据库服务器。这里我们假设主从服务器部署在不同的物理机上。异步传输中主从之间则是两个单独的个体,主服务器是主服务器,从服务器是从服务器。
  2. 从数据一致性的角度来看,同步传输中,主从服务器之间的数据对外是一致的;异步传输中则不然,主从服务器之间存在一定的误差,当然是在可控范围内的。
  3. 从性能的角度看,同步传输的性能会差于异步传输。

数据库分表和主从同步两者的相同点是:降低数据库的访问压力;不同点是:主从同步还有一个功能就是提高了数据的安全性。

上述描写仅仅是个人理解,对于不完整或者纰漏还请指正,切勿破口大骂!!!

第五节

本节介绍的案例为脏读。参见示例:DirtyRead

在java中所谓的脏读就是:当一个线程在处理一个比较耗时的业务时,另外一个线程不加限制的进行了干预,导致当前线程读取到了错误的数据。避免出现脏读的方法为:

在设计程序的时候,一定要从整体上考虑问题。通俗讲就是在对对象方法加锁时,需要考虑业务的整体性。本示例中setter和getter是一个业务整体,所以为了避免出现脏读,保证业务的原子性,这两个方法都要加上synchronized关键字。

  1. 知识拓展

数据四大特性,即ACID——即原子性、一致性、隔离性、持久性原子性就是指事务包含的所有操作要么全部成功,要么全部失败回滚。一致性是指事务必须使数据库从一个一致性状态变化到另一个一致性状态,也就是说事务执行之前和执行之后都必须处于一致性状态。隔离性是当多个用户并发访问数据库时,比如操作同一张表,数据库为每一个用户开启的事务不能被其他事务的操作所干扰,多个并发事务之间要相互隔离。持久性是指一个事务一旦被提交了,那么对数据库中的数据的改变就是永久性的,即便是在数据库系统遇到故障的情况下也不会丢失提交事务的操作。

至此数据库四大特性的基本概念介绍完了。接下来我们详细说明一下事务的隔离性:当多个线程都开启事务操作数据库中的数据时,数据库系统要能够进行隔离操作,以保证各个线程获取数据的准确性。下面将介绍如果不考虑事务的隔离性,会发生的问题有

  1. 脏读。所谓的脏读就是一个事务在其处理过程中读取到了另一个没有提交的事务中的数据。比如:当一个事务正在修改某个数据,而在这个事务中这个修改还没有提交。这时另外一个并发的事务来访问该数据,就会造成两个事务得到的数据不一致。A向B转账100元,对应的sql如下所示:

update account set money = money + 100 where name = ‘B’;(执行完该操作后A通知B)

update account set money = money - 100 where name = ‘A’;

当执行完第一条SQL语句时,A通知B查看账户,B发现钱确实已经到账(由于没有了隔离性,B读取到了A事务中的数据,此时已经出现了脏读),但是之后无论第二条SQL是否执行,只要不提交事务,那么A转账涉及的所有操作都将回滚。此后B再次查看账户时会发现钱没有到账。

  1. 不可重复读。所谓不可重复读是指对于数据库中的某个数据,一个事务范围内的多次查询却返回了不同的结果。这是由于在查询间隔,另一个事务修改了该数据,并提交了事务。例如:事务T1读取到某一个数据后,事务T2立马修改了这个数据并且将事务提交到了数据库,事务T1再次读取该数据时就得到了不同的结果,此时就发生了不可重复读。

不可重复读与脏读的区别是:脏读是某一事务读取了另一个事务没有提交的脏数据,而不可重复读则是读取到了前一个事务提交的数据

在某些情况下,不可重复读并不是问题,比如我们多次查询某个数据当然以最后查询得到的结果为主。但在另一些情况下就有可能发生问题,例如对于同一个数据业务员A和业务员B依次查询就可能不同,A和B有可能打起来了。

  1. 虚读(幻读)。幻读是事务非独立执行时发生的一种现象。例如事务T1对一个表中所有的行的某个数据项做了从“1”修改为“2”的操作,这时事务T2又对这个表插入了一行数据项,而这个数据项的值还是“1”并且提交给数据库。而操作事务T1的用户如果再查看刚刚修改的数据,会发现还有一行没有修改,其实这行是事务T2中添加的,就好像产生幻觉一样,这就是幻读。

幻读读取到的是另一条事务已经提交的数据,这点和脏读不同,与不可重复读相同。但是与不可重复读不同的是,不可重复读查询到的都是同一个数据项,而幻读查询到的则是一批数据整体(比如数据的个数)。

针对上述问题MySQL为我们提供了四种隔离级别:

  1. 串行化(Serializable):可以避免脏读、不可重复读、幻读
  2. 可重复读(Repeatable read):可避免脏读、不可重复读的发生
  3. 读已提交(Read committed):可以避免脏读的发生
  4. 读未提交(Read uncommitted):最低级别,任何情况都无法保证

以上四种隔离级别中,隔离级别最高的是串行化(Serializable),隔离级别最低的是读未提交(Read uncommitted)。当然隔离级别越高,数据的安全性越高,但相应的执行效率也就越低。像串行化(Serializable)这样的级别,是以锁表的方式(类似于Java多线程中的锁)使得其他的线程只能在锁外等待,所以平时选用何种隔离级别应根据实际情况来定。在MySQL数据库中默认的隔离级别是可重复读(Repeatable read)。

与MySQL不同,Oracle针对上述问题只提出了两种隔离级别,即:串行化(Serializable)和读已提交(Read Committed),其中默认隔离级别是读已提交(Read Committed)。

在MySQL数据库中查看事务隔离级别可以使用命令:SELECT @@tx_isolation;

在MySQL数据库中设置事务隔离级别可以使用命令:SET [glogal | session] transaction isolation level 隔离级别名称;,或者set tx_isolation=’隔离级别名称’;。

需要注意的是:设置数据库的隔离级别时,一定要在事务开启之前进行

如果使用JDBC对数据库的事务设置隔离级别,应该在调用Connection对象的setAutoCommit(false);方法之前,通过调用Connection对象的setTransactionIsolation(level);来设置当前连接的隔离级别。至于参数level,可以使用Connection对象的字段,如下图所示:

TIM截图20180408172758

使用JDBC设置隔离级别的代码如下图所示:

TIM截图20180408172909

小结:隔离级别的设置只针对当前连接有效。对于MySQL命令窗口而言,一个窗口就相当于一个连接,当前窗口设置的隔离级别只针对当前窗口中的事务有效;对于JDBC来说Connection对象就相当于一个连接,而对于Connection对象设置的隔离级别只对该Connection对象有效,与其他Connection对象无关。

本段总结参考网络,网址为:https://www.cnblogs.com/fjdingsd/p/5273008.html

Oracle解决方案:为了保证一个事务不会看到另外一个还没有完成的事务产生的结果,使每个事务像是在单独、隔离的环境下运行,Oracle采用了undo的方式来解决。具体如下图所示:

TIM截图20180409175639

Oracle部分的整理不完整,后续需补充。

第六节

可重入锁,所谓的可重入锁,就是线程可以进入任何一个它已经拥有的锁同步着的代码块。java中用synchronized同步的代码块是可以重入的,这就意味着如果一个java线程进入了代码中的synchronized同步块并因此获得了该同步块使用的同步对象对应的管程上的锁那么这个线程可以进入由同一个管程对象所同步的另一个java代码块。这里的讲解比较抽象,我们可以通过下面的代码来理解(代码来源于SyncDubbo1):

public class SyncDubbo1 {

??? public synchronized void method1() {

??? ??? System.out.println("method1..");

??? ??? method2();

??? }

??? public synchronized void method2() {

??? ??? System.out.println("method2..");

??? ??? method3();

??? }

??? public synchronized void method3() {

??? ??? System.out.println("method3..");

??? }

??? public static void main(String[] args) {

??? ??? final SyncDubbo1 sd = new SyncDubbo1();

??? ??? Thread t1 = new Thread(new Runnable() {

??? ??? ??? @Override

??? ??? ??? public void run() {

??? ??? ??? ??? sd.method1();

??? ??? ??? }

??? ??? });

??? ??? t1.start();

??? }

}

由上面的代码可知,如果线程t1获取对象锁sd,那么它便可自由地出入synchronized修饰的三个方法:method1、method2、method3。通过这段代码,我们了解到关键字synchronized拥有锁重入的功能,即在使用synchronized时,若一个线程得到了一个对象的锁,那么再次请求该对象就可以再次得到该对象的锁。示例可参见:SyncDubbo1、SyncDubbo2和SyncException。其中SyncException示例中线程在获得锁之后抛出了异常,之后锁会被自动释放。

另外需要注意的是:在并发编程的时候一定要考虑周全,不然会出现下面几种严重的结果:1、如果程序出现了异常,却没有及时处理释放锁,那么会对应用程序中其他需要这个锁的线程产生严重的影响。2、如果应用程序出现异常,那么对于后面需要该程序正确执行的代码来说,这将是一个灾难。因为前面的程序没有正确执行,所以后面的执行在逻辑上就会出现严重错误。

在某些情况下,synchronized声明的方法是有问题的。比如A线程调用某个synchronized修饰的方法执行一个需要花费很长时间的任务,那么B线程就需要等待比较长的时间才能执行。这时我们就要考虑整个方法是否真的需要添加synchronized进行修饰,其实在很多时候我们并不需要对java类的整个方法添加synchronized修饰,需要添加synchronized修饰仅仅是方法内的部分代码,这就是java中的synchronized代码块。synchronized代码块用法比较灵活,可以使用任意的Object作为锁。不过我们需要注意的问题有:在synchronized代码块中,尽量不要使用String常量作为锁,因为使用String常量会出现死循环;另外需要注意的问题就是对象锁改变的问题:使用一个对象加锁的时候,如果对象本身发生了改变,那么synchronized代码块拥有的锁就不同了;如果对象本身不发生改变(即使对象的属性发生了改变),那么synchronized代码块拥有的锁依然是相同的。

volatile关键字,其主要作用就是使变量在多个线程之间可见。在java中,每个线程都有自己的工作内存,该内存中存放着所有线程共享的主内存中的变量值的拷贝。当线程执行时,其便在自己的工作内存中操作这些变量。为了存取一个共享变量,线程通常会:1、获取锁定——锁定共享内存(个人理解,解锁处与此处相同),2、清除自身工作内存,3、从所有线程的共享内存区中拷贝共享变量值,然后正确的装入到自身的工作内存中,4、线程解锁——解锁主内存时,将自身工作内存中的变量值写回到共享内存中。也就是说:一个变量被volatile修饰后,无论哪个线程修改该变量的值,其他线程都会接到通知,并重新从主内存中读取该变量的值,即:volatile的作用就是强制线程到主内存(所有线程共享的内存区)中读取变量值,而非去线程自身的工作内存中读取。这就实现变量在多个线程间安全可见。进一步讲解volatile的执行流程前,先了解下一个线程和被所有线程共享的主内存可执行的操作:1、线程——使用(use)、赋值(assign)、装载(load)、存储(store)、锁定(lock)、解锁(unlock);2、所有线程共享的主内存——读(read)、写(write)、锁定(lock)、解锁(unlock),其中主内存所有的操作都是原子的。volatile修饰的变量的操作流程如下图所示:

TIM截图20180410101212

volatile应用场景:两个线程协作交互,比如根据水温变化产生不同的响应,设想现实生活中两个人协同工作一个在现场测量水温,一个在锅炉旁控制火,他们之间的写作需要中间工具——对讲机。在程序设计时,对讲机就是被volatile修饰的变量,修改volatile修饰的变量的线程就是现场测试水温的工作人员,而另外一个线程就是根据水温调整锅炉的工作人员。

需要注意的是,volatile关键字虽拥有多个线程间的可见性,却不具备同步性——原子性,因此volatile是一个不会造成阻塞的,性能比synchronized强,却无法替代synchronized同步功能的轻量级synchronized。volatile可以适用多个线程读取/一个线程写,其他多个线程读取的场景,但无法适用多个线程同时写的场景,这是由其非原子性造成的,参见示例VolatileNotAtoic。从示例可以看出:volatile只具有可见性,无原子性,要实现原子性建议使用atomic类的系列对象(不过atomic类只保证本身方法原子性,无法保证多次操作的原子性,具体示例参见:AtomicUse。这里的意思是如果调用atomic类方法的方法不是原子性的,那么即便多次调用atomic类方法也无法保证方法的原子性)。

System.out.println(“=======”);,该方法是系统另起的线程进行打印的,比较浪费系统性能,所以在系统中不建议使用这种打印。

  1. 知识拓展(待整理)

并发编程中的锁

第七节

线程间通信,线程是操作系统中独立的个体,如果他们不经过特殊处理是无法成为一个整体的,为了使他们成为一个整体,就必须实现线程之间的通信。换言之,线程间通信是多个独立线程成为整体的必要前提。如果线程存在通信指挥,那么系统的在交互性方面会变得更加强大,并且CPU利用率方面也会得到大幅度的提升,同时还能使开发人员对线程任务的处理过程进行有效的把控和监督。java中实现线程通信的方式是:wait/notify。它们都是Object类的方法,换言之java为所有的对象提供了这两个方法。不过在实现线程通信的过程中这两个方法还需要synchronized关键字的配合。需要注意的是wait方法释放锁,而notify方法不会释放锁。线程通信的例子参见:ListAdd1、ListAdd2。代码如下所示:

import java.util.LinkedList

public class ListAdd1 {

?? private static volatile LinkedList<String> list = new LinkedList<String>();

?? private static Object lock = new Object();

?? public static void main(String[] args) {

?? ??? new Thread("t1") {

?? ??? ??? @Override

?? ??? ??? public void run() {

?? ??? ??? ??? System.out.println(Thread.currentThread().getName() + "线程启动");

?? ??? ??? ??? int i = 1;

?? ??? ??? ??? while (true) {

?? ??? ??? ??? ??? synchronized (lock) {

?? ??? ??? ??? ??? ??? if (list.size() == 5) {

?? ??? ??? ??? ??? ??? ??? lock.notify();

?? ??? ??? ??? ??? ??? } else {

?? ??? ??? ??? ??? ??? ??? System.out.println(Thread.currentThread().getName() + "添加元素value" + i);

?? ??? ??? ??? ??? ??? ??? list.addFirst("value" + (i++));

?? ??? ??? ??? ??? ??? ??? try {

?? ??? ??? ??? ??? ??? ??? ??? Thread.sleep(1000 * 3);

?? ??? ??? ??? ??? ??? ??? } catch (InterruptedException e) {

?? ??? ??? ??? ??? ??? ??? ??? e.printStackTrace();

?? ??? ??? ??? ??? ??? ??? }

?? ??? ??? ??? ??? ??? }

?? ??? ??? ??? ??? }

?? ??? ??? ??? }

?? ??? ??? }

?? ??? }.start();

?? ??? new Thread("t2") {

?? ??? ??? @Override

?? ??? ??? public void run() {

?? ??? ??? ??? System.out.println(Thread.currentThread().getName() + "线程启动");

?? ??? ??? ??? while (true) {

?? ??? ??? ??? ??? synchronized (lock) {

?? ??? ??? ??? ??? ??? if (list.size() == 0) {

?? ??? ??? ??? ??? ??? ??? try {

?? ??? ??? ??? ??? ??? ??? ??? lock.wait();

?? ??? ??? ??? ??? ??? ??? } catch (InterruptedException e) {

?? ??? ??? ??? ??? ??? ??? ??? e.printStackTrace();

?? ??? ??? ??? ??? ??? ??? }

?? ??? ??? ??? ??? ??? } else {

?? ??? ??? ??? ??? ??? ??? System.out.println(Thread.currentThread().getName() + "线程开始移除元素:" + list.removeFirst());

?? ??? ??? ??? ??? ??? ??? try {

?? ??? ??? ??? ??? ??? ??? ??? Thread.sleep(1000 * 3);

?? ??? ??? ??? ??? ??? ??? } catch (InterruptedException e) {

?? ??? ??? ??? ??? ??? ??? ??? e.printStackTrace();

?? ??? ??? ??? ??? ??? ??? }

?? ??? ??? ??? ??? ??? }

?? ??? ??? ??? ??? }

?? ??? ??? ??? }

?? ??? ??? }

?? ??? }.start();

?? }

}

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.CountDownLatch;

/**

?* wait notfiy 方法,wait释放锁,notfiy不释放锁

?*

?* @author alienware

?*

?*/

public class ListAdd3 {

??

?? private volatile static List list = new ArrayList();

?? public void add() {

?? ??? list.add("bjsxt");

?? }

?? public int size() {

?? ??? return list.size();

?? }

?? public static void main(String[] args) {

?? ??? final ListAdd3 list2 = new ListAdd3();

?? ??? // 1 实例化出来一个 lock

?? ??? // 当使用wait 和 notify 的时候 , 一定要配合着synchronized关键字去使用

?? ??? // final Object lock = new Object();

?? ??? final CountDownLatch countDownLatch = new CountDownLatch(1);

?? ??? Thread t1 = new Thread(new Runnable() {

?? ??? ??? @Override

?? ??? ??? public void run() {

?? ??? ??? ??? try {

?? ??? ??? ??? ??? for (int i = 0; i < 10; i++) {

?? ??? ??? ??? ??? ??? list2.add();

?? ??? ??? ??? ??? ??? System.out.println("当前线程:" + Thread.currentThread().getName() + "添加了一个元素..");

?? ??? ??? ??? ??? ??? Thread.sleep(500);

?? ??? ??? ??? ??? ??? if (list2.size() == 5) {

?? ??? ??? ??? ??? ??? ??? System.out.println("已经发出通知..");

?? ??? ??? ??? ??? ??? ??? countDownLatch.countDown();

?? ??? ??? ??? ??? ??? }

?? ??? ??? ??? ??? }

?? ??? ??? ??? } catch (InterruptedException e) {

?? ??? ??? ??? ??? e.printStackTrace();

?? ??? ??? ??? }

?? ??? ??? }

?? ??? }, "t1");

?? ??? Thread t2 = new Thread(new Runnable() {

?? ??? ??? @Override

?? ??? ??? public void run() {

?? ??? ??? ??? if (list2.size() != 5) {

?? ??? ??? ??? ??? try {

?? ??? ??? ??? ??? ??? countDownLatch.await();

?? ??? ??? ??? ??? } catch (InterruptedException e) {

?? ??? ??? ??? ??? ??? e.printStackTrace();

?? ??? ??? ??? ??? }

?? ??? ??? ??? }

?? ??? ??? ??? System.out.println("当前线程:" + Thread.currentThread().getName() + "收到通知线程停止..");

?? ??? ??? ??? // throw new RuntimeException();

?? ??? ??? }

?? ??? }, "t2");

?? ??? t2.start();

?? ??? t1.start();

?? }

??

}

示例中体现了这样几个问题:

  1. wait方法不会释放锁,notify会释放锁
  2. 同步代码块尽量细粒度,如若不然,会出现一些小问题,比如第一个线程添加5个变量后,调用了notify,原本第二个线程要立刻执行,但是由于notify不释放锁的机制,第二个线程会一直等待等待第一个线程执行完释放锁后,才开始执行
  3. volatile关键字修饰变量,则该变量值在多个线程间可见
  4. CountDownLatch可以实现即时通知功能,即条件到达后,线程一立即停止,线程二立即执行。CountDownLatch中没有锁的概念,当线程二调用await()方法之后就处于等待状态,当线程一调用conutdown()方法后,系统会发送信号量给线程二,接到信号的线程二立马转入运行。(这部分依据视频整理,正确与否有待考证)

第八节

BlockingQueue是一个队列,并且支持阻塞:阻塞的放入数据或阻塞的获取数据。实现阻塞功能时,使用的方法是:put和take。

  1. put(Object object):将一个Object对象添加到BlockingQueue里,如果BlockingQueue中没有空间,则调用该方法的线程被阻断,知道BlockingQueue中有空间再继续
  2. take():获取BlockingQueue中排在首位的对象,如果BlockingQueue为空,那么调用该方法的线程进入等待状态,直到BlockingQueue中有新数据加入

可以通过java来模拟BlockingQueue中的这两个方法,使用到的技术:synchronized、wait/notify、LinkedList,具体示例参见:MyQueue,代码如下所示:

import java.util.LinkedList;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicInteger;

/**

?* 自定义阻塞队列

?*

?* @author Alon

?*

?*/

public class MyQueue {

?? // 1 需要一个承装元素的集合

?? private LinkedList<Object> list = new LinkedList<Object>();

?? // 2 需要一个计数器

?? private AtomicInteger count = new AtomicInteger(0);

?? // 3 需要制定上限和下限

?? private final int minSize = 0;

?? private final int maxSize;

?? // 4 构造方法

?? public MyQueue(int size) {

?? ??? this.maxSize = size;

?? }

?? // 5 初始化一个对象 用于加锁

?? private final Object lock = new Object();

?? // put(anObject):

?? // 把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续.

?? public void put(Object obj) {

?? ??? synchronized (lock) {

?? ??? ??? while (count.get() == this.maxSize) {

?? ??? ??? ??? try {

?? ??? ??? ??? ??? lock.wait();

?? ??? ??? ??? } catch (InterruptedException e) {

?? ??? ??? ??? ??? e.printStackTrace();

?? ??? ??? ??? }

?? ??? ??? }

?? ??? ??? // 1 加入元素

?? ??? ??? list.add(obj);

?? ??? ??? // 2 计数器累加

?? ??? ??? count.incrementAndGet();

?? ??? ??? // 3 通知另外一个线程(唤醒)

?? ??? ??? lock.notify();

?? ??? ??? System.out.println("新加入的元素为:" + obj);

?? ??? }

?? }

?? // take:

?? // 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入.

?? public Object take() {

?? ??? Object ret = null;

?? ??? synchronized (lock) {

?? ??? ??? while (count.get() == this.minSize) {

?? ??? ??? ??? try {

?? ??? ??? ??? ??? lock.wait();

?? ??? ??? ??? } catch (InterruptedException e) {

?? ??? ??? ??? ??? e.printStackTrace();

?? ??? ??? ??? }

?? ??? ??? }

?? ??? ??? // 1 做移除元素操作

?? ??? ??? ret = list.removeFirst();

?? ??? ??? // 2 计数器递减

?? ??? ??? count.decrementAndGet();

?? ??? ??? // 3 唤醒另外一个线程

?? ??? ??? lock.notify();

?? ??? }

?? ??? return ret;

?? }

?? public int getSize() {

?? ??? return this.count.get();

?? }

?? public static void main(String[] args) {

?? ??? final MyQueue mq = new MyQueue(5);

?? ??? mq.put("a");

?? ??? mq.put("b");

?? ??? mq.put("c");

?? ??? mq.put("d");

?? ??? mq.put("e");

?? ??? System.out.println("当前容器的长度:" + mq.getSize());

?? ??? Thread t1 = new Thread(new Runnable() {

?? ??? ??? @Override

?? ??? ??? public void run() {

?? ??? ??? ??? mq.put("f");

?? ??? ??? ??? mq.put("g");

?? ??? ??? }

?? ??? }, "t1");

?? ??? t1.start();

?? ??? Thread t2 = new Thread(new Runnable() {

?? ??? ??? @Override

?? ??? ??? public void run() {

?? ??? ??? ??? Object o1 = mq.take();

?? ??? ??? ??? System.out.println("移除的元素为:" + o1);

?? ??? ??? ??? Object o2 = mq.take();

?? ??? ??? ??? System.out.println("移除的元素为:" + o2);

?? ??? ??? }

?? ??? }, "t2");

?? ??? try {

?? ??? ??? TimeUnit.SECONDS.sleep(2);

?? ??? } catch (InterruptedException e) {

?? ??? ??? e.printStackTrace();

?? ??? }

?? ??? t2.start();

?? }

}

java中锁的有:1、对象锁(当前对象/任意新建对象),2、类锁(XXX.class),3、字符串常量锁。另外加锁的粒度要尽可能的小,这样有利于提高程序的性能。

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列通常应用于生产者和消费者场景,生产者即向队列里添加元素的线程,消费者即从队列里获取元素的线程阻塞队列即生产者存放元素、消费者获取并消费元素的容器。阻塞队列通常提供四种处理方法,如下图所示:

从图中可知在第一列中如果当前队列已满,并且继续调用add方法向队列中添加元素,那么将抛出IllegalStateException(“Queue full”)异常;如果当前队列为空,并且继续调用remove方法,那么将抛出NoSuchElementException异常;如果当前队列为空,并且继续调用element方法,那么将抛出NoSuchElementException异常。在第二列中,在使用offer方法时,如果成功返回true,否则返回false;使用poll方法时,有返回获取的元素,否则返回null。在第三列中,当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。

java中常见的阻塞队列有:

  1. ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列
  2. LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列
  3. PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列
  4. DelayQueue:一个使用优先级队列实现的无界阻塞队列
  5. SynchronousQueue:一个不存储元素的阻塞队列
  6. LinkedTransferQueue:一个由链表结构组成的无界阻塞队列
  7. LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列

第九节

ThreadLocal是线程的局部变量,是一种多线程间并发访问变量的解决方案。与synchronized等加锁的方式不同,ThreadLocal完全不提供锁,而使用以空间换时间的手段,为每个线程提供变量的独立副本,以保障线程安全

从性能上说,ThreadLocal不具有绝对的优势,在并发不是很高的情况下,加锁的性能会更好,但作为一套与锁无关的线程安全解决方案,在高并发量或者竞争激烈的场景中,使用ThreadLocal可以在一定程度上减少锁竞争。可以参考示例ConnThreadLocal.java,具体代码如下所示:

public class ConnThreadLocal {

?? public static ThreadLocal<String> th = new ThreadLocal<String>();

?? public void setTh(String value) {

?? ??? th.set(value);

?? }

?? public void getTh() {

?? ??? System.out.println(Thread.currentThread().getName() + ":" + this.th.get());

?? }

?? public static void main(String[] args) throws InterruptedException {

?? ??? final ConnThreadLocal ct = new ConnThreadLocal();

?? ??? Thread t1 = new Thread(new Runnable() {

?? ??? ??? @Override

?? ??? ??? public void run() {

?? ??? ??? ??? ct.setTh("张三");

?? ??? ??? ??? ct.getTh();

?? ??? ??? }

?? ??? }, "t1");

?? ??? Thread t2 = new Thread(new Runnable() {

?? ??? ??? @Override

?? ??? ??? public void run() {

?? ??? ??? ??? try {

?? ??? ??? ??? ??? Thread.sleep(1000);

?? ??? ??? ??? ??? ct.getTh();

?? ??? ??? ??? } catch (InterruptedException e) {

?? ??? ??? ??? ??? e.printStackTrace();

?? ??? ??? ??? }

?? ??? ??? }

?? ??? }, "t2");

?? ??? t1.start();

?? ??? t2.start();

?? }

}

单例模式,最常见的就是饥饿模式和懒汉模式,一个直接实例化对象,一个在调用方法时进行实例化对象,在多线程模式下,考虑到性能和线程安全,我们一般选择下面两种比较经典的单例模式:dubble check instance和static inner class,在性能提高的同时,又保证了线程安全,参见DubbleSingletion,具体代码如下所示:

public class DubbleSingleton {

?? private static DubbleSingleton ds;

?? public static DubbleSingleton getDs() {

?? ??? if (ds == null) {

?? ??? ??? try {

?? ??? ??? ??? // 模拟初始化对象的准备时间...

?? ??? ??? ??? Thread.sleep(3000);

?? ??? ??? } catch (InterruptedException e) {

?? ??? ??? ??? e.printStackTrace();

?? ??? ??? }

?? ??? ??? synchronized (DubbleSingleton.class) {

?? ??? ??? ??? if (ds == null) {

?? ??? ??? ??? ??? ds = new DubbleSingleton();

?? ??? ??? ??? }

?? ??? ??? }

?? ??? }

?? ??? return ds;

?? }

?? public static void main(String[] args) {

?? ??? Thread t1 = new Thread(new Runnable() {

?? ??? ??? @Override

?? ??? ??? public void run() {

?? ??? ??? ??? System.out.println(DubbleSingleton.getDs().hashCode());

?? ??? ??? }

?? ??? }, "t1");

?? ??? Thread t2 = new Thread(new Runnable() {

?? ??? ??? @Override

?? ??? ??? public void run() {

?? ??? ??? ??? System.out.println(DubbleSingleton.getDs().hashCode());

?? ??? ??? }

?? ??? }, "t2");

?? ??? Thread t3 = new Thread(new Runnable() {

?? ??? ??? @Override

?? ??? ??? public void run() {

?? ??? ??? ??? System.out.println(DubbleSingleton.getDs().hashCode());

?? ??? ??? }

?? ??? }, "t3");

?? ??? t1.start();

?? ??? t2.start();

?? ??? t3.start();

?? }

}

Singlectio,具体代码如下所示:

public class Singletion {

?? private static class InnerSingletion {

?? ??? private static Singletion single = new Singletion();

?? }

?? public static Singletion getInstance() {

?? ??? return InnerSingletion.single;

?? }

?? public static void main(String[] args) {

?? ??? Thread t1 = new Thread(new Runnable() {

?? ??? ??? @Override

?? ??? ??? public void run() {

?? ??? ??? ??? System.out.println(Singletion.getInstance().hashCode());

?? ??? ??? }

?? ??? }, "t1");

?? ??? Thread t2 = new Thread(new Runnable() {

?? ??? ??? @Override

?? ??? ??? public void run() {

?? ??? ??? ??? System.out.println(Singletion.getInstance().hashCode());

?? ??? ??? }

?? ??? }, "t2");

?? ??? Thread t3 = new Thread(new Runnable() {

?? ??? ??? @Override

?? ??? ??? public void run() {

?? ??? ??? ??? System.out.println(Singletion.getInstance().hashCode());

?? ??? ??? }

?? ??? }, "t3");

?? ??? t1.start();

?? ??? t2.start();

?? ??? t3.start();

?? }

}

第十节

jdk中包含许多工具类,比如:同步类容器,并发类容器。接下来将介绍java中的几种同步类容器,不过在讲解之前首先介绍一些基本概念:

java中集合框架主要有四类:List、Set、Queue、Map。其中List、Set、Queue接口继承了Collection接口,而Map本身是一个接口。注意Collection和Map是一个顶层接口;List、Set、Queue则分别代表数组、集合和队列这三大类容器,日常开发中常用的集合像ArrayList、LinkedList都实现了List接口,HashSet实现了Set接口,Deque(又称双向队列,允许在队首/队尾进行入队/出队操作)继承了Queue接口,PriorityQueue实现了Queue接口。另外由于LinkedList是一个双向链表,所以其又实现了Deque接口。 这里说的这些集合基本上都是非线程安全的。如果多个线程并发的访问这些容器时,就会出现问题,其中最常见的问题是ConcurrentModificationException所以开发人员在编写程序的时候,必须手动的在访问这些容器的地方进行同步处理,但是这样会使容器的使用变得非常麻烦,所以java为我们提供了一些同步容器类。java中的同步容器类主要包括两类:1VectorStackHashTable2Collections类中提供的静态工厂方法创建的类第一种同步类容器中的Vector实现了List接口,所以其本质上是一个数组,和ArrayList类似,不过Vector中所有的方法都是synchronized方法,即所有方法都进行了同步措施。与Vector一样,Stack也是一个所有方法都进行了同步措施的同步类容器,它继承自Vector类。HashTable与HashMap类似,都实现了Map接口,但是前者中的所有方法都是synchronized方法,后者不是。第二种同步类容器Collections是一个工具提供类,注意它和Collection的区别,Collection是一个顶层的接口,而它不是;另外就是Collections类中提供了大量的方法,它们可以对集合进行各种各样的操作,比如对集合或者容器进行排序、查找,最重要的是,它里面提供了几个静态工厂方法来创建同步容器类,如下图所示:

TIM截图20180411160022

虽然java为我们提供的同步类容器有很多优点,比如方便开发人员使用,但是也存在一些缺陷:同步类容器中大量使用synchronized,在一定程度上会影响程序的并发性能。注意:同步的主要作用就是使共享资源表现出正确的行为,另外同步类容器是线程安全的。多线程操作某个容器的时候,此时容器作为共享资源,需要加以保护,以使其表现出正确的行为——这里的意思是集合中存储的数据正确。对于非线程安全的集合,可能会出现错误现象,但是对于同步类容器,一般不会出现这种现象。

jdk1.5版以后增加了很多并发类容器,用以替代同步类容器,改善性能,这些容器大多位于concurrent包下。同步类容器的操作是串行化的,虽实现了线程安全,却降低了并发性,因此在多线程环境中,会严重降低应用程序的吞吐量。并发类容器是专门针对并发设计的,比如以使用ConcurrentHashMap来代替HashTable,而且ConcurrentHashMap中还添加了一些常见复合操作;可以使用CopyOnWriteArrayList代替Voctor,并发的CopyonWriteArraySet,以及并发的queue,ConcurrentLinkedQueue和LinkedBlockingQueue,前者是高性能的队列,后者是以阻塞形式的队列,具体实现Queue还有很多,例如ArrayBlockingQueue、PriorityBlockingQueue、SynchronousQueue等。 接下来介绍java中常用的几个并发类容器:

  1. ConcurrentMap:该类是一个接口,其有两个重要的实现:ConcurrentHashMap和ConcurrentSkipListMap(支持并发排序功能,弥补ConcurrentHashMap无排序的缺陷)。在ConcurrentHashMap内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的HashTable,它们有自己的锁。只要多个修改操作不发生在同一个段上,它们就可以并发的进行。ConcurrentHashMap一般会把一个整体分成16个小段(Segment),也就是说ConcurrentHashMap最高支持16个线程并发操作。这也是在多线程场景减小锁粒度从而降低锁竞争的一种方案。并且ConcurrentHashMap代码中多数共享变量使用volatile关键字进行声明,目的是第一时间获取修改的内容,性能非常好。
  2. Copy-On-Write容器:Copy-On-Write简称COW,是一种应用于程序设计中的优化策略。jdk里的COW容器有两种:CopyOnWriteArrayList和CopyOnWriteArraySet。COW容器非常游泳,可以在很多的并发场景中使用到。那到底什么时CopyOnWrite容器呢?CopyOnWrite容器即写时复制容器,写时复制地基本原理:当向某个容器中添加元素的时候,我们不直接在当前容器中进行添加,而是先备份(Copy)当前容器——复制出一个新容器,然后在这个新容器里面添加元素,添加完成之后,再将原容器的引用指向这个新容器。这样做的好处是我们可以对CopyOnWrite容器进行并发地读,而不需要加锁,因为当前容器不会添加任何元素。CopyOnWrite容器是读写分离思想的一种实现,所谓读写分离就是读和写分别操作不同地容器。这种类型的容器更适合读多写少场景。个人理解:根据这种思想,当一个容器中存在大量数据的时候,进行拷贝,这时耗费的内存是原容器所占内存的2倍,对于jvm来说这是一个不小的开销,很容易造成内存溢出,所以为了保证系统的稳健性,尽量避免在写多的场景中使用这种类型的容器,因为这样可以避免频繁的内存拷贝
  3. 并发Queue:在并发队列上jdk提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪种它们都继承自Queue。其中ConcurrentLinkedQueue是一个适用于高并发场景下的队列,通过无锁方式实现高并发状态下的高性能。第一类并发Queue:通常情况下ConcurrentLinkedQueue的性能好于BlockingQueue。它是一个基于链接节点的无界的线程安全的队列。该队列中的元素遵循先进先出的原则。头部元素是最先加入的,尾部元素是最近加入的,该队列中不允许存在null元素。ConcurrentLinkedQueue类中重要的方法有:1add()offer(),加元素方法,两者没有任何区别;2poll()peek(),取头元素方法,两者有一定的差别,前者会删除元素,后者不会[A1]?。第二类并发Queue:ArrayBlockingQueue,一个基于数组的阻塞队列,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,其内部没有实现读写分离,也就意味着生产和消费不能完全并行,长度是需要定义的,可以指定先进先出或者先进后出,也叫做有界队列,在很多场合都可以使用。LinkedBlockingQueue,基于链表的阻塞队列,同ArrayBlockingQueue类似,LinkedBlockingQueue内部也维护着一个数据缓冲(该队列由一个链表构成),LinkedBlockingQueue之所以能够高效的处理并发数据,是因为其内部采用了分离锁(读写分离两个锁),从而实现了生产者和消费者完全并行运行。它是一个无界队列SynchronousQueue,一种没有缓冲的队列,生产者产生的数据会直接被消费者获取并消费,新建这种队列一个对象后,如果没有消费线程等待数据,我们无法向其中添加数据,即使添加了,也会抛出Queue Full异常PriorityBlockingQueue:是一种基于优先级的无界阻塞队列(优先级的判断通过传入构造函数的Compator对象决定,即传入队列的对象必须实现Comparable接口),其内部采用公平锁控制线程同步,使用队列的时候,如果是向容器中添加数据,不会对容器中的元素进行排序,当获取元素的时候,则开始对元素进行排序DelayQueue:是一种带有延迟时间的Queue,其存放的元素只有到了指定的延迟时间后,才能被获取到。另外该队列中的元素必须实现Delayed接口,同时它也是一个大小没有限制的队列。其应用场景有很多,比如移除缓存中超时的数据、处理超时任务、关闭空闲链接等。
  1. 补充知识

我们都知道ArrayList是线程不安全的,但是为什么这么说呢?首先我们要明确线程安全的概念,线程安全就是共享资源在多个线程同时操作时始终表现出正确的结果,这里有两个关键的地方:1、共享资源,2、始终表现出正确的结果,3、多个线程同时操作。由此我们可以推断ArrayList非线程安全就是说当多个线程操作同一个ArrayList对象时,会出现不确定的结果,那么我们可以做一个测试,示例代码如下所示:

import java.util.ArrayList;

import java.util.List;

public class App {

?? // static Vector<String> list = new Vector<String>();

?? static List<String> list = new ArrayList<String>();

?? public static void main(String[] args) throws InterruptedException {

?? ??? Thread t1 = new Thread() {

?? ??? ??? @Override

?? ??? ??? public void run() {

?? ??? ??? ??? int i = 0;

?? ??? ??? ??? while (true) {

?? ??? ??? ??? ??? list.add(String.valueOf(++i));

?? ??? ??? ??? ??? if (i == 10)

?? ??? ??? ??? ??? ??? break;

?? ??? ??? ??? }

?? ??? ??? }

?? ??? };

?? ??? Thread t2 = new Thread() {

?? ??? ??? @Override

?? ??? ??? public void run() {

?? ??? ??? ??? int i = 110;

?? ??? ??? ??? while (true) {

?? ??? ??? ??? ??? list.add(String.valueOf(++i));

?? ??? ??? ??? ??? if (i == 115)

?? ??? ??? ??? ??? ??? break;

?? ??? ??? ??? }

?? ??? ??? }

?? ??? };

?? t1.start();

?? ??? t2.start();

?? ??? Thread.sleep(1000 * 5);

?? ??? for (int i = 0; i < list.size(); i++) {

?? ??? ??? System.out.println(list.get(i));

?? ??? }

?? }

}

通过这个代码我们想要的结果是——数组长度:15,其中的值:1、2、3、4、5、6、7、8、9、10、111、112、113、114、115。但是多次(不超过5次,可能机器不同,这个结果也不同)执行发行了异常行为——数组长度:14,其中的值为:2、3、4、5、6、7、8、9、10、111、112、113、114、115。这就说明了ArrayList是非线程安全的。将ArrayList换成线程安全的Vector,多次(超过10次)执行发现结果始终是我们期待的结果,所以Vector是线程安全的。ArrayList并发执行出现异常的原因:添加元素需要两步完成(在指定位置存放元素,增大数组长度),这些操作组合在一起是非原子的。有些人说执行的时候会报ConcurrentModificationException,这个就说明了ArrayList是非线程安全的,个人觉得这种说法错误的,因为在这里我们需要证明的是ArrayList作为共享资源时的同步安全性,而非ArrayList的并发访问性,抛出该异常恰恰说明了ArrayList不支持并发写的同时,又并发地访问、修改或删除。同步类容器抛出这个异常也是这个原因,但是这不能说明同步类容器是不安全的

第十一节

Future模式有点类似于商品订单。比如再网购时,当看重某一件商品时,就可以提交订单,当订单处理完成后,在家里等待商品送货上门就可以了。或者说更像我们发送ajax请求的时候,页面是异步的进行后台处理,用户无需一直等待请求的结果,可以继续浏览或操作其他内容

第十八节

为了帮助开发人员有效地控制多线程,并且提高多线程开发效率,jdk提供了一套线程框架Executor。其是位于java.util.concurrent包中,jdk并发包的核心。该框架中最重要的类是:Executors,它扮演着线程工厂的角色,通过它可以创建具有特定功能的线程池。Executors类中创建线程的方法有:

  1. newFixedThreadPool()该方法会返回一个拥有固定数量线程的线程池。该方法返回的线程池的线程数始终不变,如果一个任务提交时,线程池中有空闲线程,那么该任务就会被立即执行;如果一个任务提交时,线程池中没有空闲的线程,那么该任务会被暂缓在一个队列中等待空闲的线程去执行。
  2. newSingleThreadExecutor()该方法会返回一个拥有一个线程的线程池。其创建的线程池与上一个方法创建的线程池性质类似:线程池中的线程数固定不变;任务提交时,如果有空闲线程就执行该任务,否则挂起任务到一个队列中,等待空闲线程执行。
  3. newCachedThreadPool(),该方法会返回一个可根据实际情况调整线程池中线程数量的线程池。也就是说该方法放回的线程池中的线程个数不确定,并且不限制线程数量。如果有任务提交,就会自动创建线程;如果没有任务提交,就不会创建线程。在没有任务时,空闲的线程会在60s后被自动回收(即线程的空闲时间60s)。
  4. newScheduledThreadPool(),该方法会返回一个SchededExecutorService对象,但该线程池可以指定线程的数量。

Executors类中创建线程池的方法内部均使用了ThreadPoolExecutor这个类。如果Executors工厂类无法满足工作需求,开发人员可以自己创建线程池。自定义线程池时需要继承ThreadPoolExecutor。该类构造方法的代码为:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ........... }

该构造函数的参数分别为:

  1. corePoolSize:核心线程数,初始化时线程池应该有的线程数
  2. maximumPoolSize:最大线程数,线程池的最大容量
  3. keepAliveTime:线程存活时间,即如果线程池不执行任务时的存活时间
  4. unit:时间单位,线程存活时间的时间单位
  5. workQueue:任意阻塞队列
  6. threadFactory:
  7. handler:线程拒绝策略

注意的事项1ThreadPoolExecutor构造函数中的参数workQueue对于队列类型比较敏感:1)、如果传入的参数是有界队列,当有新的任务需要执行时,如果线程池实际线程数小于corePoolSize,那么会将任务添加到传入参数所代表的队列中;如果传入参数代表的队列已满,那么在总线程数不大于maximumPoolSize的前提下,创建新的线程,若线程数大于maximumPoolSize,则执行拒绝策略或自定义方式2)、如果传入的参数是无界队列,那么除非系统资源耗尽,否则不存在任务入队失败的情况。也就是说当有新的任务到来,且系统线程数小于corePoolSize时,就会创建新的线程执行任务;当系统线程数达到corePoolSize后,就不会新建线程了。如果后续依然有新的任务加入,而又没有空闲的线程资源,那么任务会直接进入队列等待执行。如果任务创建和处理的速度差异很大,那么无界队列会保持快速增长,直到系统内存耗尽2ThreadPoolExecutor构造函数中的最后一个参数handler表示线程池拒绝策略,其可取值为:1)、AbortPolicy——直接抛出异常,阻止系统工作;2)、CallerRunsPolicy——只要线程池没有关闭,该策略就会直接在调用者线程中运行当前被丢弃的任务;3)、DiscardOldestPolicy——丢弃最老的一个请求,尝试再次提交当前任务;4)、DiscardPolicy——丢弃无法处理的任务,不给予任务处理;4)、自定义拒绝策略——通过实现RejectedExecutionHandler接口实现。

第二十节

在jdk的concurrent.util包中有几个比较常用的工具类,它们使用简单,方便开发。接下来我们将介绍几个常见的工具类:CyclicBarrier、CountDownLacth、Callable和Future、Samaphore信号量。

  1. CyclicBarrier工具类,常用于这样的场景:等待所有线程准备,当都准备好时,就立即执行所有线程。比如这样一个场景——用线程代表田径运动员,当所有运动员都准备好的时候,才允许起跑,否则就必须等待。这里我们就可以使用CyclicBarrier工具类来实现。示例参考:com.bjsxt.height.concurrent019.UseCyclicBarrier。具体代码如下所示:

import java.io.IOException;?

import java.util.Random;?

import java.util.concurrent.BrokenBarrierException;?

import java.util.concurrent.CyclicBarrier;?

import java.util.concurrent.ExecutorService;?

import java.util.concurrent.Executors;

public class UseCyclicBarrier {

???????? static class Runner implements Runnable {?

???????? ??? private CyclicBarrier barrier;?

???????? ??? private String name;??

???????? ??? public Runner(CyclicBarrier barrier, String name) {?

???????? ??????? this.barrier = barrier;?

???????? ? ??????this.name = name;?

???????? ??? }?

???????? ??? @Override?

???????? ??? public void run() {?

???????? ??????? try {?

???????? ??????????? Thread.sleep(1000 * (new Random()).nextInt(5));?

???????? ??????????? System.out.println(name + " 准备OK.");?

???????? ??????????? barrier.await();?

???????? ??????? } catch (InterruptedException e) {?

???????? ??????????? e.printStackTrace();?

???????? ??????? } catch (BrokenBarrierException e) {?

???????? ??????????? e.printStackTrace();?

???????? ??????? }?

???????? ??????? System.out.println(name + " Go!!");?

???????? ??? }?

???????? }

??? public static void main(String[] args) throws IOException, InterruptedException {?

??????? CyclicBarrier barrier = new CyclicBarrier(3);? // 3

??????? ExecutorService executor = Executors.newFixedThreadPool(3);?

??????? executor.submit(new Thread(new Runner(barrier, "zhangsan")));?

??????? executor.submit(new Thread(new Runner(barrier, "lisi")));?

??????? executor.submit(new Thread(new Runner(barrier, "wangwu")));?

??????? executor.shutdown();?

??? }

}

  1. CountDownLacth工具类,常用语这样的场景:监听子线程初始化动作,当所有子线程初始化完毕便通知主线程继续执行后续业务。可以参考示例:UseCountDownLatch,其位于包com.bjsxt.height.concurrent019中。具体代码如下所示:

import java.util.concurrent.CountDownLatch;

public class UseCountDownLatch {

???????? public static void main(String[] args) {

???????? ???????? final CountDownLatch countDown = new CountDownLatch(2);

???????? ???????? Thread t1 = new Thread(new Runnable() {

???????? ???????? ???????? @Override

???????? ???????? ???????? public void run() {

???????? ???????? ???????? ???????? try {

???????? ???????? ???????? ???????? ???????? System.out.println("进入线程t1" + "等待其他线程处理完成...");

???????? ???????? ???????? ???????? ???????? countDown.await();

???????? ???????? ???????? ???????? ???????? System.out.println("t1线程继续执行...");

???????? ???????? ???????? ???????? } catch (InterruptedException e) {

???????? ???????? ???????? ???????? ???????? e.printStackTrace();

???????? ???????? ???????? ???????? }

???????? ???????? ???????? }

???????? ???????? },"t1");

???????? ???????? Thread t2 = new Thread(new Runnable() {

???????? ???????? ???????? @Override

???????? ???????? ???????? public void run() {

???????? ???????? ???????? ???????? try {

???????? ???????? ???????? ???????? ???????? System.out.println("t2线程进行初始化操作...");

???????? ???????? ???????? ???????? ???????? Thread.sleep(3000);

???????? ???????? ???????? ???????? ???????? System.out.println("t2线程初始化完毕,通知t1线程继续...");

???????? ???????? ???????? ???????? ???????? countDown.countDown();

???????? ???????? ???????? ???????? } catch (InterruptedException e) {

???????? ???????? ???????? ???????? ???????? e.printStackTrace();

???????? ???????? ???????? ???????? }

???????? ???????? ???????? }

???????? ???????? });

???????? ???????? Thread t3 = new Thread(new Runnable() {

???????? ???????? ???????? @Override

???????? ???????? ???????? public void run() {

???????? ???????? ???????? ???????? try {

???????? ???????? ???????? ???????? ???????? System.out.println("t3线程进行初始化操作...");

???????? ???????? ???????? ???????? ???????? Thread.sleep(4000);

???????? ???????? ???????? ???????? ???????? System.out.println("t3线程初始化完毕,通知t1线程继续...");

???????? ???????? ???????? ???????? ???????? countDown.countDown();

???????? ???????? ???????? ???????? } catch (InterruptedException e) {

???????? ???????? ???????? ???????? ???????? e.printStackTrace();

???????? ???????? ???????? ???????? }

???????? ???????? ???????? }

???????? ???????? });

???????? ???????? t1.start();

???????? ???????? t2.start();

???????? ???????? t3.start(); ????????

???????? }

}

  1. Callable和Future工具类,是jdk封装的Future模式的实现,其使用非常简单。可以参考示例UseFuture,其位于com.bjsxt.height.concurrent019包中。这部分知识可以参考第十五小节。需要注意的是Future模式适用于处理很耗时的业务逻辑,可以有效的减小系统的响应时间,提高系统的吞吐量。

import java.util.concurrent.Callable;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

import java.util.concurrent.FutureTask;

public class UseFuture implements Callable<String>{

???????? private String para;

???????? public UseFuture(String para){

???????? ???????? this.para = para;

???????? }

???????? /**

???????? ?* 这里是真实的业务逻辑,其执行可能很慢

???????? ?*/

???????? @Override

???????? public String call() throws Exception {

???????? ???????? //模拟执行耗时

???????? ???????? Thread.sleep(5000);

???????? ???????? String result = this.para + "处理完成";

???????? ???????? return result;

???????? }

???????? //主控制函数

???????? public static void main(String[] args) throws Exception {

???????? ???????? String queryStr = "query";

???????? ???????? //构造FutureTask,并且传入需要真正进行业务逻辑处理的类,该类一定是实现了Callable接口的类

???????? ???????? FutureTask<String> future = new FutureTask<String>(new UseFuture(queryStr));

???????? ???????? FutureTask<String> future2 = new FutureTask<String>(new UseFuture(queryStr));

???????? ???????? //创建一个固定线程的线程池且线程数为1,

???????? ???????? ExecutorService executor = Executors.newFixedThreadPool(2);

???????? ???????? //这里提交任务future,则开启线程执行RealData的call()方法执行

???????? ???????? //submit和execute的区别: 第一点是submit可以传入实现Callable接口的实例对象, 第二点是submit方法有返回值

???????? ???????? Future f1 = executor.submit(future);???? ???????? //单独启动一个线程去执行的

???????? ???????? Future f2 = executor.submit(future2);

???????? ???????? System.out.println("请求完毕");

???????? ???????? try {

???????? ???????? ???????? //这里可以做额外的数据操作,也就是主程序执行其他业务逻辑

???????? ???????? ???????? System.out.println("处理实际的业务逻辑...");

???????? ???????? ???????? Thread.sleep(1000);

???????? ???????? } catch (Exception e) {

???????? ???????? ???????? e.printStackTrace();

???????? ???????? }

???????? ???????? //调用获取数据方法,如果call()方法没有执行完成,则依然会进行等待

???????? ???????? System.out.println("数据:" + future.get());

???????? ???????? System.out.println("数据:" + future2.get());

???????? ???????? executor.shutdown();

???????? }

}

  1. Semaphore,计数信号量,主要用于控制多线程对公共资源库的访问。典型的示例有:1)、公共厕所的蹲位······10个人等待5个蹲位的测试,满员后就只能出一个进一个;2)、地下停车场要有空余车位才能放行;3)、共享文件IO访问数的限制。技术信号量与线程池的区别主要在于:线程池用于控制线程的数量,信号量用于控制共享资源的并发量。比如这一段代码:Semaphore avialable = new Semaphore(int x, boolean y);,其中x表示可用资源数,y表示公平竞争关系还是非公平竞争关系(公平竞争导致排队,等待最久的线程先获取资源)。用法:在获取公共资源前,用Semaphore.acquire()获取资源,如果资源不可用,则直接阻塞,直到获取资源。操作完成后,使用Semaphore.release()归还资源。具体代码如下所示:

public class SemaphoreTest {?

??? private static final int NUMBER = 5;??? //限制资源访问数?

??? private static final Semaphore avialable = new Semaphore(NUMBER,true);?

??? public static void main(String[] args) {?

??????? ExecutorService pool = Executors.newCachedThreadPool();?

??????? Runnable r = new Runnable(){?

??????????? public void run(){?

??????????????? try {?

??????????????????? avialable.acquire();??? //此方法阻塞?

??????????????????? Thread.sleep(10*1000);?

??????????????????? System.out.println(getNow()+"--"+Thread.currentThread().getName()+"--执行完毕");?

??????????????????? avialable.release();?

?????????? ?????} catch (InterruptedException e) {?

??????????????????? e.printStackTrace();?

??????????????? }?

??????????? }?

??????? };?

??????? System.out.println(avialable.availablePermits());?

??????? for(int i=0;i<10;i++){?

??????????? pool.execute(r); ?

??????? }?

??????? System.out.println(avialable.availablePermits());?

??????? pool.shutdown();?

??? }??

??? public static String getNow(){?

??????? SimpleDateFormat sdf = new SimpleDateFormat("mm:ss");?

??????? return sdf.format(new Date());?

??? }?

}?

相关概念:

  1. PV(Page View)即网站总访问量、页面浏览量或点击量,用户每刷新一次就会被记录一次
  2. UV(unique visitor)即独立访客数,访问网站的一台电脑客户端即为一个访客。一般来讲,每天0点到24点之间内相同IP的客户端只记录一次
  3. QPS(query per second)即每秒查询数,它反映了系统在业务上的繁忙程度,每次请求的背后,可能对应着多次磁盘IO,多次网络请求,多个CPU时间片等。我们通过qps可以非常直观地了解当前系统的业务情况,一旦当前qps超过所设定的预警阀值,可以考虑增加机器对集群进行扩容,以免压力过大造成服务器宕机。部署的时候可以根据前期压力测试得到的估值,结合后期综合运维情况,估算出阀值
  4. RT(response time)即请求响应时间,该指标直接表明前端用户体验,该值越大表明系统响应速度越慢,前端用户等待的时间也就越长,所以任何系统都想降低该指标

当然评判一个系统的指标还有诸如:CPU、内存、网络、磁盘等细节问题。有时也会涉及到数据库层面的设计,比如:select、update、delete/ps

网络资源:JDK 6中的java.util.concurrent包下实用工具类 | 学步园

另外需要注意的是:CyclicBarrier和CountDownLacth之间的区别是前者在等待多个线程准备好后,多个线程同时执行;后者则是在等待多个线程准备好后,让被通知的线程执行,这个线程大多时候指的是主线程

第二十三节

在java多线程中,可以用synchronized关键字来实现线程间“同步互斥”的工作,以达到保护共享资源的目的。当然,java也提供了另外一个更优秀的机制来完成这个工作,即Lock对象,这里我们主要讲解两种锁:重入锁/读、写锁。他们的功能比synchronized强大,比如:嗅探锁定、多路分支等功能。

  1. 重入锁:又名递归锁,所谓的重入锁是指同一个线程在外层方法获取锁后,在进入内层方法的时候会自动获得锁。在java中ReentrantLock是一个可重入的锁,当然synchronized也是一个可重入的锁。可重入锁的好处是在一定程度上避免了死锁。但是需要注意的是:最后一定要释放锁定,不然会造成锁永远无法释放的现象。具体参考下面的示例:

import java.util.concurrent.CopyOnWriteArrayList;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

public class UseReentrantLock {

???????? private Lock lock = new ReentrantLock();

???????? public void method1(){

???????? ???????? try {

???????? ???????? ???????? lock.lock();

???????? ???????? ???????? System.out.println("当前线程:" + Thread.currentThread().getName() + "进入method1..");

???????? ???????? ???????? Thread.sleep(1000);

???????? ???????? ???????? System.out.println("当前线程:" + Thread.currentThread().getName() + "退出method1..");

???????? ???????? ???????? Thread.sleep(1000);

???????? ???????? } catch (InterruptedException e) {

???????? ???????? ???????? e.printStackTrace();

???????? ???????? } finally {

???????? ???????? ???????? lock.unlock();

???????? ???????? }

???????? }

???????? public void method2(){

???????? ???????? try {

???????? ???????? ???????? lock.lock();

???????? ???????? ???????? System.out.println("当前线程:" + Thread.currentThread().getName() + "进入method2..");

???????? ???????? ???????? Thread.sleep(2000);

???????? ???????? ???????? System.out.println("当前线程:" + Thread.currentThread().getName() + "退出method2..");

???????? ???????? ???????? Thread.sleep(1000);

???????? ???????? } catch (InterruptedException e) {

???????? ???????? ???????? e.printStackTrace();

???????? ???????? } finally {

???????? ???????? ???????? lock.unlock();

???????? ???????? }

???????? }

???????? public static void main(String[] args) {

???????? ???????? final UseReentrantLock ur = new UseReentrantLock();

???????? ???????? Thread t1 = new Thread(new Runnable() {

???????? ???????? ???????? @Override

???????? ???????? ???????? public void run() {

???????? ???????? ???????? ???????? ur.method1();

???????? ???????? ???????? ???????? ur.method2();

???????? ???????? ???????? }

???????? ???????? }, "t1");

???????? ???????? t1.start();

???????? ???????? try {

???????? ???????? ???????? Thread.sleep(10);

???????? ???????? } catch (InterruptedException e) {

???????? ???????? ???????? e.printStackTrace();

???????? ???????? }

???????? ???????? //System.out.println(ur.lock.getQueueLength());

???????? }??????

}

  1. 锁与等待/通知之间的关系,在讲解synchronized的时候,我们知道该关键字配合Object中的wait()和notify()、notifyAll()方法可以实现多个线程之间协同工作。同样在使用Lock锁时,可以与起到类似作用的Condition类配合使用,实现多个线程之间协同工作。不过需要注意的是Condition一定要与锁结合,即只有在锁的基础之上才会产生Condition。

关于锁的问题可以参考资源:https://www.cnblogs.com/qifengshi/p/6831055.html


这里介绍的是ConcurrentLinkedQueue

文章来源:https://blog.csdn.net/java_lover20111106/article/details/135684573
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。