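This article walks through examples of multithreading in Java, written as methods of a JUnit test class.
A thread t is started in addition to the main thread. The two run independently, so either thread's message may be printed first.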
public void testCreateThread() {
Thread t = new Thread(() -> log.debug("running"));
t.start();
log.info("main thread running");
}
2023-12-11 11:24:34 [Thread-0] c.DiyThreadTest - running
2023-12-11 11:24:34 [main] c.DiyThreadTest - main thread running
A task implemented through the Callable interface (wrapped here in a FutureTask) lets the caller retrieve the task's return value.
public void testFuture() throws Exception {
FutureTask<Integer> task = new FutureTask<>(() -> {
log.info("running...");
Thread.sleep(1000);
return 100;
});
Thread t1 = new Thread(task, "t1");
t1.start();
// get() returns immediately if the result is ready; otherwise it blocks until the task completes
log.debug("{}", task.get());
}
2023-12-11 11:29:15 [t1] c.DiyThreadTest - running...
2023-12-11 11:29:16 [main] c.DiyThreadTest - 100
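The same Callable can also be handed to a thread pool instead of a hand-built Thread. The following is a minimal sketch, assuming a single-thread ExecutorService is acceptable for the task; the class name CallableSketch and the method compute are made up for illustration and are not part of the original test class.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper class, not part of DiyThreadTest.
class CallableSketch {
    // Submits a Callable to a single-thread pool and blocks until its result is ready.
    static int compute() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> job = () -> {
                Thread.sleep(100); // simulate some work
                return 100;
            };
            Future<Integer> future = pool.submit(job);
            return future.get(); // blocks until the task has completed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e); // the task itself threw
        } finally {
            pool.shutdown(); // let the worker thread exit
        }
    }

    public static void main(String[] args) {
        System.out.println(compute());
    }
}
```

The pool owns the worker thread, so the caller only deals with the Future.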
The main thread sleeps for one second and then interrupts thread t1, which is spinning in an infinite loop.
public void testInterrupt() throws InterruptedException {
Thread t1 = new Thread(() -> {
while(true) {
boolean interrupted = Thread.currentThread().isInterrupted();
if (interrupted) {
log.debug("被打断了,退出循环");
break;
}
}
});
t1.start();
Thread.sleep(1000);
log.debug("interrupt");
t1.interrupt();
}
2023-12-11 14:07:51 [main] c.DiyThreadTest - interrupt
2023-12-11 14:07:51 [Thread-0] c.DiyThreadTest - 被打断了,退出循环
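The loop above only polls the interrupt flag. When the target thread is blocked in sleep() instead, the interrupt surfaces as an InterruptedException and the flag is cleared again, which matters for the two-phase termination example later on. A small sketch of that behavior, with the hypothetical class name InterruptSleepSketch:

```java
// Hypothetical helper class, not part of DiyThreadTest.
class InterruptSleepSketch {
    // Interrupts a sleeping thread and reports what that thread observed.
    static String run() {
        final boolean[] result = new boolean[2]; // [0] = exception caught, [1] = flag inside catch
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                result[0] = true;
                // sleep() clears the interrupt flag before throwing
                result[1] = Thread.currentThread().isInterrupted();
            }
        });
        t.start();
        t.interrupt();
        try {
            t.join(); // join() also makes the writes to result visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "caught=" + result[0] + ", flag=" + result[1];
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

If the thread needs the flag to stay set, it has to call interrupt() on itself inside the catch block.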
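Thread t1 is created but never started, so it is in the NEW state. Thread t2 is started and spins in a loop, so it is RUNNABLE. Thread t3 has finished running, so it is TERMINATED. Thread t4 holds the lock and sleeps for a fixed time, so it is TIMED_WAITING. Thread t5 waits in join() for t2 to finish, so it is WAITING. Thread t6 competes with t4 for the same lock and cannot acquire it, so it is BLOCKED.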
public void testThreadState() {
Thread t1 = new Thread("t1") {
@Override
public void run() {
log.debug("running...");
}
};
Thread t2 = new Thread("t2") {
@Override
public void run() {
while(true) {}
}
};
t2.start();
Thread t3 = new Thread("t3") {
@Override
public void run() {
log.debug("running...");
}
};
t3.start();
Thread t4 = new Thread("t4") {
@Override
public void run() {
synchronized(DiyThreadTest.class) {
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
t4.start();
Thread t5 = new Thread("t5") {
@Override
public void run() {
try {
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
t5.start();
Thread t6 = new Thread("t6") {
@Override
public void run() {
synchronized(DiyThreadTest.class) {
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
t6.start();
log.debug("t1 state {}", t1.getState());
log.debug("t2 state {}", t2.getState());
log.debug("t3 state {}", t3.getState());
log.debug("t4 state {}", t4.getState());
log.debug("t5 state {}", t5.getState());
log.debug("t6 state {}", t6.getState());
}
2023-12-11 21:40:25 [main] c.DiyThreadTest - t1 state NEW
2023-12-11 21:40:25 [t3] c.DiyThreadTest - running...
2023-12-11 21:40:25 [main] c.DiyThreadTest - t2 state RUNNABLE
2023-12-11 21:40:25 [main] c.DiyThreadTest - t3 state TERMINATED
2023-12-11 21:40:25 [main] c.DiyThreadTest - t4 state TIMED_WAITING
2023-12-11 21:40:25 [main] c.DiyThreadTest - t5 state WAITING
2023-12-11 21:40:25 [main] c.DiyThreadTest - t6 state BLOCKED
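The states printed above depend on timing: the main thread reads them right after starting the threads. As a hedged alternative, the sketch below waits until a thread has actually reached the expected state before recording it. The class name ThreadStateSketch and the method observe are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, not part of DiyThreadTest.
class ThreadStateSketch {
    // Records the states of one thread at three well-defined points in its life.
    static List<Thread.State> observe() {
        List<Thread.State> seen = new ArrayList<>();
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // interrupted by the observer: fall through and finish
            }
        });
        seen.add(t.getState()); // not started yet: NEW
        t.start();
        // spin until the thread is really inside sleep()
        while (t.getState() != Thread.State.TIMED_WAITING) {
            Thread.yield();
        }
        seen.add(Thread.State.TIMED_WAITING);
        t.interrupt(); // wake it up so it can finish
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        seen.add(t.getState()); // finished: TERMINATED
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```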
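Using LockSupport.park and LockSupport.unpark to coordinate threads: t1 parks itself and resumes once the main thread unparks it.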
public void testPark() throws InterruptedException {
Thread t1 = new Thread(() -> {
log.debug("start...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("park...");
LockSupport.park();
log.debug("resume...");
}, "t1");
t1.start();
Thread.sleep(2000);
log.debug("unPark...");
LockSupport.unpark(t1);
}
2023-12-11 22:19:51 [t1] c.DiyThreadTest - start...
2023-12-11 22:19:52 [t1] c.DiyThreadTest - park...
2023-12-11 22:19:53 [main] c.DiyThreadTest - unPark...
2023-12-11 22:19:53 [t1] c.DiyThreadTest - resume...
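Unlike wait/notify, unpark may be called before park: it hands the thread a permit, and the next park() consumes that permit and returns without blocking. A minimal sketch of this ordering, using the hypothetical class name ParkPermitSketch:

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical helper class, not part of DiyThreadTest.
class ParkPermitSketch {
    // Calls unpark before park on the current thread and returns how long park() took, in ms.
    static long parkAfterUnpark() {
        Thread self = Thread.currentThread();
        LockSupport.unpark(self); // hand the permit over in advance
        long start = System.nanoTime();
        LockSupport.park();       // permit is available, so this does not block
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("park() returned after " + parkAfterUnpark() + " ms");
    }
}
```

Permits do not accumulate: calling unpark several times in a row still leaves at most one permit.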
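Accounts a and b each start with a balance of 1000. The Account class provides a transfer method that takes the target account and the amount to transfer. A transfer deducts the amount from the current account and adds the same amount to the target account. Both steps run under a lock on Account.class, so the total stays at 2000.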
public void testAccount() throws InterruptedException {
Account a = new Account(1000);
Account b = new Account(1000);
Thread t1 = new Thread(()->{
for (int i=0;i<1000;i++){
a.transfer(b, randomInt());
}
}, "t1");
Thread t2 = new Thread(()->{
for (int i=0;i<1000;i++){
b.transfer(a, randomInt());
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
log.debug("total: {}", a.getMoney() + b.getMoney());
}
// returns a random integer from 1 to 100 (inclusive)
private static Random random = new Random();
private static int randomInt() {
return random.nextInt(100) + 1;
}
private static class Account {
private int money;
Account(int money) {
this.money = money;
}
int getMoney() {
return money;
}
void setMoney(int money) {
this.money = money;
}
void transfer(Account target, int amount) {
synchronized(Account.class) {
if (this.money >= amount) {
this.setMoney(this.getMoney() - amount);
target.setMoney(target.getMoney() + amount);
}
}
}
}
2023-12-12 23:10:09 [main] c.DiyThreadTest - total: 2000
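Locking Account.class serializes every transfer in the program, even between unrelated accounts. One common alternative is to lock only the two accounts involved, always in the same order so that opposite transfers cannot deadlock. The sketch below assumes each account carries a unique id used purely for lock ordering; that id field and the class name OrderedLockSketch are additions for illustration and do not appear in the original code.

```java
// Hypothetical helper class, not part of DiyThreadTest.
class OrderedLockSketch {
    static final class Account {
        private final int id; // assumed unique; used only to order lock acquisition
        private int money;

        Account(int id, int money) {
            this.id = id;
            this.money = money;
        }

        synchronized int getMoney() {
            return money;
        }

        void transfer(Account target, int amount) {
            // always lock the account with the smaller id first
            Account first = this.id < target.id ? this : target;
            Account second = (first == this) ? target : this;
            synchronized (first) {
                synchronized (second) {
                    if (this.money >= amount) {
                        this.money -= amount;
                        target.money += amount;
                    }
                }
            }
        }
    }

    // Runs transfers in both directions concurrently and returns the combined balance.
    static int totalAfterTransfers() {
        Account a = new Account(1, 1000);
        Account b = new Account(2, 1000);
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) a.transfer(b, 1 + i % 100);
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) b.transfer(a, 1 + i % 100);
        });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return a.getMoney() + b.getMoney();
    }

    public static void main(String[] args) {
        System.out.println("total: " + totalAfterTransfers());
    }
}
```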
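Every method that touches the counter in the Room class synchronizes on the Room instance. While t1 is inside increment, t2 cannot acquire the lock and blocks until it is released.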
public void testSynchronized() throws InterruptedException {
// the mutual exclusion logic is encapsulated inside the Room class
Room room = new Room();
Thread t1 = new Thread(()->{
for(int i=0;i<5000;i++) {
room.increment();
}
}, "t1");
Thread t2 = new Thread(()->{
for(int i=0;i<5000;i++) {
room.decrement();
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
log.debug("{}", room.getCounter());
}
private static class Room {
private int counter = 0;
void increment() {
synchronized(this) {
counter++;
}
}
void decrement() {
synchronized(this) {
counter--;
}
}
int getCounter() {
synchronized(this) {
return counter;
}
}
}
2023-12-12 23:30:37 [main] c.DiyThreadTest - 0
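The two-phase termination pattern manages the lifecycle of a thread or process in two phases: a preparation phase and a stop phase. The preparation phase gets the thread or process ready to stop, and the stop phase performs the actual shutdown. The goal is to let the thread or process finish any necessary cleanup and release its resources before it stops.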
public void testTwoPhaseTermination() throws InterruptedException {
TwoPhaseTermination tpt = new TwoPhaseTermination();
tpt.start();
Thread.sleep(1500);
tpt.stop();
}
private static class TwoPhaseTermination {
private Thread monitor;
void start() {
monitor = new Thread(()->{
while(true) {
Thread current = Thread.currentThread();
if (current.isInterrupted()) {
log.debug("处理后续");
break;
}
try {
Thread.sleep(1000);
log.debug("执行监控记录");
} catch (InterruptedException e) {
current.interrupt(); // sleep() cleared the interrupt flag when it threw, so set it again
}
}
});
monitor.start();
}
void stop() {
monitor.interrupt();
}
}
2023-12-12 23:44:03 [Thread-0] c.DiyThreadTest - 执行监控记录
2023-12-12 23:44:04 [Thread-0] c.DiyThreadTest - 处理后续
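Balking pattern: when a thread finds that another thread, or the thread itself, has already done a given piece of work, it does not repeat the work and returns immediately. For example, only one monitor thread ever needs to be created. The example below also stops the monitor with a volatile flag rather than relying on the interrupt flag.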
public void testTwoPeriodTermination() throws InterruptedException {
TwoPeriodTermination termination = new TwoPeriodTermination();
termination.start();
Thread.sleep(2000);
log.info("监控停止...");
termination.stop();
}
private static class TwoPeriodTermination {
private Thread monitor;
private volatile boolean stop = false;
// whether start() has already been called
private boolean starting = false;
void start() {
synchronized(this) {
if (starting) {
return;
}
starting = true;
}
monitor = new Thread(()->{
while(true) {
if (stop) {
log.info("处理后续事项");
break;
}
try {
Thread.sleep(1000);
log.info("执行监控记录");
} catch (InterruptedException e) {
// woken up by stop(); the loop re-checks the stop flag
}
}
});
monitor.start();
}
void stop() {
stop = true;
monitor.interrupt();
}
}
2023-12-14 22:38:01 [Thread-0] c.DiyThreadTest - 执行监控记录
2023-12-14 22:38:02 [main] c.DiyThreadTest - 监控停止...
2023-12-14 22:38:02 [Thread-0] c.DiyThreadTest - 处理后续事项
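The "already started" check of the balking pattern can also be written without a synchronized block. The following sketch assumes an AtomicBoolean guard is acceptable; the class BalkingSketch and its created counter are stand-ins for illustration, where the counter takes the place of actually creating the monitor thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class, not part of DiyThreadTest.
class BalkingSketch {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicInteger created = new AtomicInteger(); // stand-in for "monitor thread created"

    // Returns true only for the single caller that performs the start.
    boolean start() {
        if (!started.compareAndSet(false, true)) {
            return false; // someone already started it: balk
        }
        created.incrementAndGet(); // the real code would create and start the monitor here
        return true;
    }

    // Calls start() from several threads and returns how many times the work was done.
    static int startManyTimes() {
        BalkingSketch monitor = new BalkingSketch();
        Thread[] callers = new Thread[8];
        for (int i = 0; i < callers.length; i++) {
            callers[i] = new Thread(monitor::start);
        }
        for (Thread caller : callers) {
            caller.start();
        }
        try {
            for (Thread caller : callers) {
                caller.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return monitor.created.get();
    }

    public static void main(String[] args) {
        System.out.println("created: " + startManyTimes());
    }
}
```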
public void testAtomicInteger() {
AtomicInteger i = new AtomicInteger(0);
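// increment, then return the new value
log.info(i.incrementAndGet() + "");
// return the current value, then increment
log.info(i.getAndIncrement() + "");
// apply the update function, then return the new value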
log.info(i.updateAndGet(value -> value * 10) + "");
}
2023-12-14 22:56:02 [main] c.DiyThreadTest - 1
2023-12-14 22:56:02 [main] c.DiyThreadTest - 1
2023-12-14 22:56:02 [main] c.DiyThreadTest - 20
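Methods such as updateAndGet are built on compare-and-set. As a rough sketch of the idea (not the JDK's actual source), the same behavior can be written by hand with a retry loop; the class name CasLoopSketch is made up for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntUnaryOperator;

// Hypothetical helper class, not part of DiyThreadTest.
class CasLoopSketch {
    // Hand-written equivalent of updateAndGet: retry until the CAS succeeds.
    static int updateAndGet(AtomicInteger i, IntUnaryOperator f) {
        while (true) {
            int prev = i.get();
            int next = f.applyAsInt(prev);
            // succeeds only if no other thread changed the value in between
            if (i.compareAndSet(prev, next)) {
                return next;
            }
        }
    }

    static int demo() {
        AtomicInteger i = new AtomicInteger(2);
        return updateAndGet(i, value -> value * 10);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```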
public void testAtomicIntegerArray() {
// a non-atomic ++ on a plain int[] is not thread-safe, so updates get lost under concurrency
atomicIntArray(()->new int[10], (array)->array.length,
(array, index)->array[index]++, array->log.info(Arrays.toString(array)));
atomicIntArray(()->new AtomicIntegerArray(10), AtomicIntegerArray::length,
AtomicIntegerArray::getAndIncrement, array->log.info(array + ""));
}
private static <T> void atomicIntArray(Supplier<T> supplier,
Function<T, Integer> fun,
BiConsumer<T, Integer> biConsumer,
Consumer<T> consumer) {
// supplier: takes no argument, returns a result
// function: takes one argument, returns a result
// consumer: takes one argument, returns nothing
List<Thread> threads = new ArrayList<>();
T array = supplier.get();
int length = fun.apply(array);
for (int i=0;i<length;i++) {
threads.add(new Thread(()->{
for (int j=0;j<10000;j++) {
biConsumer.accept(array, j%length);
}
}));
}
threads.forEach(Thread::start);
for (Thread thread:threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
consumer.accept(array);
}
2023-12-14 23:11:55 [main] c.DiyThreadTest - [9647, 9652, 9639, 9624, 9635, 9639, 9642, 9634, 9632, 9637]
2023-12-14 23:11:55 [main] c.DiyThreadTest - [10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]
public void testAtomicRefFieldUpdater() {
Student stu = new Student();
AtomicReferenceFieldUpdater<Student, String> updater =
AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");
updater.compareAndSet(stu, null, "张三");
log.info(stu.toString());
}
private static class Student {
volatile String name;
@Override
public String toString() {
return "Student{" +
"name='" + name + '\'' +
'}';
}
}
2023-12-14 23:17:42 [main] c.DiyThreadTest - Student{name='张三'}
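compareAndSet only writes when the field still holds the expected value. The sketch below repeats the update with a stale expectation to show the second attempt being rejected; the class FieldUpdaterSketch and the ASCII names are chosen for illustration only.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical helper class, not part of DiyThreadTest.
class FieldUpdaterSketch {
    static class Student {
        volatile String name; // the updated field must be volatile
    }

    static final AtomicReferenceFieldUpdater<Student, String> UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");

    static String demo() {
        Student stu = new Student();
        boolean first = UPDATER.compareAndSet(stu, null, "zhangsan"); // name was null: succeeds
        boolean second = UPDATER.compareAndSet(stu, null, "lisi");    // name is no longer null: fails
        return first + "," + second + "," + stu.name;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```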
public void testAccumulate(){
accumulate(()->new AtomicLong(0), AtomicLong::getAndIncrement);
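// Under contention, LongAdder spreads updates across several cells: Thread-0 adds to Cell[0], Thread-1 adds to Cell[1], ...
// The cells are summed at the end. Because threads update different cells, fewer CAS attempts fail and retry, which improves throughput
// Cells live in an array and are stored contiguously in memory. One Cell takes 24 bytes (16-byte object header plus 8-byte value), so a cache line can hold 2 Cell objects
// @sun.misc.Contended pads the annotated object or field with 128 bytes on each side, so that each one lands in a different cache line when the CPU loads it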
accumulate(LongAdder::new, LongAdder::increment);
}
private static <T> void accumulate(Supplier<T> supplier, Consumer<T> consumer){
T accumulator = supplier.get();
List<Thread> threads = new ArrayList<>();
for(int i=0;i<4;i++){
threads.add(new Thread(()->{
for(int j=0;j<50000;j++){
consumer.accept(accumulator);
}
}));
}
long start = System.nanoTime();
threads.forEach(Thread::start);
threads.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
log.info(accumulator + " cost:" + (end - start)/1000_000);
}
2023-12-14 23:19:49 [main] c.DiyThreadTest - 200000 cost:11
2023-12-14 23:19:49 [main] c.DiyThreadTest - 200000 cost:14
package org.dsg.threads;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@Slf4j(topic = "c.DiyThreadTest")
public class DiyThreadTest {
@Test
public void testCreateThread(){
Thread t = new Thread(() -> log.debug("running"));
t.start();
log.info("main thread running");
}
@Test
public void testFuture() throws Exception {
FutureTask<Integer> task = new FutureTask<>(() -> {
log.info("running...");
Thread.sleep(1000);
return 100;
});
Thread t1 = new Thread(task, "t1");
t1.start();
log.debug("{}", task.get());
}
@Test
public void testInterrupt() throws InterruptedException {
Thread t1 = new Thread(() -> {
while(true){
boolean interrupted = Thread.currentThread().isInterrupted();
if(interrupted){
log.debug("被打断了,退出循环");
break;
}
}
});
t1.start();
Thread.sleep(1000);
log.debug("interrupt");
t1.interrupt();
}
@Test
public void testThreadState(){
Thread t1 = new Thread("t1"){
@Override
public void run() {
log.debug("running...");
}
};
Thread t2 = new Thread("t2"){
@Override
public void run() {
while(true){
}
}
};
t2.start();
Thread t3 = new Thread("t3"){
@Override
public void run() {
log.debug("running...");
}
};
t3.start();
Thread t4 = new Thread("t4"){
@Override
public void run() {
synchronized(DiyThreadTest.class) {
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
t4.start();
Thread t5 = new Thread("t5"){
@Override
public void run() {
try {
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
t5.start();
Thread t6 = new Thread("t6"){
@Override
public void run() {
synchronized (DiyThreadTest.class){
try {
Thread.sleep(10000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
t6.start();
log.debug("t1 state {}", t1.getState());
log.debug("t2 state {}", t2.getState());
log.debug("t3 state {}", t3.getState());
log.debug("t4 state {}", t4.getState());
log.debug("t5 state {}", t5.getState());
log.debug("t6 state {}", t6.getState());
}
@Test
public void testPark() throws InterruptedException {
Thread t1 = new Thread(()->{
log.debug("start...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("park...");
LockSupport.park();
log.debug("resume...");
}, "t1");
t1.start();
Thread.sleep(2000);
log.debug("unPark...");
LockSupport.unpark(t1);
}
/*********************** Transfer ***********************/
@Test
public void testAccount() throws InterruptedException {
Account a = new Account(1000);
Account b = new Account(1000);
Thread t1 = new Thread(()->{
for(int i=0;i<1000;i++) {
a.transfer(b, randomInt());
}
}, "t1");
Thread t2 = new Thread(()->{
for(int i=0;i<1000;i++) {
b.transfer(a, randomInt());
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
log.debug("total: {}", a.getMoney() + b.getMoney());
}
private static Random random = new Random();
private static int randomInt(){
return random.nextInt(100) + 1;
}
private static class Account{
private int money;
Account(int money){
this.money = money;
}
int getMoney() {
return money;
}
void setMoney(int money) {
this.money = money;
}
void transfer(Account target, int amount){
synchronized(Account.class) {
if (this.money >= amount) { // the balance covers the transfer amount
this.setMoney(this.getMoney() - amount);
target.setMoney(target.getMoney() + amount);
}
}
}
}
/*********************** Encapsulating the lock in an object ***********************/
@Test
public void testSynchronized() throws InterruptedException {
// the mutual exclusion logic is encapsulated inside the Room class
Room room = new Room();
Thread t1 = new Thread(() -> {
for(int i=0;i<5000;i++){
room.increment();
}
}, "t1");
Thread t2 = new Thread(()->{
for(int i=0;i<5000;i++){
room.decrement();
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
log.debug("{}", room.getCounter());
}
private static class Room {
private int counter = 0;
void increment(){
synchronized(this){
counter++;
}
}
void decrement(){
synchronized(this){
counter--;
}
}
int getCounter(){
synchronized (this){
return counter;
}
}
}
/*********************** Two-phase termination **************************/
@Test
public void testTwoPhaseTermination() throws InterruptedException {
TwoPhaseTermination tpt = new TwoPhaseTermination();
tpt.start();
Thread.sleep(1500);
tpt.stop();
}
private static class TwoPhaseTermination{
private Thread monitor;
// start the monitor thread
void start(){
monitor = new Thread(() -> {
while(true){
Thread current = Thread.currentThread();
if(current.isInterrupted()){
log.debug("处理后续");
break;
}
try {
Thread.sleep(1000);
log.debug("执行监控记录");
} catch (InterruptedException e) {
current.interrupt(); // sleep() cleared the interrupt flag when it threw, so set it again
}
}
});
monitor.start();
}
// stop the monitor thread
void stop(){
monitor.interrupt();
}
}
/*********************** Two-phase termination, volatile implementation **************************/
@Test
public void testTwoPeriodTermination() throws InterruptedException {
TwoPeriodTermination termination = new TwoPeriodTermination();
termination.start();
Thread.sleep(2000);
log.info("监控停止...");
termination.stop();
}
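/**
* Balking pattern: when a thread finds that another thread, or the thread itself,
* has already done the same piece of work, it does not repeat the work and returns immediately.
* For example, only one monitor thread ever needs to be created.
*/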
private static class TwoPeriodTermination{
private Thread monitor;
private volatile boolean stop = false;
// whether start() has already been called
private boolean starting = false;
void start(){
synchronized(this) {
if (starting) {
return;
}
starting = true;
}
monitor = new Thread(()->{
while(true){
if(stop){
log.info("处理后续事项");
break;
}
try {
Thread.sleep(1000);
log.info("执行监控记录");
} catch (InterruptedException e) {
//current.interrupt();
}
}
});
monitor.start();
}
void stop(){
stop = true;
monitor.interrupt();
}
}
@Test
public void testAtomicInteger(){
AtomicInteger i = new AtomicInteger(0);
log.info(i.incrementAndGet() + "");
log.info(i.getAndIncrement() + "");
log.info(i.updateAndGet(value -> value * 10) + "");
}
/******************* Atomic array ******************/
@Test
public void testAtomicIntegerArray(){
atomicIntArray(()->new int[10],
(array)->array.length, (array, index)->array[index]++, array-> log.info(Arrays.toString(array)));
atomicIntArray(()->new AtomicIntegerArray(10),
AtomicIntegerArray::length, AtomicIntegerArray::getAndIncrement, array-> log.info(array + ""));
}
private static <T> void atomicIntArray(Supplier<T> supplier,
Function<T, Integer> fun,
BiConsumer<T, Integer> biConsumer,
Consumer<T> consumer){
// supplier: takes no argument, returns a result
// function: takes one argument, returns a result
// consumer: takes one argument, returns nothing
List<Thread> threads = new ArrayList<>();
T array = supplier.get();
int length = fun.apply(array);
for(int i=0;i<length;i++){
threads.add(new Thread(()->{
for(int j=0;j<10000;j++){
biConsumer.accept(array, j%length);
}
}));
}
threads.forEach(Thread::start);
// wait for all threads to finish
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
consumer.accept(array);
}
/******************* Atomic field updater ******************/
@Test
public void testAtomicRefFieldUpdater(){
Student stu = new Student();
AtomicReferenceFieldUpdater<Student, String> updater = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");
updater.compareAndSet(stu, null, "张三");
log.info(stu.toString());
}
private static class Student {
volatile String name;
@Override
public String toString() {
return "Student{" +
"name='" + name + '\'' +
'}';
}
}
/******************* Atomic accumulator ******************/
@Test
public void testAccumulate(){
accumulate(()->new AtomicLong(0), AtomicLong::getAndIncrement);
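// Under contention, LongAdder spreads updates across several cells: Thread-0 adds to Cell[0], Thread-1 adds to Cell[1], ...
// The cells are summed at the end. Because threads update different cells, fewer CAS attempts fail and retry, which improves throughput
// Cells live in an array and are stored contiguously in memory. One Cell takes 24 bytes (16-byte object header plus 8-byte value), so a cache line can hold 2 Cell objects
// @sun.misc.Contended pads the annotated object or field with 128 bytes on each side, so that each one lands in a different cache line when the CPU loads it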
accumulate(LongAdder::new, LongAdder::increment);
}
private static <T> void accumulate(Supplier<T> supplier, Consumer<T> consumer){
T accumulator = supplier.get();
List<Thread> threads = new ArrayList<>();
for(int i=0;i<4;i++){
threads.add(new Thread(()->{
for(int j=0;j<50000;j++){
consumer.accept(accumulator);
}
}));
}
long start = System.nanoTime();
threads.forEach(Thread::start);
threads.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
log.info(accumulator + " cost:" + (end - start)/1000_000);
}
}