在了解并发编程基本知识,先了解几本书,方便学习提高,分别为:java编程思想、企业应用架构模式、并发编程实战。
多线程中的设计模式有:Future、Master-Worker、生产者-消费者。
本次课程分为以下几部分进行讲解:
所谓的多个线程多个锁,就是多个线程,每个线程都有到自己指定的锁,彼此之间没有锁冲突。获得锁之后,每个线程都可以执行synchronized方法体的内容,彼此毫无影响。具体参见示例MultiThread。代码如下所示:
public class MultiThread {
?? private int num = 0;
?? /** static */
?? public synchronized void printNum(String tag) {
?? ??? try {
?? ??? ??? if (tag.equals("a")) {
?? ??? ??? ??? num = 100;
?? ??? ??? ??? System.out.println("tag a, set num over!");
?? ??? ??? ??? Thread.sleep(1000);
?? ??? ??? } else {
?? ??? ??? ??? num = 200;
?? ??? ??? ??? System.out.println("tag b, set num over!");
?? ??? ??? }
?? ??? ??? System.out.println("tag " + tag + ", num = " + num);
?? ??? } catch (InterruptedException e) {
?? ??? ??? e.printStackTrace();
?? ??? }
?? }
?? // 注意观察run方法输出顺序
?? public static void main(String[] args) {
?? ??? // 俩个不同的对象
?? ??? final MultiThread m1 = new MultiThread();
?? ??? final MultiThread m2 = new MultiThread();
?? ??? Thread t1 = new Thread(new Runnable() {
?? ??? ??? @Override
?? ??? ??? public void run() {
?? ??? ??? ??? m1.printNum("a");
?? ??? ??? }
?? ??? });
?? ??? Thread t2 = new Thread(new Runnable() {
?? ??? ??? @Override
?? ??? ??? public void run() {
?? ??? ??? ??? m2.printNum("b");
?? ??? ??? }
?? ??? });
?? ??? t1.start();
?? ??? t2.start();
?? }
}
在java中关键字synchronized取得的锁一般都是对象锁,而不是将一段代码或方法当作锁。在示例代码的执行过程中,我们可以看到哪个线程先执行synchronized关键字修饰的方法,哪个线程就持有该方法所属对象的锁(Lock)。若存在两个对象,线程获得的就是两把不同的锁,他们彼此互不影响。不过在某些情况下,即使多个线程中每个线程都拥有自己的对象,他们还是需要竞争同一把锁,那就是在静态方法上加synchronized关键字。此时该静态方法拥有的锁是类一级别的锁(独占.class类),此时锁定的是class类,而非对象!
在java中同步的核心问题是共享。如果该资源是非共享资源,就没必要对它进行同步操作了。如果该资源是共享资源,就必须对它进行同步操作。相反异步涉及的核心问题是独立,即彼此之间不受制约。这和http页面中的ajax请求一样,当访问某个页面的时候,我们可以一边浏览页面的静态内容,一边等待重要数据的加载,或者一边浏览某个控件远程加载过来的数据,一边等待另外一个控件从远程服务器加载数据。他们之间的操作互相没有干扰,可以同步进行。
既然同步的核心问题是共享,那么同步的主要目的就是保证共享资源在多线程环境中的线程安全,使其能够表现出正确的行为。共享资源要想线程安全,就必须具有两个重要的特性:原子性(同步)、可见性。参见示例:MyObject.java,具体代码如下所示:
/**
?* ========================================================================================================================================
?* 实验目的:多个线程竞争同一把锁的时候,多线程的执行情况
?* 实验环境:多线程
?* 实验材料:锁——MyObject.class,线程thread1,线程thread2
?* 实验结果:首先第一个被执行的方法的第一个输出语句打印,然后等待相应时间,该方法的第二个输出语句打印;
?* ?????????????????之后第二个被执行的方法的第一个输出语句打印,然后等待相应时间,该方法的第二个输出语句打印
?* ========================================================================================================================================
?* 对比实验:去除bespeak上的synchronized关键字,继续执行该程序
?* 对比结果:两个方法的第一个输出语句几乎同时打印,之后bespeak方法的第二个输出语句打印,因为等待时间较短,最后第一个方法的第二条输出语句打印
?* ========================================================================================================================================
?* 从两次试验结果可以看出当多个线程竞争同一把锁的时候,对于被该锁锁定的同步方法的访问是线性的,即同一时刻只能有一个线程对同步方法进行操作;但是对于
?* 与同步方法并行的非同步方法的访问是非线性的,即同一时刻可以有一个线程访问同步方法,另一个线程访问非同步方法
?*
?* @author Alon
?*
?*/
public class MyObject {
?? /**
?? ?* 打印资源
?? ?*
?? ?* @throws InterruptedException
?? ?*/
?? public static synchronized void print(String threadName) {
?? ??? System.out.println("线程" + threadName + "开始执行了!");
?? ??? try { // 让当前线程休眠8秒
?? ??? ??? Thread.sleep(1000 * 10);
?? ??? } catch (InterruptedException e) {
?? ??? ??? e.printStackTrace();
?? ??? }
?? ??? System.out.println("线程" + threadName + "执行完成!");
?? }
?? /**
?? ?* 预约
?? ?*
?? ?* @param threadName
?? ?*/
?? public static synchronized void bespeak(String threadName) {
?? ??? System.out.println("线程" + threadName + "开始预约");
?? ??? try { // 让当前线程休眠8秒
?? ??? ??? Thread.sleep(1000 * 4);
?? ??? } catch (InterruptedException e) {
?? ??? ??? e.printStackTrace();
?? ??? }
?? ??? System.out.println("线程" + threadName + "预约成功!");
?? }
?? public static void main(String[] args) throws InterruptedException {
?? ??? Thread t1 = new Thread("first") {
?? ??? ??? @Override
?? ??? ??? public void run() {
?? ??? ??? ??? MyObject.print(Thread.currentThread().getName());
?? ??? ??? }
?? ??? };
?? ??? Thread t2 = new Thread("second") {
?? ??? ??? @Override
?? ??? ??? public void run() {
?? ??? ??? ??? MyObject.bespeak(Thread.currentThread().getName());
?? ??? ??? }
?? ??? };
?? ??? t1.start();
?? ??? t2.start();
?? }
}
通过该类的执行我们可以看到:
java同步的主要目的是保护共享资源,使其表现出正确地行为,保证多个线程对共享资源的操作是安全的。当多个线程并发地访问某个共享资源时,为了保护使其表现出正确的行为,java通常会对其进行加锁操作,以限制并发访问。通俗的讲就是对多个线程进行排队操作,使其对共享资源的操作变得有序。对于java类中的同步方法和异步方法的访问,这里有以下几点需要注意:
数据分表,所谓数据分表就是将同一张表中的数据分拆存储到多张表中,比如:订单表由于数据量过大,当多个用户同时请求该表的数据时,数据库可能无法及时进行响应,为了提高数据库的响应速度,可以根据用户ID进行取模运算,将用户的订单数据分别保存于两张表中,这样当多个用户请求订单数据时,就会请求两张表,而非一张表。当然这个假设的前提时用户的订单量基本一致,这个前提是避免某些购物狂用户订单量是10个人订单量的总和的现象出现。当然,我们可以直接以用户表为例来说明分表案例。这里拓展介绍分表知识,只是为了说明分表与同步是两个问题、两个概念。
在mysql中数据库分表可以从不同的维度进行,这里我们只说了一种维度,即对同一张表中的数据进行水平分表。我们还可以从业务的维度对数据库进行垂直分库。
mysql主从同步,其主要作用就是一份数据多个备份,这样可以避免因主服务器宕机而造成数据丢失的现象发生。其实,这一架构的本质就是提高数据的安全性。mysql主从复制的基本原理:主从服务器之间维护一个长连接,方便数据的发送(主服务器)接收(从服务器),传递的是主服务器的数据变化日志(即binlog文件,这里的说法只是简要说明,当然真正的实现原理是复杂的)。基于这一架构,所谓同步就是主从服务器之间保持数据一致性进行的通信和从服务器生成数据这两者的总称。从外部用户的角度看主从服务器就像是一个大的存储磁盘,因为其对外表现的数据是一致的。所以上面关于同步的定义可以简化:主从服务器之间的数据传递过程就是同步(包含两部分:数据从主服务器传递到从服务器,从服务器依据主服务器传递的数据进行数据生成的操作)。接下来简单介绍mysql主从架构中主从服务器之间的数据交互,这个过程包含两种实现方式:同步传输,异步传输。所谓同步传输就是主服务器数据发生变化后,立即向从服务器发送数据变更日志,等待从服务器更新完成反馈后,才对外进行反馈(这里的对外指的是应用服务器,应用服务器接到反馈后方才进行下面的操作,比如响应用户,通知其他线程等)。所谓的异步传输就是主服务器数据更新完成后立刻对外进行反馈,然后主服务器后台异步将数据传递给从服务器,以便从服务器更新数据。这里我们可以看出同步传输和异步传输的区别:
数据库分表和主从同步两者的相同点是:降低数据库的访问压力;不同点是:主从同步还有一个功能就是提高了数据的安全性。
上述描写仅仅是个人理解,对于不完整或者纰漏还请指正,切勿破口大骂!!!
本节介绍的案例为脏读。参见示例:DirtyRead
在java中所谓的脏读就是:当一个线程在处理一个比较耗时的业务时,另外一个线程不加限制的进行了干预,导致当前线程读取到了错误的数据。避免出现脏读的方法为:
在设计程序的时候,一定要从整体上考虑问题。通俗讲就是在对对象方法加锁时,需要考虑业务的整体性。本示例中setter和getter是一个业务整体,所以为了避免出现脏读,保证业务的原子性,这两个方法都要加上synchronized关键字。
数据四大特性,即ACID——即原子性、一致性、隔离性、持久性。原子性就是指事务包含的所有操作要么全部成功,要么全部失败回滚。一致性是指事务必须使数据库从一个一致性状态变化到另一个一致性状态,也就是说事务执行之前和执行之后都必须处于一致性状态。隔离性是当多个用户并发访问数据库时,比如操作同一张表,数据库为每一个用户开启的事务不能被其他事务的操作所干扰,多个并发事务之间要相互隔离。持久性是指一个事务一旦被提交了,那么对数据库中的数据的改变就是永久性的,即便是在数据库系统遇到故障的情况下也不会丢失提交事务的操作。
至此数据库四大特性的基本概念介绍完了。接下来我们详细说明一下事务的隔离性:当多个线程都开启事务操作数据库中的数据时,数据库系统要能够进行隔离操作,以保证各个线程获取数据的准确性。下面将介绍如果不考虑事务的隔离性,会发生的问题有:
update account set money = money + 100 where name = ‘B’;(执行完该操作后A通知B)
update account set money = money - 100 where name = ‘A’;
当执行完第一条SQL语句时,A通知B查看账户,B发现钱确实已经到账(由于没有了隔离性,B读取到了A事务中的数据,此时已经出现了脏读),但是之后无论第二条SQL是否执行,只要不提交事务,那么A转账涉及的所有操作都将回滚。此后B再次查看账户时会发现钱没有到账。
不可重复读与脏读的区别是:脏读是某一事务读取了另一个事务没有提交的脏数据,而不可重复读则是读取到了前一个事务提交的数据。
在某些情况下,不可重复读并不是问题,比如我们多次查询某个数据当然以最后查询得到的结果为主。但在另一些情况下就有可能发生问题,例如对于同一个数据业务员A和业务员B依次查询就可能不同,A和B有可能打起来了。
幻读读取到的是另一条事务已经提交的数据,这点和脏读不同,与不可重复读相同。但是与不可重复读不同的是,不可重复读查询到的都是同一个数据项,而幻读查询到的则是一批数据整体(比如数据的个数)。
针对上述问题MySQL为我们提供了四种隔离级别:
以上四种隔离级别中,隔离级别最高的是串行化(Serializable),隔离级别最低的是读未提交(Read uncommitted)。当然隔离级别越高,数据的安全性越高,但相应的执行效率也就越低。像串行化(Serializable)这样的级别,是以锁表的方式(类似于Java多线程中的锁)使得其他的线程只能在锁外等待,所以平时选用何种隔离级别应根据实际情况来定。在MySQL数据库中默认的隔离级别是可重复读(Repeatable read)。
与MySQL不同,Oracle针对上述问题只提出了两种隔离级别,即:串行化(Serializable)和读已提交(Read Committed),其中默认隔离级别是读已提交(Read Committed)。
在MySQL数据库中查看事务隔离级别可以使用命令:SELECT @@tx_isolation;
在MySQL数据库中设置事务隔离级别可以使用命令:SET [glogal | session] transaction isolation level 隔离级别名称;,或者set tx_isolation=’隔离级别名称’;。
需要注意的是:设置数据库的隔离级别时,一定要在事务开启之前进行。
如果使用JDBC对数据库的事务设置隔离级别,应该在调用Connection对象的setAutoCommit(false);方法之前,通过调用Connection对象的setTransactionIsolation(level);来设置当前连接的隔离级别。至于参数level,可以使用Connection对象的字段,如下图所示:
使用JDBC设置隔离级别的代码如下图所示:
小结:隔离级别的设置只针对当前连接有效。对于MySQL命令窗口而言,一个窗口就相当于一个连接,当前窗口设置的隔离级别只针对当前窗口中的事务有效;对于JDBC来说Connection对象就相当于一个连接,而对于Connection对象设置的隔离级别只对该Connection对象有效,与其他Connection对象无关。
本段总结参考网络,网址为:https://www.cnblogs.com/fjdingsd/p/5273008.html
Oracle解决方案:为了保证一个事务不会看到另外一个还没有完成的事务产生的结果,使每个事务像是在单独、隔离的环境下运行,Oracle采用了undo的方式来解决。具体如下图所示:
Oracle部分的整理不完整,后续需补充。
可重入锁,所谓的可重入锁,就是线程可以进入任何一个它已经拥有的锁同步着的代码块。在java中用synchronized同步的代码块是可以重入的,这就意味着如果一个java线程进入了代码中的synchronized同步块,并因此获得了该同步块使用的同步对象对应的管程上的锁,那么这个线程可以进入由同一个管程对象所同步的另一个java代码块。这里的讲解比较抽象,我们可以通过下面的代码来理解(代码来源于SyncDubbo1):
public class SyncDubbo1 {
??? public synchronized void method1() {
??? ??? System.out.println("method1..");
??? ??? method2();
??? }
??? public synchronized void method2() {
??? ??? System.out.println("method2..");
??? ??? method3();
??? }
??? public synchronized void method3() {
??? ??? System.out.println("method3..");
??? }
??? public static void main(String[] args) {
??? ??? final SyncDubbo1 sd = new SyncDubbo1();
??? ??? Thread t1 = new Thread(new Runnable() {
??? ??? ??? @Override
??? ??? ??? public void run() {
??? ??? ??? ??? sd.method1();
??? ??? ??? }
??? ??? });
??? ??? t1.start();
??? }
}
由上面的代码可知,如果线程t1获取对象锁sd,那么它便可自由地出入synchronized修饰的三个方法:method1、method2、method3。通过这段代码,我们了解到关键字synchronized拥有锁重入的功能,即在使用synchronized时,若一个线程得到了一个对象的锁,那么再次请求该对象就可以再次得到该对象的锁。示例可参见:SyncDubbo1、SyncDubbo2和SyncException。其中SyncException示例中线程在获得锁之后抛出了异常,之后锁会被自动释放。
另外需要注意的是:在并发编程的时候一定要考虑周全,不然会出现下面几种严重的结果:1、如果程序出现了异常,却没有及时处理释放锁,那么会对应用程序中其他需要这个锁的线程产生严重的影响。2、如果应用程序出现异常,那么对于后面需要该程序正确执行的代码来说,这将是一个灾难。因为前面的程序没有正确执行,所以后面的执行在逻辑上就会出现严重错误。
在某些情况下,synchronized声明的方法是有问题的。比如A线程调用某个synchronized修饰的方法执行一个需要花费很长时间的任务,那么B线程就需要等待比较长的时间才能执行。这时我们就要考虑整个方法是否真的需要添加synchronized进行修饰,其实在很多时候我们并不需要对java类的整个方法添加synchronized修饰,需要添加synchronized修饰仅仅是方法内的部分代码,这就是java中的synchronized代码块。synchronized代码块用法比较灵活,可以使用任意的Object作为锁。不过我们需要注意的问题有:在synchronized代码块中,尽量不要使用String常量作为锁,因为使用String常量会出现死循环;另外需要注意的问题就是对象锁改变的问题:使用一个对象加锁的时候,如果对象本身发生了改变,那么synchronized代码块拥有的锁就不同了;如果对象本身不发生改变(即使对象的属性发生了改变),那么synchronized代码块拥有的锁依然是相同的。
volatile关键字,其主要作用就是使变量在多个线程之间可见。在java中,每个线程都有自己的工作内存,该内存中存放着所有线程共享的主内存中的变量值的拷贝。当线程执行时,其便在自己的工作内存中操作这些变量。为了存取一个共享变量,线程通常会:1、获取锁定——锁定共享内存(个人理解,解锁处与此处相同),2、清除自身工作内存,3、从所有线程的共享内存区中拷贝共享变量值,然后正确的装入到自身的工作内存中,4、线程解锁——解锁主内存时,将自身工作内存中的变量值写回到共享内存中。也就是说:一个变量被volatile修饰后,无论哪个线程修改该变量的值,其他线程都会接到通知,并重新从主内存中读取该变量的值,即:volatile的作用就是强制线程到主内存(所有线程共享的内存区)中读取变量值,而非去线程自身的工作内存中读取。这就实现变量在多个线程间安全可见。进一步讲解volatile的执行流程前,先了解下一个线程和被所有线程共享的主内存可执行的操作:1、线程——使用(use)、赋值(assign)、装载(load)、存储(store)、锁定(lock)、解锁(unlock);2、所有线程共享的主内存——读(read)、写(write)、锁定(lock)、解锁(unlock),其中主内存所有的操作都是原子的。volatile修饰的变量的操作流程如下图所示:
volatile应用场景:两个线程协作交互,比如根据水温变化产生不同的响应,设想现实生活中两个人协同工作一个在现场测量水温,一个在锅炉旁控制火,他们之间的写作需要中间工具——对讲机。在程序设计时,对讲机就是被volatile修饰的变量,修改volatile修饰的变量的线程就是现场测试水温的工作人员,而另外一个线程就是根据水温调整锅炉的工作人员。
需要注意的是,volatile关键字虽拥有多个线程间的可见性,却不具备同步性——原子性,因此volatile是一个不会造成阻塞的,性能比synchronized强,却无法替代synchronized同步功能的轻量级synchronized。volatile可以适用多个线程读取/一个线程写,其他多个线程读取的场景,但无法适用多个线程同时写的场景,这是由其非原子性造成的,参见示例VolatileNotAtoic。从示例可以看出:volatile只具有可见性,无原子性,要实现原子性建议使用atomic类的系列对象(不过atomic类只保证本身方法原子性,无法保证多次操作的原子性,具体示例参见:AtomicUse。这里的意思是如果调用atomic类方法的方法不是原子性的,那么即便多次调用atomic类方法也无法保证方法的原子性)。
System.out.println(“=======”);,该方法是系统另起的线程进行打印的,比较浪费系统性能,所以在系统中不建议使用这种打印。
并发编程中的锁
线程间通信,线程是操作系统中独立的个体,如果他们不经过特殊处理是无法成为一个整体的,为了使他们成为一个整体,就必须实现线程之间的通信。换言之,线程间通信是多个独立线程成为整体的必要前提。如果线程存在通信指挥,那么系统的在交互性方面会变得更加强大,并且CPU利用率方面也会得到大幅度的提升,同时还能使开发人员对线程任务的处理过程进行有效的把控和监督。java中实现线程通信的方式是:wait/notify。它们都是Object类的方法,换言之java为所有的对象提供了这两个方法。不过在实现线程通信的过程中这两个方法还需要synchronized关键字的配合。需要注意的是wait方法释放锁,而notify方法不会释放锁。线程通信的例子参见:ListAdd1、ListAdd2。代码如下所示:
import java.util.LinkedList
public class ListAdd1 {
?? private static volatile LinkedList<String> list = new LinkedList<String>();
?? private static Object lock = new Object();
?? public static void main(String[] args) {
?? ??? new Thread("t1") {
?? ??? ??? @Override
?? ??? ??? public void run() {
?? ??? ??? ??? System.out.println(Thread.currentThread().getName() + "线程启动");
?? ??? ??? ??? int i = 1;
?? ??? ??? ??? while (true) {
?? ??? ??? ??? ??? synchronized (lock) {
?? ??? ??? ??? ??? ??? if (list.size() == 5) {
?? ??? ??? ??? ??? ??? ??? lock.notify();
?? ??? ??? ??? ??? ??? } else {
?? ??? ??? ??? ??? ??? ??? System.out.println(Thread.currentThread().getName() + "添加元素value" + i);
?? ??? ??? ??? ??? ??? ??? list.addFirst("value" + (i++));
?? ??? ??? ??? ??? ??? ??? try {
?? ??? ??? ??? ??? ??? ??? ??? Thread.sleep(1000 * 3);
?? ??? ??? ??? ??? ??? ??? } catch (InterruptedException e) {
?? ??? ??? ??? ??? ??? ??? ??? e.printStackTrace();
?? ??? ??? ??? ??? ??? ??? }
?? ??? ??? ??? ??? ??? }
?? ??? ??? ??? ??? }
?? ??? ??? ??? }
?? ??? ??? }
?? ??? }.start();
?? ??? new Thread("t2") {
?? ??? ??? @Override
?? ??? ??? public void run() {
?? ??? ??? ??? System.out.println(Thread.currentThread().getName() + "线程启动");
?? ??? ??? ??? while (true) {
?? ??? ??? ??? ??? synchronized (lock) {
?? ??? ??? ??? ??? ??? if (list.size() == 0) {
?? ??? ??? ??? ??? ??? ??? try {
?? ??? ??? ??? ??? ??? ??? ??? lock.wait();
?? ??? ??? ??? ??? ??? ??? } catch (InterruptedException e) {
?? ??? ??? ??? ??? ??? ??? ??? e.printStackTrace();
?? ??? ??? ??? ??? ??? ??? }
?? ??? ??? ??? ??? ??? } else {
?? ??? ??? ??? ??? ??? ??? System.out.println(Thread.currentThread().getName() + "线程开始移除元素:" + list.removeFirst());
?? ??? ??? ??? ??? ??? ??? try {
?? ??? ??? ??? ??? ??? ??? ??? Thread.sleep(1000 * 3);
?? ??? ??? ??? ??? ??? ??? } catch (InterruptedException e) {
?? ??? ??? ??? ??? ??? ??? ??? e.printStackTrace();
?? ??? ??? ??? ??? ??? ??? }
?? ??? ??? ??? ??? ??? }
?? ??? ??? ??? ??? }
?? ??? ??? ??? }
?? ??? ??? }
?? ??? }.start();
?? }
}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
?* wait notfiy 方法,wait释放锁,notfiy不释放锁
?*
?* @author alienware
?*
?*/
public class ListAdd3 {
??
?? private volatile static List list = new ArrayList();
?? public void add() {
?? ??? list.add("bjsxt");
?? }
?? public int size() {
?? ??? return list.size();
?? }
?? public static void main(String[] args) {
?? ??? final ListAdd3 list2 = new ListAdd3();
?? ??? // 1 实例化出来一个 lock
?? ??? // 当使用wait 和 notify 的时候 , 一定要配合着synchronized关键字去使用
?? ??? // final Object lock = new Object();
?? ??? final CountDownLatch countDownLatch = new CountDownLatch(1);
?? ??? Thread t1 = new Thread(new Runnable() {
?? ??? ??? @Override
?? ??? ??? public void run() {
?? ??? ??? ??? try {
?? ??? ??? ??? ??? for (int i = 0; i < 10; i++) {
?? ??? ??? ??? ??? ??? list2.add();
?? ??? ??? ??? ??? ??? System.out.println("当前线程:" + Thread.currentThread().getName() + "添加了一个元素..");
?? ??? ??? ??? ??? ??? Thread.sleep(500);
?? ??? ??? ??? ??? ??? if (list2.size() == 5) {
?? ??? ??? ??? ??? ??? ??? System.out.println("已经发出通知..");
?? ??? ??? ??? ??? ??? ??? countDownLatch.countDown();
?? ??? ??? ??? ??? ??? }
?? ??? ??? ??? ??? }
?? ??? ??? ??? } catch (InterruptedException e) {
?? ??? ??? ??? ??? e.printStackTrace();
?? ??? ??? ??? }
?? ??? ??? }
?? ??? }, "t1");
?? ??? Thread t2 = new Thread(new Runnable() {
?? ??? ??? @Override
?? ??? ??? public void run() {
?? ??? ??? ??? if (list2.size() != 5) {
?? ??? ??? ??? ??? try {
?? ??? ??? ??? ??? ??? countDownLatch.await();
?? ??? ??? ??? ??? } catch (InterruptedException e) {
?? ??? ??? ??? ??? ??? e.printStackTrace();
?? ??? ??? ??? ??? }
?? ??? ??? ??? }
?? ??? ??? ??? System.out.println("当前线程:" + Thread.currentThread().getName() + "收到通知线程停止..");
?? ??? ??? ??? // throw new RuntimeException();
?? ??? ??? }
?? ??? }, "t2");
?? ??? t2.start();
?? ??? t1.start();
?? }
??
}
示例中体现了这样几个问题:
BlockingQueue是一个队列,并且支持阻塞:阻塞的放入数据或阻塞的获取数据。实现阻塞功能时,使用的方法是:put和take。
可以通过java来模拟BlockingQueue中的这两个方法,使用到的技术:synchronized、wait/notify、LinkedList,具体示例参见:MyQueue,代码如下所示:
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
?* 自定义阻塞队列
?*
?* @author Alon
?*
?*/
public class MyQueue {
?? // 1 需要一个承装元素的集合
?? private LinkedList<Object> list = new LinkedList<Object>();
?? // 2 需要一个计数器
?? private AtomicInteger count = new AtomicInteger(0);
?? // 3 需要制定上限和下限
?? private final int minSize = 0;
?? private final int maxSize;
?? // 4 构造方法
?? public MyQueue(int size) {
?? ??? this.maxSize = size;
?? }
?? // 5 初始化一个对象 用于加锁
?? private final Object lock = new Object();
?? // put(anObject):
?? // 把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续.
?? public void put(Object obj) {
?? ??? synchronized (lock) {
?? ??? ??? while (count.get() == this.maxSize) {
?? ??? ??? ??? try {
?? ??? ??? ??? ??? lock.wait();
?? ??? ??? ??? } catch (InterruptedException e) {
?? ??? ??? ??? ??? e.printStackTrace();
?? ??? ??? ??? }
?? ??? ??? }
?? ??? ??? // 1 加入元素
?? ??? ??? list.add(obj);
?? ??? ??? // 2 计数器累加
?? ??? ??? count.incrementAndGet();
?? ??? ??? // 3 通知另外一个线程(唤醒)
?? ??? ??? lock.notify();
?? ??? ??? System.out.println("新加入的元素为:" + obj);
?? ??? }
?? }
?? // take:
?? // 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入.
?? public Object take() {
?? ??? Object ret = null;
?? ??? synchronized (lock) {
?? ??? ??? while (count.get() == this.minSize) {
?? ??? ??? ??? try {
?? ??? ??? ??? ??? lock.wait();
?? ??? ??? ??? } catch (InterruptedException e) {
?? ??? ??? ??? ??? e.printStackTrace();
?? ??? ??? ??? }
?? ??? ??? }
?? ??? ??? // 1 做移除元素操作
?? ??? ??? ret = list.removeFirst();
?? ??? ??? // 2 计数器递减
?? ??? ??? count.decrementAndGet();
?? ??? ??? // 3 唤醒另外一个线程
?? ??? ??? lock.notify();
?? ??? }
?? ??? return ret;
?? }
?? public int getSize() {
?? ??? return this.count.get();
?? }
?? public static void main(String[] args) {
?? ??? final MyQueue mq = new MyQueue(5);
?? ??? mq.put("a");
?? ??? mq.put("b");
?? ??? mq.put("c");
?? ??? mq.put("d");
?? ??? mq.put("e");
?? ??? System.out.println("当前容器的长度:" + mq.getSize());
?? ??? Thread t1 = new Thread(new Runnable() {
?? ??? ??? @Override
?? ??? ??? public void run() {
?? ??? ??? ??? mq.put("f");
?? ??? ??? ??? mq.put("g");
?? ??? ??? }
?? ??? }, "t1");
?? ??? t1.start();
?? ??? Thread t2 = new Thread(new Runnable() {
?? ??? ??? @Override
?? ??? ??? public void run() {
?? ??? ??? ??? Object o1 = mq.take();
?? ??? ??? ??? System.out.println("移除的元素为:" + o1);
?? ??? ??? ??? Object o2 = mq.take();
?? ??? ??? ??? System.out.println("移除的元素为:" + o2);
?? ??? ??? }
?? ??? }, "t2");
?? ??? try {
?? ??? ??? TimeUnit.SECONDS.sleep(2);
?? ??? } catch (InterruptedException e) {
?? ??? ??? e.printStackTrace();
?? ??? }
?? ??? t2.start();
?? }
}
java中锁的有:1、对象锁(当前对象/任意新建对象),2、类锁(XXX.class),3、字符串常量锁。另外加锁的粒度要尽可能的小,这样有利于提高程序的性能。
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列通常应用于生产者和消费者场景,生产者即向队列里添加元素的线程,消费者即从队列里获取元素的线程。阻塞队列即生产者存放元素、消费者获取并消费元素的容器。阻塞队列通常提供四种处理方法,如下图所示:
从图中可知在第一列中,如果当前队列已满,并且继续调用add方法向队列中添加元素,那么将抛出IllegalStateException(“Queue full”)异常;如果当前队列为空,并且继续调用remove方法,那么将抛出NoSuchElementException异常;如果当前队列为空,并且继续调用element方法,那么将抛出NoSuchElementException异常。在第二列中,在使用offer方法时,如果成功返回true,否则返回false;使用poll方法时,有返回获取的元素,否则返回null。在第三列中,当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
java中常见的阻塞队列有:
ThreadLocal是线程的局部变量,是一种多线程间并发访问变量的解决方案。与synchronized等加锁的方式不同,ThreadLocal完全不提供锁,而使用以空间换时间的手段,为每个线程提供变量的独立副本,以保障线程安全
从性能上说,ThreadLocal不具有绝对的优势,在并发不是很高的情况下,加锁的性能会更好,但作为一套与锁无关的线程安全解决方案,在高并发量或者竞争激烈的场景中,使用ThreadLocal可以在一定程度上减少锁竞争。可以参考示例ConnThreadLocal.java,具体代码如下所示:
public class ConnThreadLocal {
?? public static ThreadLocal<String> th = new ThreadLocal<String>();
?? public void setTh(String value) {
?? ??? th.set(value);
?? }
?? public void getTh() {
?? ??? System.out.println(Thread.currentThread().getName() + ":" + this.th.get());
?? }
?? public static void main(String[] args) throws InterruptedException {
?? ??? final ConnThreadLocal ct = new ConnThreadLocal();
?? ??? Thread t1 = new Thread(new Runnable() {
?? ??? ??? @Override
?? ??? ??? public void run() {
?? ??? ??? ??? ct.setTh("张三");
?? ??? ??? ??? ct.getTh();
?? ??? ??? }
?? ??? }, "t1");
?? ??? Thread t2 = new Thread(new Runnable() {
?? ??? ??? @Override
?? ??? ??? public void run() {
?? ??? ??? ??? try {
?? ??? ??? ??? ??? Thread.sleep(1000);
?? ??? ??? ??? ??? ct.getTh();
?? ??? ??? ??? } catch (InterruptedException e) {
?? ??? ??? ??? ??? e.printStackTrace();
?? ??? ??? ??? }
?? ??? ??? }
?? ??? }, "t2");
?? ??? t1.start();
?? ??? t2.start();
?? }
}
单例模式,最常见的就是饥饿模式和懒汉模式,一个直接实例化对象,一个在调用方法时进行实例化对象,在多线程模式下,考虑到性能和线程安全,我们一般选择下面两种比较经典的单例模式:dubble check instance和static inner class,在性能提高的同时,又保证了线程安全,参见DubbleSingletion,具体代码如下所示:
public class DubbleSingleton {
?? private static DubbleSingleton ds;
?? public static DubbleSingleton getDs() {
?? ??? if (ds == null) {
?? ??? ??? try {
?? ??? ??? ??? // 模拟初始化对象的准备时间...
?? ??? ??? ??? Thread.sleep(3000);
?? ??? ??? } catch (InterruptedException e) {
?? ??? ??? ??? e.printStackTrace();
?? ??? ??? }
?? ??? ??? synchronized (DubbleSingleton.class) {
?? ??? ??? ??? if (ds == null) {
?? ??? ??? ??? ??? ds = new DubbleSingleton();
?? ??? ??? ??? }
?? ??? ??? }
?? ??? }
?? ??? return ds;
?? }
?? public static void main(String[] args) {
?? ??? Thread t1 = new Thread(new Runnable() {
?? ??? ??? @Override
?? ??? ??? public void run() {
?? ??? ??? ??? System.out.println(DubbleSingleton.getDs().hashCode());
?? ??? ??? }
?? ??? }, "t1");
?? ??? Thread t2 = new Thread(new Runnable() {
?? ??? ??? @Override
?? ??? ??? public void run() {
?? ??? ??? ??? System.out.println(DubbleSingleton.getDs().hashCode());
?? ??? ??? }
?? ??? }, "t2");
?? ??? Thread t3 = new Thread(new Runnable() {
?? ??? ??? @Override
?? ??? ??? public void run() {
?? ??? ??? ??? System.out.println(DubbleSingleton.getDs().hashCode());
?? ??? ??? }
?? ??? }, "t3");
?? ??? t1.start();
?? ??? t2.start();
?? ??? t3.start();
?? }
}
Singlectio,具体代码如下所示:
public class Singletion {
?? private static class InnerSingletion {
?? ??? private static Singletion single = new Singletion();
?? }
?? public static Singletion getInstance() {
?? ??? return InnerSingletion.single;
?? }
?? public static void main(String[] args) {
?? ??? Thread t1 = new Thread(new Runnable() {
?? ??? ??? @Override
?? ??? ??? public void run() {
?? ??? ??? ??? System.out.println(Singletion.getInstance().hashCode());
?? ??? ??? }
?? ??? }, "t1");
?? ??? Thread t2 = new Thread(new Runnable() {
?? ??? ??? @Override
?? ??? ??? public void run() {
?? ??? ??? ??? System.out.println(Singletion.getInstance().hashCode());
?? ??? ??? }
?? ??? }, "t2");
?? ??? Thread t3 = new Thread(new Runnable() {
?? ??? ??? @Override
?? ??? ??? public void run() {
?? ??? ??? ??? System.out.println(Singletion.getInstance().hashCode());
?? ??? ??? }
?? ??? }, "t3");
?? ??? t1.start();
?? ??? t2.start();
?? ??? t3.start();
?? }
}
jdk中包含许多工具类,比如:同步类容器,并发类容器。接下来将介绍java中的几种同步类容器,不过在讲解之前首先介绍一些基本概念:
java中集合框架主要有四类:List、Set、Queue、Map。其中List、Set、Queue接口继承了Collection接口,而Map本身是一个接口。注意Collection和Map是一个顶层接口;List、Set、Queue则分别代表数组、集合和队列这三大类容器,日常开发中常用的集合像ArrayList、LinkedList都实现了List接口,HashSet实现了Set接口,Deque(又称双向队列,允许在队首/队尾进行入队/出队操作)继承了Queue接口,PriorityQueue实现了Queue接口。另外由于LinkedList是一个双向链表,所以其又实现了Deque接口。 这里说的这些集合基本上都是非线程安全的。如果多个线程并发的访问这些容器时,就会出现问题,其中最常见的问题是ConcurrentModificationException。所以开发人员在编写程序的时候,必须手动的在访问这些容器的地方进行同步处理,但是这样会使容器的使用变得非常麻烦,所以java为我们提供了一些同步容器类。java中的同步容器类主要包括两类:1)Vector、Stack、HashTable,2)Collections类中提供的静态工厂方法创建的类。第一种同步类容器中的Vector实现了List接口,所以其本质上是一个数组,和ArrayList类似,不过Vector中所有的方法都是synchronized方法,即所有方法都进行了同步措施。与Vector一样,Stack也是一个所有方法都进行了同步措施的同步类容器,它继承自Vector类。HashTable与HashMap类似,都实现了Map接口,但是前者中的所有方法都是synchronized方法,后者不是。第二种同步类容器Collections类是一个工具提供类,注意它和Collection的区别,Collection是一个顶层的接口,而它不是;另外就是Collections类中提供了大量的方法,它们可以对集合进行各种各样的操作,比如对集合或者容器进行排序、查找,最重要的是,它里面提供了几个静态工厂方法来创建同步容器类,如下图所示:
虽然java为我们提供的同步类容器有很多优点,比如方便开发人员使用,但是也存在一些缺陷:同步类容器中大量使用synchronized,在一定程度上会影响程序的并发性能。注意:同步的主要作用就是使共享资源表现出正确的行为,另外同步类容器是线程安全的。多线程操作某个容器的时候,此时容器作为共享资源,需要加以保护,以使其表现出正确的行为——这里的意思是集合中存储的数据正确。对于非线程安全的集合,可能会出现错误现象,但是对于同步类容器,一般不会出现这种现象。
jdk1.5版以后增加了很多并发类容器,用以替代同步类容器,改善性能,这些容器大多位于concurrent包下。同步类容器的操作是串行化的,虽实现了线程安全,却降低了并发性,因此在多线程环境中,会严重降低应用程序的吞吐量。并发类容器是专门针对并发设计的,比如以使用ConcurrentHashMap来代替HashTable,而且ConcurrentHashMap中还添加了一些常见复合操作;可以使用CopyOnWriteArrayList代替Voctor,并发的CopyonWriteArraySet,以及并发的queue,ConcurrentLinkedQueue和LinkedBlockingQueue,前者是高性能的队列,后者是以阻塞形式的队列,具体实现Queue还有很多,例如ArrayBlockingQueue、PriorityBlockingQueue、SynchronousQueue等。 接下来介绍java中常用的几个并发类容器:
我们都知道ArrayList是线程不安全的,但是为什么这么说呢?首先我们要明确线程安全的概念,线程安全就是共享资源在多个线程同时操作时始终表现出正确的结果,这里有两个关键的地方:1、共享资源,2、始终表现出正确的结果,3、多个线程同时操作。由此我们可以推断ArrayList非线程安全就是说当多个线程操作同一个ArrayList对象时,会出现不确定的结果,那么我们可以做一个测试,示例代码如下所示:
import java.util.ArrayList;
import java.util.List;
public class App {
?? // static Vector<String> list = new Vector<String>();
?? static List<String> list = new ArrayList<String>();
?? public static void main(String[] args) throws InterruptedException {
?? ??? Thread t1 = new Thread() {
?? ??? ??? @Override
?? ??? ??? public void run() {
?? ??? ??? ??? int i = 0;
?? ??? ??? ??? while (true) {
?? ??? ??? ??? ??? list.add(String.valueOf(++i));
?? ??? ??? ??? ??? if (i == 10)
?? ??? ??? ??? ??? ??? break;
?? ??? ??? ??? }
?? ??? ??? }
?? ??? };
?? ??? Thread t2 = new Thread() {
?? ??? ??? @Override
?? ??? ??? public void run() {
?? ??? ??? ??? int i = 110;
?? ??? ??? ??? while (true) {
?? ??? ??? ??? ??? list.add(String.valueOf(++i));
?? ??? ??? ??? ??? if (i == 115)
?? ??? ??? ??? ??? ??? break;
?? ??? ??? ??? }
?? ??? ??? }
?? ??? };
?? t1.start();
?? ??? t2.start();
?? ??? Thread.sleep(1000 * 5);
?? ??? for (int i = 0; i < list.size(); i++) {
?? ??? ??? System.out.println(list.get(i));
?? ??? }
?? }
}
通过这个代码我们想要的结果是——数组长度:15,其中的值:1、2、3、4、5、6、7、8、9、10、111、112、113、114、115。但是多次(不超过5次,可能机器不同,这个结果也不同)执行发行了异常行为——数组长度:14,其中的值为:2、3、4、5、6、7、8、9、10、111、112、113、114、115。这就说明了ArrayList是非线程安全的。将ArrayList换成线程安全的Vector,多次(超过10次)执行发现结果始终是我们期待的结果,所以Vector是线程安全的。ArrayList并发执行出现异常的原因:添加元素需要两步完成(在指定位置存放元素,增大数组长度),这些操作组合在一起是非原子的。有些人说执行的时候会报ConcurrentModificationException,这个就说明了ArrayList是非线程安全的,个人觉得这种说法错误的,因为在这里我们需要证明的是ArrayList作为共享资源时的同步安全性,而非ArrayList的并发访问性,抛出该异常恰恰说明了ArrayList不支持并发写的同时,又并发地访问、修改或删除。同步类容器抛出这个异常也是这个原因,但是这不能说明同步类容器是不安全的。
Future模式有点类似于商品订单。比如再网购时,当看重某一件商品时,就可以提交订单,当订单处理完成后,在家里等待商品送货上门就可以了。或者说更像我们发送ajax请求的时候,页面是异步的进行后台处理,用户无需一直等待请求的结果,可以继续浏览或操作其他内容
为了帮助开发人员有效地控制多线程,并且提高多线程开发效率,jdk提供了一套线程框架Executor。其是位于java.util.concurrent包中,jdk并发包的核心。该框架中最重要的类是:Executors,它扮演着线程工厂的角色,通过它可以创建具有特定功能的线程池。Executors类中创建线程的方法有:
Executors类中创建线程池的方法内部均使用了ThreadPoolExecutor这个类。如果Executors工厂类无法满足工作需求,开发人员可以自己创建线程池。自定义线程池时需要继承ThreadPoolExecutor。该类构造方法的代码为:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ........... }
该构造函数的参数分别为:
注意的事项:1、ThreadPoolExecutor构造函数中的参数workQueue对于队列类型比较敏感:1)、如果传入的参数是有界队列,当有新的任务需要执行时,如果线程池实际线程数小于corePoolSize,那么会将任务添加到传入参数所代表的队列中;如果传入参数代表的队列已满,那么在总线程数不大于maximumPoolSize的前提下,创建新的线程,若线程数大于maximumPoolSize,则执行拒绝策略或自定义方式;2)、如果传入的参数是无界队列,那么除非系统资源耗尽,否则不存在任务入队失败的情况。也就是说当有新的任务到来,且系统线程数小于corePoolSize时,就会创建新的线程执行任务;当系统线程数达到corePoolSize后,就不会新建线程了。如果后续依然有新的任务加入,而又没有空闲的线程资源,那么任务会直接进入队列等待执行。如果任务创建和处理的速度差异很大,那么无界队列会保持快速增长,直到系统内存耗尽。2、ThreadPoolExecutor构造函数中的最后一个参数handler表示线程池拒绝策略,其可取值为:1)、AbortPolicy——直接抛出异常,阻止系统工作;2)、CallerRunsPolicy——只要线程池没有关闭,该策略就会直接在调用者线程中运行当前被丢弃的任务;3)、DiscardOldestPolicy——丢弃最老的一个请求,尝试再次提交当前任务;4)、DiscardPolicy——丢弃无法处理的任务,不给予任务处理;4)、自定义拒绝策略——通过实现RejectedExecutionHandler接口实现。
在jdk的concurrent.util包中有几个比较常用的工具类,它们使用简单,方便开发。接下来我们将介绍几个常见的工具类:CyclicBarrier、CountDownLacth、Callable和Future、Samaphore信号量。
import java.io.IOException;?
import java.util.Random;?
import java.util.concurrent.BrokenBarrierException;?
import java.util.concurrent.CyclicBarrier;?
import java.util.concurrent.ExecutorService;?
import java.util.concurrent.Executors;
public class UseCyclicBarrier {
???????? static class Runner implements Runnable {?
???????? ??? private CyclicBarrier barrier;?
???????? ??? private String name;??
???????? ??? public Runner(CyclicBarrier barrier, String name) {?
???????? ??????? this.barrier = barrier;?
???????? ? ??????this.name = name;?
???????? ??? }?
???????? ??? @Override?
???????? ??? public void run() {?
???????? ??????? try {?
???????? ??????????? Thread.sleep(1000 * (new Random()).nextInt(5));?
???????? ??????????? System.out.println(name + " 准备OK.");?
???????? ??????????? barrier.await();?
???????? ??????? } catch (InterruptedException e) {?
???????? ??????????? e.printStackTrace();?
???????? ??????? } catch (BrokenBarrierException e) {?
???????? ??????????? e.printStackTrace();?
???????? ??????? }?
???????? ??????? System.out.println(name + " Go!!");?
???????? ??? }?
???????? }
??? public static void main(String[] args) throws IOException, InterruptedException {?
??????? CyclicBarrier barrier = new CyclicBarrier(3);? // 3
??????? ExecutorService executor = Executors.newFixedThreadPool(3);?
??????? executor.submit(new Thread(new Runner(barrier, "zhangsan")));?
??????? executor.submit(new Thread(new Runner(barrier, "lisi")));?
??????? executor.submit(new Thread(new Runner(barrier, "wangwu")));?
??????? executor.shutdown();?
??? }
}
import java.util.concurrent.CountDownLatch;
public class UseCountDownLatch {
???????? public static void main(String[] args) {
???????? ???????? final CountDownLatch countDown = new CountDownLatch(2);
???????? ???????? Thread t1 = new Thread(new Runnable() {
???????? ???????? ???????? @Override
???????? ???????? ???????? public void run() {
???????? ???????? ???????? ???????? try {
???????? ???????? ???????? ???????? ???????? System.out.println("进入线程t1" + "等待其他线程处理完成...");
???????? ???????? ???????? ???????? ???????? countDown.await();
???????? ???????? ???????? ???????? ???????? System.out.println("t1线程继续执行...");
???????? ???????? ???????? ???????? } catch (InterruptedException e) {
???????? ???????? ???????? ???????? ???????? e.printStackTrace();
???????? ???????? ???????? ???????? }
???????? ???????? ???????? }
???????? ???????? },"t1");
???????? ???????? Thread t2 = new Thread(new Runnable() {
???????? ???????? ???????? @Override
???????? ???????? ???????? public void run() {
???????? ???????? ???????? ???????? try {
???????? ???????? ???????? ???????? ???????? System.out.println("t2线程进行初始化操作...");
???????? ???????? ???????? ???????? ???????? Thread.sleep(3000);
???????? ???????? ???????? ???????? ???????? System.out.println("t2线程初始化完毕,通知t1线程继续...");
???????? ???????? ???????? ???????? ???????? countDown.countDown();
???????? ???????? ???????? ???????? } catch (InterruptedException e) {
???????? ???????? ???????? ???????? ???????? e.printStackTrace();
???????? ???????? ???????? ???????? }
???????? ???????? ???????? }
???????? ???????? });
???????? ???????? Thread t3 = new Thread(new Runnable() {
???????? ???????? ???????? @Override
???????? ???????? ???????? public void run() {
???????? ???????? ???????? ???????? try {
???????? ???????? ???????? ???????? ???????? System.out.println("t3线程进行初始化操作...");
???????? ???????? ???????? ???????? ???????? Thread.sleep(4000);
???????? ???????? ???????? ???????? ???????? System.out.println("t3线程初始化完毕,通知t1线程继续...");
???????? ???????? ???????? ???????? ???????? countDown.countDown();
???????? ???????? ???????? ???????? } catch (InterruptedException e) {
???????? ???????? ???????? ???????? ???????? e.printStackTrace();
???????? ???????? ???????? ???????? }
???????? ???????? ???????? }
???????? ???????? });
???????? ???????? t1.start();
???????? ???????? t2.start();
???????? ???????? t3.start(); ????????
???????? }
}
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
public class UseFuture implements Callable<String>{
???????? private String para;
???????? public UseFuture(String para){
???????? ???????? this.para = para;
???????? }
???????? /**
???????? ?* 这里是真实的业务逻辑,其执行可能很慢
???????? ?*/
???????? @Override
???????? public String call() throws Exception {
???????? ???????? //模拟执行耗时
???????? ???????? Thread.sleep(5000);
???????? ???????? String result = this.para + "处理完成";
???????? ???????? return result;
???????? }
???????? //主控制函数
???????? public static void main(String[] args) throws Exception {
???????? ???????? String queryStr = "query";
???????? ???????? //构造FutureTask,并且传入需要真正进行业务逻辑处理的类,该类一定是实现了Callable接口的类
???????? ???????? FutureTask<String> future = new FutureTask<String>(new UseFuture(queryStr));
???????? ???????? FutureTask<String> future2 = new FutureTask<String>(new UseFuture(queryStr));
???????? ???????? //创建一个固定线程的线程池且线程数为1,
???????? ???????? ExecutorService executor = Executors.newFixedThreadPool(2);
???????? ???????? //这里提交任务future,则开启线程执行RealData的call()方法执行
???????? ???????? //submit和execute的区别: 第一点是submit可以传入实现Callable接口的实例对象, 第二点是submit方法有返回值
???????? ???????? Future f1 = executor.submit(future);???? ???????? //单独启动一个线程去执行的
???????? ???????? Future f2 = executor.submit(future2);
???????? ???????? System.out.println("请求完毕");
???????? ???????? try {
???????? ???????? ???????? //这里可以做额外的数据操作,也就是主程序执行其他业务逻辑
???????? ???????? ???????? System.out.println("处理实际的业务逻辑...");
???????? ???????? ???????? Thread.sleep(1000);
???????? ???????? } catch (Exception e) {
???????? ???????? ???????? e.printStackTrace();
???????? ???????? }
???????? ???????? //调用获取数据方法,如果call()方法没有执行完成,则依然会进行等待
???????? ???????? System.out.println("数据:" + future.get());
???????? ???????? System.out.println("数据:" + future2.get());
???????? ???????? executor.shutdown();
???????? }
}
public class SemaphoreTest {?
??? private static final int NUMBER = 5;??? //限制资源访问数?
??? private static final Semaphore avialable = new Semaphore(NUMBER,true);?
??? public static void main(String[] args) {?
??????? ExecutorService pool = Executors.newCachedThreadPool();?
??????? Runnable r = new Runnable(){?
??????????? public void run(){?
??????????????? try {?
??????????????????? avialable.acquire();??? //此方法阻塞?
??????????????????? Thread.sleep(10*1000);?
??????????????????? System.out.println(getNow()+"--"+Thread.currentThread().getName()+"--执行完毕");?
??????????????????? avialable.release();?
?????????? ?????} catch (InterruptedException e) {?
??????????????????? e.printStackTrace();?
??????????????? }?
??????????? }?
??????? };?
??????? System.out.println(avialable.availablePermits());?
??????? for(int i=0;i<10;i++){?
??????????? pool.execute(r); ?
??????? }?
??????? System.out.println(avialable.availablePermits());?
??????? pool.shutdown();?
??? }??
??? public static String getNow(){?
??????? SimpleDateFormat sdf = new SimpleDateFormat("mm:ss");?
??????? return sdf.format(new Date());?
??? }?
}?
相关概念:
当然评判一个系统的指标还有诸如:CPU、内存、网络、磁盘等细节问题。有时也会涉及到数据库层面的设计,比如:select、update、delete/ps
网络资源:JDK 6中的java.util.concurrent包下实用工具类 | 学步园
另外需要注意的是:CyclicBarrier和CountDownLacth之间的区别是前者在等待多个线程准备好后,多个线程同时执行;后者则是在等待多个线程准备好后,让被通知的线程执行,这个线程大多时候指的是主线程
在java多线程中,可以用synchronized关键字来实现线程间“同步互斥”的工作,以达到保护共享资源的目的。当然,java也提供了另外一个更优秀的机制来完成这个工作,即Lock对象,这里我们主要讲解两种锁:重入锁/读、写锁。他们的功能比synchronized强大,比如:嗅探锁定、多路分支等功能。
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class UseReentrantLock {
???????? private Lock lock = new ReentrantLock();
???????? public void method1(){
???????? ???????? try {
???????? ???????? ???????? lock.lock();
???????? ???????? ???????? System.out.println("当前线程:" + Thread.currentThread().getName() + "进入method1..");
???????? ???????? ???????? Thread.sleep(1000);
???????? ???????? ???????? System.out.println("当前线程:" + Thread.currentThread().getName() + "退出method1..");
???????? ???????? ???????? Thread.sleep(1000);
???????? ???????? } catch (InterruptedException e) {
???????? ???????? ???????? e.printStackTrace();
???????? ???????? } finally {
???????? ???????? ???????? lock.unlock();
???????? ???????? }
???????? }
???????? public void method2(){
???????? ???????? try {
???????? ???????? ???????? lock.lock();
???????? ???????? ???????? System.out.println("当前线程:" + Thread.currentThread().getName() + "进入method2..");
???????? ???????? ???????? Thread.sleep(2000);
???????? ???????? ???????? System.out.println("当前线程:" + Thread.currentThread().getName() + "退出method2..");
???????? ???????? ???????? Thread.sleep(1000);
???????? ???????? } catch (InterruptedException e) {
???????? ???????? ???????? e.printStackTrace();
???????? ???????? } finally {
???????? ???????? ???????? lock.unlock();
???????? ???????? }
???????? }
???????? public static void main(String[] args) {
???????? ???????? final UseReentrantLock ur = new UseReentrantLock();
???????? ???????? Thread t1 = new Thread(new Runnable() {
???????? ???????? ???????? @Override
???????? ???????? ???????? public void run() {
???????? ???????? ???????? ???????? ur.method1();
???????? ???????? ???????? ???????? ur.method2();
???????? ???????? ???????? }
???????? ???????? }, "t1");
???????? ???????? t1.start();
???????? ???????? try {
???????? ???????? ???????? Thread.sleep(10);
???????? ???????? } catch (InterruptedException e) {
???????? ???????? ???????? e.printStackTrace();
???????? ???????? }
???????? ???????? //System.out.println(ur.lock.getQueueLength());
???????? }??????
}
关于锁的问题可以参考资源:https://www.cnblogs.com/qifengshi/p/6831055.html