池化技术相比大家已经屡见不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。
在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源。在 Java 中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收。所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁,这就是”池化资源”技术产生的原因。
线程池顾名思义就是事先创建若干个可执行的线程放入一个池(容器)中,需要的时候从池中获取线程不用自行创建,使用完毕不需要销毁线程而是放回池中,从而减少创建和销毁线程对象的开销。Java 5+中的 Executor 接口定义一个执行线程的工具。它的子类型即线程池接口是 ExecutorService。要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,因此在工具类Executors 面提供了一些静态工厂方法,生成一些常用的线程池,如下所示:
(1)newSingleThreadExecutor:创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
(2)newFixedThreadPool:创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。如果希望在服务器上使用线程池,建议使用 newFixedThreadPool方法来创建线程池,这样能获得更好的性能。
(3) newCachedThreadPool:创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60 秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说 JVM)能够创建的最大线程大小。
(4)newScheduledThreadPool:创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
Executor 框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架。
每次执行任务创建线程 new Thread()比较消耗性能,创建一个线程是比较耗时、耗资源的,而且无限制的创建线程会引起应用程序内存溢出。
所以创建一个线程池是个更好的的解决方案,因为可以限制线程的数量并且可以回收再利用这些线程。利用Executors 框架可以非常方便的创建一个线程池。
接收参数:execute()只能执行 Runnable 类型的任务。submit()可以执行Runnable 和 Callable 类型的任务。
返回值:submit()方法可以返回持有计算结果的 Future 对象,而execute()没有异常处理:submit()方便Exception处理
ThreadGroup 类,可以把线程归属到某一个线程组中,线程组中可以有线程对象,也可以有线程组,组中还可以有线程,这样的组织结构有点类似于树的形式。
线程组和线程池是两个不同的概念,他们的作用完全不同,前者是为了方便线程的管理,后者是为了管理线程的生命周期,复用线程,减少创建销毁线程的开销。
为什么不推荐使用线程组?因为使用有很多的安全隐患吧,没有具体追究,如果需要使用,推荐使用线程池。
《阿里巴巴Java开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
Executors 各个方法的弊端:
创建线程池的方式有多种,这里你只需要答 ThreadPoolExecutor 即可。ThreadPoolExecutor() 是最原始的线程池创建,也是阿里巴巴 Java 开发手册中明确规范的创建线程池的方式。
ThreadPoolExecutor3 个最重要的参数:
ThreadPoolExecutor饱和策略定义:
如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任时,ThreadPoolTaskExecutor 定义一些策略:
Runnable
+ThreadPoolExecutor
线程池实现原理
为了让大家更清楚上面的面试题中的一些概念,我写了一个简单的线程池Demo。
首先创建一个 Runnable 接口的实现类(当然也可以是 Callable 接口,我们上面也说了两者的区别。)
1 import java.util.Date;
2
3 /**
4 * 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。
5 */
6 public class MyRunnable implements Runnable {
7
8 private String command;
9
10 public MyRunnable(String s) {
11 this.command = s;
12 }
13
14 @Override
15 public void run() {
16 System.out.println(Thread.currentThread().getName() + " Start. Time = "
+ new Date());
17 processCommand();
18 System.out.println(Thread.currentThread().getName() + " End. Time = " +
new Date());
19 }
20
21 private void processCommand() {
22 try {
23 Thread.sleep(5000);
24 } catch (InterruptedException e) {
25 e.printStackTrace();
26 }
27 }
28
29 @Override
30 public String toString() {
31 return this.command;
32 }
33 }
编写测试程序,我们这里以阿里巴巴推荐的使用 ThreadPoolExecutor 构造函数自定义参数的方式来创建线程池。
1 import java.util.concurrent.ArrayBlockingQueue;
2 import java.util.concurrent.ThreadPoolExecutor;
3 import java.util.concurrent.TimeUnit;
4
5 public class ThreadPoolExecutorDemo {
6
7 private static final int CORE_POOL_SIZE = 5;
8 private static final int MAX_POOL_SIZE = 10;
9 private static final int QUEUE_CAPACITY = 100;
10 private static final Long KEEP_ALIVE_TIME = 1L;
11 public static void main(String[] args) {
12
13 //使用阿里巴巴推荐的创建线程池的方式
14 //通过ThreadPoolExecutor构造函数自定义参数创建
15 ThreadPoolExecutor executor = new ThreadPoolExecutor(
16 CORE_POOL_SIZE,
17 MAX_POOL_SIZE,
18 KEEP_ALIVE_TIME,
19 TimeUnit.SECONDS,
20 new ArrayBlockingQueue<>(QUEUE_CAPACITY),
21 new ThreadPoolExecutor.CallerRunsPolicy());
22
23 for (int i = 0; i < 10; i++) {
24 //创建WorkerThread对象(WorkerThread类实现了Runnable 接口)
25 Runnable worker = new MyRunnable("" + i);
26 //执行Runnable
27 executor.execute(worker);
28 }
29 //终止线程池
30 executor.shutdown();
31 while (!executor.isTerminated()) {
32 }
33 System.out.println("Finished all threads");
34 }
35 }
可以看到我们上面的代码指定了:
Output:
1 pool‐1‐thread‐2 Start. Time = Tue Nov 12 20:59:44 CST 2019
2 pool‐1‐thread‐5 Start. Time = Tue Nov 12 20:59:44 CST 2019
3 pool‐1‐thread‐4 Start. Time = Tue Nov 12 20:59:44 CST 2019
4 pool‐1‐thread‐1 Start. Time = Tue Nov 12 20:59:44 CST 2019
5 pool‐1‐thread‐3 Start. Time = Tue Nov 12 20:59:44 CST 2019
6 pool‐1‐thread‐5 End. Time = Tue Nov 12 20:59:49 CST 2019
7 pool‐1‐thread‐3 End. Time = Tue Nov 12 20:59:49 CST 2019
8 pool‐1‐thread‐2 End. Time = Tue Nov 12 20:59:49 CST 2019
9 pool‐1‐thread‐4 End. Time = Tue Nov 12 20:59:49 CST 2019
10 pool‐1‐thread‐1 End. Time = Tue Nov 12 20:59:49 CST 2019
11 pool‐1‐thread‐2 Start. Time = Tue Nov 12 20:59:49 CST 2019
12 pool‐1‐thread‐1 Start. Time = Tue Nov 12 20:59:49 CST 2019
13 pool‐1‐thread‐4 Start. Time = Tue Nov 12 20:59:49 CST 2019
14 pool‐1‐thread‐3 Start. Time = Tue Nov 12 20:59:49 CST 2019
15 pool‐1‐thread‐5 Start. Time = Tue Nov 12 20:59:49 CST 2019
16 pool‐1‐thread‐2 End. Time = Tue Nov 12 20:59:54 CST 2019
17 pool‐1‐thread‐3 End. Time = Tue Nov 12 20:59:54 CST 2019
18 pool‐1‐thread‐4 End. Time = Tue Nov 12 20:59:54 CST 2019
19 pool‐1‐thread‐5 End. Time = Tue Nov 12 20:59:54 CST 2019
20 pool‐1‐thread‐1 End. Time = Tue Nov 12 20:59:54 CST 2019
原子操作(atomic operation)意为”不可被中断的一个或一系列操作” 。处理器使用基于对缓存加锁或总线加锁的方式来实现多处理器之间的原子操作。在 Java 中可以通过锁和循环 CAS 的方式来实现原子操作。 CAS 操作——Compare & Set,或是 Compare & Swap,现在几乎所有的 CPU 指令都支持CAS 的原子操作。
原子操作是指一个不受其他操作影响的操作任务单元。原子操作是在多线程环境下避免数据不一致必须的手段。
int++并不是一个原子操作,所以当一个线程读取它的值并加 1 时,另外一个线程有可能会读到之前的值,这就会引发错误。
为了解决这个问题,必须保证增加操作是原子的,在 JDK1.5 之前我们可以使用同步技术来做到这一点。到 JDK1.5,java.util.concurrent.atomic 包提供了 int和long 类型的原子包装类,它们可以自动的保证对于他们的操作是原子的并且不需要使用同步。
java.util.concurrent 这个包里面提供了一组原子类。其基本的特性就是在多线程环境下,当有多个线程同时执行这些类的实例包含的方法时,具有排他性,即当某个线程进入方法,执行其中的指令时,不会被其他线程打断,而别的线程就像自旋锁一样,一直等到该方法执行完成,才由 JVM 从等待队列中选择另一个线程进入,这只是一种逻辑上的理解。
原子类:AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference
原子数组:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
原子属性更新器:AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater
解决 ABA 问题的原子类:AtomicMarkableReference(通过引入一个boolean来反映中间有没有变过),AtomicStampedReference(通过引入一个 int 来累加来反映中间有没有变过)
Atomic包中的类基本的特性就是在多线程环境下,当有多个线程同时对单个(包括基本类型及引用类型)变量进行操作时,具有排他性,即当多个线程同时对该变量的值进行更新时,仅有一个线程能成功,而未成功的线程可以向自旋锁一样,继续尝试,一直等到执行成功。
AtomicInteger 类的部分源码:
1 // setup to use Unsafe.compareAndSwapInt for updates(更新操作时提供“比较并
替换”的作用)
2 private static final Unsafe unsafe = Unsafe.getUnsafe();
3 private static final long valueOffset;
4
5 static {
6 try {
7 valueOffset = unsafe.objectFieldOffset
8 (AtomicInteger.class.getDeclaredField("value"));
9 } catch (Exception ex) { throw new Error(ex); }
10 }
11
12 private volatile int value;
AtomicInteger 类主要利用 CAS (compare and swap) + volatile 和 native 方法来保证原子操作,从而避免 synchronized 的高开销,执行效率大为提升。CAS的原理是拿期望的值和原本的一个值作比较,如果相同则更新成新的值。UnSafe 类的 objectFieldOffset() 方法是一个本地方法,这个方法是用来拿到“原来的值”的内存地址,返回值是 valueOffset。另外 value 是一个volatile变量,在内存中可见,因此 JVM 可以保证任何时刻任何线程总能拿到该变量的最新值。
CountDownLatch与CyclicBarrier都是用于控制并发的工具类,都可以理解成维护的就是一个计数器,但是这两者还是各有不同侧重点的:
CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;CountDownLatch强调一个线程等多个线程完成某件事情。CyclicBarrier是多个线程互等,等大家都完成,再携手共进。
调用CountDownLatch的countDown方法后,当前线程并不会阻塞,会继续往下执行;而调用CyclicBarrier的await方法,会阻塞当前线
程,直到CyclicBarrier指定的线程全部都到达了指定点的时候,才能继续往下执行;
CountDownLatch方法比较少,操作比较简单,而CyclicBarrier提供的方法更多,比如能够通过getNumberWaiting(),isBroken()这些方法
获取当前多个线程的状态,并且CyclicBarrier的构造方法可以传入barrierAction,指定当所有线程都到达时执行的业务功能;
CountDownLatch是不能复用的,而CyclicLatch是可以复用的。
Semaphore 就是一个信号量,它的作用是限制某段代码块的并发数。
Semaphore有一个构造函数,可以传入一个 int 型整数 n,表示某段代码最多只有 n 个线程可以访问,如果超出了 n,那么请等待,等到某个线程执行完毕这段代码块,下一个线程再进入。由此可以看出如果 Semaphore 构造函数中传入的 int 型整数 n=1,相当于变成了一个 synchronized 了。
Semaphore(信号量)-允许多个线程同时访问: synchronized 和ReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源。
Exchanger是一个用于线程间协作的工具类,用于两个线程间交换数据。它提供了一个交换的同步点,在这个同步点两个线程能够交换数据。交换数据是通过exchange方法来实现的,如果一个线程先执行exchange方法,那么它会同步等待另一个线程也执行exchange方法,这个时候两个线程就都达到了同步点,两个线程就可以交换数据。
Semaphore(信号量)-允许多个线程同时访问: synchronized 和
ReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源。
CountDownLatch(倒计时器): CountDownLatch是一个同步工具类,用来协调多个线程之间的同步。这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行。
CyclicBarrier(循环栅栏): CyclicBarrier 和 CountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大。主要应用场景和 CountDownLatch 类似。CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有
被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是 CyclicBarrier(intparties),其参数表示屏障拦截的线程数量,每个线程调用await()方法告诉CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。