虽然本文的标题是从 0 到 1 实现 ReentrantLock ,但是为了方便理解,我们先从一个问题出发:既然系统已经有 synchronized 关键字了,那么为什么还会出现 ReentrantLock 这种代码层面的锁?
这就要先回顾一下历史了:在 JDK 1.5 之前,synchronized 还没有经过优化,是一个重量级锁,会切换线程状态,比较耗时。那有没有其它的方式来保证线程同步呢?从上帝视角来说,java 中的 compareAndSet (即:CAS) 方法可以保证高并发下多线程安全地操作共享变量,那么利用 compareAndSet 方法实现一个工具类来模拟 synchronized 的效果也是有可能的。所以从 JDK 1.5 就添加了 ReentrantLock 及其相关类。
由于 ReentrantLock 是通过内部类实现 AbstractQueuedSynchronizer 来保证线程同步的,而 AbstractQueuedSynchronizer 又是个 模板类 ,为了适应不同的场景添加了许多和 锁 没有直接关系的概念,理解起来比较抽象,所以换个角度:如果让我们自己实现一个线程同步工具,应该怎么做?
首先,先想象一下这个锁工具应该怎么使用:
MyLock myLock = new MyLock();
// 被操作的共享变量
int count = 0;
new Thread(new Runnable() {
// MyLock 保证只有在一个线程执行完方法 exc(Lockable lockable) 中的 Lockable 后,才能有另一个线程执行 Lockable
myLock.exc(new Lockable() {
@Override
public boolean run(){
count++;
return true;
}
});
}).start();
new Thread(new Runnable() {
myLock.exc(new Lockable() {
@Override
public boolean run(){
count++;
return true;
}
});
}).start();
先不管 MyLock 怎么实现上面的要求,那这种方式有什么缺点吗?
MyLock myLock = new MyLock();
// 被操作的共享变量
int count = 0;
new Thread(new Runnable() {
// MyLock.lock() 保证一个线程拿到锁后,其它线程都会堵塞在 lock() 方法,待此线程执行 myLock.release() 后,其它线程再争夺锁
myLock.lock();
try{
count++;
} catch(Exception e) {}
finally{
myLock.release();
}
}).start();
new Thread(new Runnable() {
myLock.lock();
try{
count++;
} catch(Exception e) {}
finally{
myLock.release();
}
}).start();
这种方式显然比方式一看起来优雅多了,那么如何实现此方式呢?
因为 CAS 是借助于 Unsafe 执行的,如果在高版本使用 Unsafe ,IDEA 会报下面的错误:
cannot access class jdk.internal.misc.Unsafe xxx
解决方法如下:
点击 “Edit Configurations…” 进入参数配置界面
点击 “Modify options” 会弹出 “Add Run Options” 弹窗,然后选则 Java 模块的 “Add VM options”,配置界面就会出现一个新的参数输入框(绿色框)
绿色框中添加
--add-opens java.base/jdk.internal.misc=ALL-UNNAMED --illegal-access=warn
即可,下面是具体的实现方法:
public class MyLock {
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
private static final long VALUE = U.objectFieldOffset(MyLock.class, "isLock");
public final boolean compareAndSet(boolean expectedValue, boolean newValue) {
return U.compareAndSetBoolean(this, VALUE, expectedValue, newValue);
}
// 为 true 时表示有线程获得锁
private volatile boolean isLock;
public void lock() {
for (; ; ) {
if (compareAndSet(false, true)) {
// CAS 返回 true 则另一个线程调用 lock() 肯定会返回 false,那么就会一直卡在 for 循环中
break;
}
}
}
public void unlock() {
// 由于之前 lock 执行成功,所以 isLock 一定为 true,所以这里可以直接置为 false
compareAndSet(true, false);
}
}
public class MyLockTest {
public static int count;
public static void main(String[] args) throws InterruptedException {
MyLock myLock = new MyLock();
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
// myLock.lock();
try {
for (int i = 0; i < 100; i++) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
count++;
System.out.println("Thread 1 and count:" + count);
}
}catch (Exception e){
}finally {
myLock.unlock();
}
}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
// myLock.lock();
try {
for (int i = 0; i < 100; i++) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
count++;
System.out.println("Thread 2 and count:" + count);
}
}catch (Exception e){
}finally {
myLock.unlock();
}
}
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(count);
}
}
运行上面代码,执行结果为:
...
Thread 1 and count:169
Thread 2 and count:169
Thread 1 and count:171
Thread 2 and count:170
Thread 2 and count:172
Thread 1 and count:172
172
可以看到在把 myLock.lock() 注释掉的情形下,Thread1 和 Thread2 交替执行,待都执行完毕后,最终结果是 172 和预期结果 200 不等。
将代码中 2 处 myLock.lock() 放开后,执行结果为:
Thread 1 and count:1
...// 这里都是 Thread 1
Thread 1 and count:100
Thread 2 and count:101
...// 这里都是 Thread 2
Thread 2 and count:200
200
多次测试后结果均为 200 与预期结果一致,且是待一个线程处理完 共享资源 后另一个线程才会处理,所以我们写的同步工具 MyLock 实现了 synchronized 的功能。
但是上面的写法没问题吗?
假设 Thread1 先执行,Thead2 被阻塞,如果此时另一个线程 Thread3 或者 main 线程,直接执行了 myLock.unlock() 方法,那么Thread2 就跳出了 myLock.lock() 中的 for 循环,和 Thread1 并行,这样就又出现线程同步问题。所以应该在 myLock.lock() 中记录获取到锁的线程,在 myLock.unlock() 中判断调用 unlock 的线程是否是获得锁的线程。修改后代码如下:
public class MyLock {
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
private static final long VALUE = U.objectFieldOffset(MyLock.class, "isLock");
// 持有锁的线程
private Thread exclusiveOwnerThread;
public final boolean compareAndSet(boolean expectedValue, boolean newValue) {
return U.compareAndSetBoolean(this, VALUE, expectedValue, newValue);
}
private volatile boolean isLock;
public void lock() {
for (; ; ) {
if (compareAndSet(false, true)) {
Thread thread = Thread.currentThread();
setExclusiveOwnerThread(thread);
// CAS 返回 true 则另一个线程调用 lock() 肯定会返回 false,那么就会一直卡在 for 循环中
break;
}
}
}
public void unlock() {
Thread thread = Thread.currentThread();
if (thread != exclusiveOwnerThread) {
// 只有获得锁的线程才能释放锁,否则抛出异常
throw new IllegalMonitorStateException();
}
// 由于之前 lock 执行成功,所以 isLock 一定为 true,所以这里可以直接置为 false
compareAndSet(true, false);
// 释放锁后,将持有锁的线程置为 null
setExclusiveOwnerThread(null);
}
public Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
public void setExclusiveOwnerThread(Thread exclusiveOwnerThread) {
this.exclusiveOwnerThread = exclusiveOwnerThread;
}
}
锁被异常解除通过引入 ExclusiveOwnerThread 解决了,但又发现一个问题:如果在 Thread1 中调用了 2 次 myLock.lock() 会怎么样?并不是像下面这样直接调用 2 次
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
myLock.lock();
myLock.lock();
...
而是下面这种情况,在工作中很容易碰到:
public class MyLockTest {
private int count;
private MyLock myLock = new MyLock();
// 模拟在 thread 执行过程中需要再执行的一个同步方法
private void reentrantLock() {
myLock.lock();
System.out.println(Thread.currentThread().getName() + " reentrant lock");
myLock.unlock();
}
public static void main(String[] args) throws InterruptedException {
MyLockTest myLockTest = new MyLockTest();
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
myLockTest.myLock.lock();
try {
for (int i = 0; i < 100; i++) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
myLockTest.count++;
System.out.println("Thread 1 and count:" + myLockTest.count);
if (myLockTest.count == 50) {
myLockTest.reentrantLock();
}
}
}catch (Exception e){
}finally {
myLockTest.myLock.unlock();
}
}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
myLockTest.myLock.lock();
try {
for (int i = 0; i < 100; i++) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
myLockTest.count++;
System.out.println("Thread 2 and count:" + myLockTest.count);
if (myLockTest.count == 150) {
myLockTest.reentrantLock();
}
}
}catch (Exception e){
}finally {
myLockTest.myLock.unlock();
}
}
});
Thread thread3 = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("thread3");
myLockTest.myLock.unlock();
}
});
thread1.start();
thread2.start();
thread3.start();
thread1.join();
thread2.join();
thread3.join();
System.out.println(myLockTest.count);
}
}
假设 thread1 先获得了锁,然后执行 reentrantLock() 方法,由于 thread1 之前已经将 isLock 置为 true 了,所以 reentrantLock() 中的 myLock.lock() 会一直卡在 for 循环中。这样 thread1 就无法往下执行,也就不会释放锁,那么 thread2 也永远无法执行。既然 MyLock 中已存在 ExclusiveOwnerThread 变量,这就好办了,只需要在 lock() 方法中判断一下,让 reentrantLock() 不卡在 for 循环,代码如下:
public void lock() {
for (; ; ) {
Thread thread = Thread.currentThread();
if (compareAndSet(false, true)) {
setExclusiveOwnerThread(thread);
// CAS 返回 true 则另一个线程调用 lock() 肯定会返回 false,那么就会一直卡在 for 循环中
break;
} else if (thread == getExclusiveOwnerThread()) {
break;
}
}
}
运行代码发现的确是不会卡在 for 循环了,但是打印结果变成了下面这样:
Thread 1 and count:1
...// 都是 Thread 1
Thread 1 and count:50
// 这个 Thread-0 就是第一个启动的线程 Thead1
// reentrantLock() 方法执行后打印的
Thread-0 reentrant lock
// reentrantLock() 方法执行完毕后,线程乱序
Thread 1 and count:51
Thread 2 and count:52
Thread 2 and count:53
Thread 1 and count:54
...
Thread 2 and count:180
180
这是由于 thread1 执行了 reentrantLock() 中的 myLock.unlock() 后将 isLock 置为 false,释放了锁导致的;而调用 unlock() 又不能不释放锁且 unlock() 方法并不知道当前线程调用了多少次 lock() 。所以这里提供两种解决方案:
因为给 reentrantLock() 添加一把新锁 myLock2,会出现死锁的情形。
例如:当 thread1 在执行 reentrantLock() 前,thread4 执行 reentrantLock() 获得了 myLock2 的锁,然后去获得 thread1 的锁 myLock,此时 myLock 被 thread1 占用,thread4 阻塞;待 thread1 执行到 reentrantLock() 时,myLock2 又被 thread4 占有。这样就造成了 thread1 和 thread4 彼此等待对方释放锁的场景,产生了死锁问题。总结:多线程中共同操作多把锁有几率产生死锁问题。
注意新加的变量 lockCount
public class MyLock {
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
private static final long VALUE = U.objectFieldOffset(MyLock.class, "isLock");
private Thread exclusiveOwnerThread;
// 记录 lock() 执行成功的次数
private int lockCount;
public final boolean compareAndSet(boolean expectedValue, boolean newValue) {
return U.compareAndSetBoolean(this, VALUE, expectedValue, newValue);
}
private volatile boolean isLock;
public void lock() {
for (; ; ) {
Thread thread = Thread.currentThread();
if (compareAndSet(false, true)) {
setExclusiveOwnerThread(thread);
lockCount++;
// CAS 返回 true 则另一个线程调用 lock() 肯定会返回 false,那么就会一直卡在 for 循环中
break;
} else if (thread == getExclusiveOwnerThread()) {
// 因为只有先得到锁的线程才能执行到这,所以对于 lockCount 的操作,可以保证线程安全
lockCount++;
break;
}
}
}
public void unlock() {
Thread thread = Thread.currentThread();
if (thread != exclusiveOwnerThread) {
// 只有获得锁的线程才能释放锁,否则抛出异常
throw new IllegalMonitorStateException();
}
// 因为 unlock 只有在先得到锁的线程中才能正确执行,所以对于 lockCount 的操作,可以保证线程安全
lockCount--;
if (lockCount == 0) {
// 由于之前 lock 执行成功,所以 isLock 一定为 true,所以这里可以直接置为 false
compareAndSet(true, false);
setExclusiveOwnerThread(null);
}
}
public Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
public void setExclusiveOwnerThread(Thread exclusiveOwnerThread) {
this.exclusiveOwnerThread = exclusiveOwnerThread;
}
}
经过上面的一系列调整 MyLock 已经实现了一个锁的基本功能了。那么当第一个获得锁的线程释放锁后,其它线程该怎么获得锁呢?例如下面的情形,开始的时候创建了 5 个线程:
public class MyLockTest {
private int count;
private MyLock myLock = new MyLock();
private void reentrantLock() {
myLock.lock();
System.out.println(Thread.currentThread().getName()
+ " reentrant lock");
myLock.unlock();
}
public static void main(String[] args) throws InterruptedException {
MyLockTest myLockTest = new MyLockTest();
Runnable runnable = () -> {
myLockTest.myLock.lock();
try {
for (int i = 0; i < 100; i++) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
myLockTest.count++;
System.out.println(Thread.currentThread().getName()
+ " and count:" + myLockTest.count);
if (i == 50) {
myLockTest.reentrantLock();
}
}
}catch (Exception e){
}finally {
myLockTest.myLock.unlock();
}
};
Thread thread;
for (int i = 0; i < 5; i++) {
thread = new Thread(runnable, "Thread " + (i + 1));
thread.start();
thread.join();
}
System.out.println(myLockTest.count);
}
}
当第一个线程释放锁时,其它四个线程都卡在 myLock.lock() 的 for 循环中,至于谁能第二个抢到锁就凭运气了,不会按照调用 myLock.lock() 的先后顺序执行。这种不以申请锁的先后顺序执行,而是通过抢占锁执行的方式叫作 非公平锁;于此相反按照申请锁的顺序执行的方式叫作 公平锁。总结一下就是:
公平锁:每个线程获取锁的顺序是按照线程访问锁的先后顺序获取的,最前面的线程总是最先获取到锁。
非公平锁:每个线程获取锁的顺序是随机的,并不会遵循先来先得的规则,所有线程会竞争获取锁。
既然现在的 MyLock 是非公平锁,那么公平锁应该怎么实现?
线程的公平锁是按照申请锁的先后顺序获取的,那么就需要在 lock() 方法中记录下线程的顺序。代码如下:
public class MyLock {
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
private static final long VALUE = U.objectFieldOffset(MyLock.class, "isLock");
private boolean fair;
public MyLock() {}
public MyLock(boolean fair) {
this.fair = fair;
}
private Thread exclusiveOwnerThread;
// 记录 lock() 执行成功的次数
private int lockCount;
private List<Node> threadOrder = new ArrayList<>();
private static final class Node {
public Node(Thread thread) {
this.thread = thread;
}
// 记录申请锁的线程
Thread thread;
}
public final boolean compareAndSet(boolean expectedValue, boolean newValue) {
return U.compareAndSetBoolean(this, VALUE, expectedValue, newValue);
}
private volatile boolean isLock;
public void lock() {
if (fair) {
tryAcquireFair();
} else {
tryAcquireUnfair();
}
}
public void tryAcquireUnfair() {
for (; ; ) {
Thread thread = Thread.currentThread();
if (compareAndSet(false, true)) {
setExclusiveOwnerThread(thread);
lockCount++;
// CAS 返回 true 则另一个线程调用 lock() 肯定会返回 false,那么就会一直卡在 for 循环中
break;
} else if (thread == getExclusiveOwnerThread()) {
lockCount++;
break;
}
}
}
public void tryAcquireFair() {
threadOrder.add(new Node(Thread.currentThread()));
for (; ; ) {
Thread thread = Thread.currentThread();
Node node = null;
try {
// 由于 threadOrder 不是线程安全的,且在 unlock 方法中会移除一个 node 节点
// 这里有可能会数组越界
node = threadOrder.get(0);
}catch (Exception e){
// 当数组越界时,说明还未执行的 node 节点被删除,会导致下面的匹配一直不成功,
// 卡在 for 循环中
}
if (node != null && thread == node.thread) {
if (compareAndSet(false, true)) {
setExclusiveOwnerThread(thread);
lockCount++;
// CAS 返回 true 则另一个线程调用 lock() 肯定会返回 false,那么就会一直卡在 for 循环中
return;
} else if (thread == getExclusiveOwnerThread()) {
lockCount++;
return;
}
}
}
}
public void unlock() {
Thread thread = Thread.currentThread();
if (thread != exclusiveOwnerThread) {
System.out.println("before crash exclusiveOwnerThread is "
+ exclusiveOwnerThread);
// 只有获得锁的线程才能释放锁,否则抛出异常
throw new IllegalMonitorStateException();
}
// 因为 unlock 只有在先得到锁的线程中才能正确执行,所以对于 lockCount 的操作,可以保证线程安全
lockCount--;
if (lockCount == 0) {
int deletePosition = -1;
for (int i = 0; i < threadOrder.size(); i++) {
if (thread == threadOrder.get(i).thread) {
deletePosition = i;
} else {
if (deletePosition > -1) {
threadOrder.set(i - 1, threadOrder.get(i));
}
}
}
threadOrder.remove(threadOrder.size() - 1);
setExclusiveOwnerThread(null);
// 由于之前 lock 执行成功,所以 isLock 一定为 true,所以这里可以直接置为 false
compareAndSet(true, false);
}
}
public Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
public void setExclusiveOwnerThread(Thread exclusiveOwnerThread) {
this.exclusiveOwnerThread = exclusiveOwnerThread;
}
}
public class MyLockTest {
private int count;
private MyLock myLock = new MyLock(true);
public static void main(String[] args) throws InterruptedException {
MyLockTest myLockTest = new MyLockTest();
Runnable runnable = () -> {
myLockTest.myLock.lock();
// 打印执行的顺序
System.out.println(Thread.currentThread().getName());
myLockTest.count++;
myLockTest.myLock.unlock();
};
Thread thread;
for (int i = 0; i < 5; i++) {
thread = new Thread(runnable, "Thread " + (i + 1));
thread.start();
}
Thread.sleep(5000);
System.out.println(myLockTest.count);
}
}
执行上面代码后,电脑风扇疯狂转,且输出结果为:
// 这个顺序是调用 myLock.lock() 的顺序,不是 new Thread 的顺序
Thread 1
Thread 5
Thread 3
Thread 2
4
Process finished with exit code 130
明显少执行了 Thread4,这是因为对 threadOrder 的操作是非线程安全的,在 unlock 中整理数组长度(不整理数据长度会造成数组中有很多空数据,随着程序运行时间的增加,会导致内存浪费和遍历耗时增加)时,将 thread4 对应的 Node 节点删除了,thread4 一直匹配不到卡在了 for 循环中。
既然数组存储不能保证线程安全,那么使用链表呢?
链表的实现方式:
public class MyLock {
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
private static final long VALUE = U.objectFieldOffset(MyLock.class, "isLock");
// 是否使用公平锁
private boolean fair;
public MyLock() {}
public MyLock(boolean fair) {
this.fair = fair;
}
private Thread exclusiveOwnerThread;
// 记录单个线程 lock() 执行成功的次数
private int lockCount;
private volatile Node head;
private static final long HEAD = U.objectFieldOffset(MyLock.class, "head");
public final boolean headCompareAndSet(Node oldHead, Node newHead) {
return U.compareAndSetObject(this, HEAD, oldHead, newHead);
}
private volatile Node tail;
private static final long TAIL = U.objectFieldOffset(MyLock.class, "tail");
public final boolean tailInitCompareAndSet() {
return U.compareAndSetObject(this, TAIL, null, head);
}
public final boolean tailToNextCompareAndSet() {
return U.compareAndSetObject(this, TAIL, tail, tail.next);
}
private static final long TAIL_NEXT = U.objectFieldOffset(Node.class, "next");
public final boolean tailInitNextCompareAndSet(Node tailNext) {
return U.compareAndSetObject(tail, TAIL_NEXT, null, tailNext);
}
private static final class Node {
static final int FINISH = 1;
volatile int waitStatus;
volatile Node next;
public Node(Thread thread) {
this.thread = thread;
}
// 记录申请锁的线程
volatile Thread thread;
@Override
public String toString() {
return thread.getName() + " next:" + (next == null ? next : next.thread.getName());
}
}
public final boolean compareAndSet(boolean expectedValue, boolean newValue) {
return U.compareAndSetBoolean(this, VALUE, expectedValue, newValue);
}
private volatile boolean isLock;
public void lock() {
if (fair) {
tryAcquireFair();
} else {
tryAcquireUnfair();
}
}
public void tryAcquireUnfair() {
for (; ; ) {
Thread thread = Thread.currentThread();
if (compareAndSet(false, true)) {
setExclusiveOwnerThread(thread);
lockCount++;
// CAS 返回 true 则另一个线程调用 lock() 肯定会返回 false,那么就会一直卡在 for 循环中
break;
} else if (thread == getExclusiveOwnerThread()) {
lockCount++;
break;
}
}
}
/**
* 用于对比线程的执行顺序和调用 lock() 的顺序是否一致,后面可以删掉
*/
private List<String> threadNames = new ArrayList<>();
public void tryAcquireFair() {
Thread thread = Thread.currentThread();
Node node = new Node(thread);
if (headCompareAndSet(null, node)){
threadNames.add(node.thread.getName());
tailInitCompareAndSet();
} else {
// 其它没成功设置 head 的线程都进入这个死循环,直到 tail 也设置完成
for (;;) {
if (tail != null) {
for (;;) {
// 一直循环直到给 tail.next 设置成功
// 之所以用循环,是因为可能多个线程在竞争执行 tailInitNextCompareAndSet
// 当一个线程成功给 tail.next 设置好 node 值后,
// tailToNextCompareAndSet 重置 tail
if (tailInitNextCompareAndSet(node)){
// 这里必须使用 oldHead 往 headCompareAndSet 传值
// 因为这里的 headCompareAndSet 和 unlock 里的 headCompareAndSet 有可能都在等待执行,
// 必须保证有且只有一个执行成功。
Node oldHead = head;
if (oldHead.waitStatus == Node.FINISH && oldHead.next != null) {
// 1.如果 head 线程已经结束,且 unlock 中的 head.next == null,
// 那么在加入一个新的尾节点后,head.next 一定不为 null,节点应往下走一步
// 2.如果 head 线程结束后,unlock 中的 headCompareAndSet 执行成功,
// head 会往前移动一步,那么当这里的 oldHead 是 head 移动前的 Node 时,
// 下面执行不成功;当 oldHead 是新节点时 waitStatus != Node.FINISH,
// 若新的 head 又走到了 unlock 的 headCompareAndSet 时,这就和第一种情况一样了。
if (headCompareAndSet(oldHead, oldHead.next)) {
// 已经执行过的 Node 断开链,在 GC 时进行回收
oldHead.next = null;
}
}
threadNames.add(node.thread.getName());
tailToNextCompareAndSet();
break;
}
}
// 成功追加到 tail,跳出循环
break;
}
// tail 为 null 时说明非 head 线程还没有加入到 Node 节点中,需要一直循环直到加入
}
}
for (;;) {
if (head != null && thread == head.thread) {
if (compareAndSet(false, true)) {
setExclusiveOwnerThread(thread);
lockCount++;
// CAS 返回 true 则另一个线程调用 lock() 肯定会返回 false,那么就会一直卡在 for 循环中
break;
} else if (thread == getExclusiveOwnerThread()) {
lockCount++;
break;
}
}
}
}
public void unlock() {
Thread thread = Thread.currentThread();
if (thread != exclusiveOwnerThread) {
System.out.println("before crash exclusiveOwnerThread is "
+ exclusiveOwnerThread);
// 只有获得锁的线程才能释放锁,否则抛出异常
throw new IllegalMonitorStateException();
}
// 因为 unlock 只有在先得到锁的线程中才能正确执行,所以对于 lockCount 的操作,可以保证线程安全
lockCount--;
if (lockCount == 0) {
if (head != null) {
// 这里必须使用 oldHead 往 headCompareAndSet 传值
// 因为这里的 headCompareAndSet 和 tryAcquireFair 里的 headCompareAndSet 有可能都在等待执行,
// 必须保证有且只有一个执行成功。
Node oldHead = head;
oldHead.waitStatus = Node.FINISH;
if (oldHead.next != null) {
// head.next == null 时,并不能说明所有线程已执行完毕,
// 有可能还有线程未加入 Node 节点中
if (headCompareAndSet(oldHead, oldHead.next)) {
// 已经执行过的 Node 断开链,在 GC 时进行回收
oldHead.next = null;
}
}
}
setExclusiveOwnerThread(null);
// 由于之前 lock 执行成功,所以 isLock 一定为 true,所以这里可以直接置为 false
compareAndSet(true, false);
}
}
public Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
public void setExclusiveOwnerThread(Thread exclusiveOwnerThread) {
this.exclusiveOwnerThread = exclusiveOwnerThread;
}
}
public class MyLockTest {
private List<String> threadNames = new ArrayList<>();
private int count;
private int threadCount = 338;
private long startTime;
private MyLock myLock = new MyLock(true);
public static void main(String[] args) throws InterruptedException {
MyLockTest myLockTest = new MyLockTest();
Runnable runnable = () -> {
myLockTest.myLock.lock();
myLockTest.threadNames.add(Thread.currentThread().getName());
myLockTest.count++;
if (myLockTest.count == 1) {
myLockTest.startTime = System.currentTimeMillis();
} else if (myLockTest.count == myLockTest.threadCount) {
System.out.println("total time:" + (System.currentTimeMillis() - myLockTest.startTime));
}
// 打印执行的顺序
System.out.println(myLockTest.count + " --- " + Thread.currentThread().getName());
myLockTest.myLock.unlock();
};
Thread thread;
for (int i = 0; i < myLockTest.threadCount; i++) {
thread = new Thread(runnable, "Thread " + (i + 1));
thread.start();
}
Thread.sleep(10000);
// 打印一下各个线程的实际运行次序
System.out.println("thread opera orders:" + myLockTest.threadNames);
System.out.println(myLockTest.count);
}
}
经过测试使用链表的方式是可以的,但要注意里面的逻辑比较绕。这样我们就得到了一个公平锁的写法,虽然相对于非公平锁麻烦多了。
执行上面的代码后,打印结果如下:
...
336 --- Thread 67
337 --- Thread 62
total time:84260
338 --- Thread 15
平均每个线程耗时 249ms ,而线程中又没有耗时任务,理论上不可能耗时这么久。显然是因为线程争抢锁资源时,for 循环中的 CAS 总是返回 false,然后一直执行 for 循环导致的。
像这种在解决多线程问题时,获取锁失败后,通过死循环的方式重新获取锁,直到成功为止。这种方案叫 自旋锁。
另外,CAS 即 Compare And Swap(比较与交换),是一种 无锁算法,即在不使用锁的情况下实现多线程之间变量的同步,所以也叫 非阻塞同步(Non-blocking Synchronization)。
像 CAS 这样,假设多个线程在操作共享数据时,不会发生冲突,因此不需要加锁,而是在更新数据时,通过比较当前状态和上一次的状态,来判断是否有其他线程修改了数据。如果没有冲突,就执行更新操作,否则就重试或者放弃。这种方式叫作 乐观锁。
乐观锁的优点
乐观锁的缺点
ABA 问题。 即如果一个变量 V 初次读取时值是A,并且在准备赋值时检查它的值仍然是 A,那能说明它的值没有被其他线程修改过吗?显然不能。因为可能在准备赋值前,V 的值被其他线程修改成了其他值 B,然后又修改成了 A,那么 CAS 操作就会误认为它从来没有被修改过,这个就是 CAS 操作的 ABA 问题
循环时间长开销大。 自旋 CAS,不成功就会一直循环执行直到成功,如果长时间不成功,会给CPU带来很大的开销。
只能保证一个共享变量的原子操作。 CAS 只对单个共享变量有效,当操作涉及到多个共享变量时CAS 无效。但在 JDK1.5 开始,提供了AtomicReference 类来保证引用对象之间的原子性,可以把多个变量放在一个对象里进行 CAS 操作。所以可以使用锁或者是利用 AtomicReference 类把多个共享变量合并成一个共享变量来操作
乐观锁使用场景
适用于线程冲突较少的情景,一般在写操作比较少,读操作比较多的情况下推荐使用
对于自旋锁在大量线程并发下,缺点很明显,CPU资源耗尽、应用卡死、手机发烫等(例如上面单线程的平局耗时为 249ms)。那么怎么解决呢?
我们可以在第一次获锁失败后,将当前线程阻塞,有锁资源时再唤醒。这种方案叫 阻塞锁。
由于 JDK 中已提供了线程的阻塞方法 LockSupport.park() 所以我们就不需要自己实现了。LockSupport 类内容很少,其本质还是使用 Unsafe 实现的一系列静态方法。
下面将代码改成阻塞锁的模式:
public class MyLock {
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
private static final long VALUE = U.objectFieldOffset(MyLock.class, "isLock");
// 是否使用公平锁
private boolean fair;
public MyLock() {}
public MyLock(boolean fair) {
this.fair = fair;
}
private Thread exclusiveOwnerThread;
// 记录 lock() 执行成功的次数
private int lockCount;
private volatile Node head;
private static final long HEAD = U.objectFieldOffset(MyLock.class, "head");
public final boolean headCompareAndSet(Node oldHead, Node newHead) {
return U.compareAndSetObject(this, HEAD, oldHead, newHead);
}
private volatile Node tail;
private static final long TAIL = U.objectFieldOffset(MyLock.class, "tail");
public final boolean tailInitCompareAndSet() {
return U.compareAndSetObject(this, TAIL, null, head);
}
public final boolean tailToNextCompareAndSet() {
return U.compareAndSetObject(this, TAIL, tail, tail.next);
}
private static final long TAIL_NEXT = U.objectFieldOffset(Node.class, "next");
public final boolean tailInitNextCompareAndSet(Node tailNext) {
return U.compareAndSetObject(tail, TAIL_NEXT, null, tailNext);
}
private static final class Node {
static final int FINISH = 1;
volatile int waitStatus;
volatile Node next;
public Node(Thread thread) {
this.thread = thread;
}
// 记录申请锁的线程
volatile Thread thread;
@Override
public String toString() {
return thread.getName() + " next:" + (next == null ? next : next.thread.getName());
}
}
public final boolean compareAndSet(boolean expectedValue, boolean newValue) {
return U.compareAndSetBoolean(this, VALUE, expectedValue, newValue);
}
private volatile boolean isLock;
public void lock() {
if (fair) {
tryAcquireFair();
} else {
tryAcquireUnfair();
}
}
public void tryAcquireUnfair() {
for (; ; ) {
Thread thread = Thread.currentThread();
if (compareAndSet(false, true)) {
setExclusiveOwnerThread(thread);
lockCount++;
// CAS 返回 true 则另一个线程调用 lock() 肯定会返回 false,那么就会一直卡在 for 循环中
break;
} else if (thread == getExclusiveOwnerThread()) {
lockCount++;
break;
}
}
}
/**
* 用于对比线程的执行顺序和调用 lock() 的顺序是否一致,后面可以删掉
*/
private List<String> threadNames = new ArrayList<>();
public void tryAcquireFair() {
Thread thread = Thread.currentThread();
Node node = new Node(thread);
if (headCompareAndSet(null, node)){
threadNames.add(node.thread.getName());
tailInitCompareAndSet();
} else {
// 其它没成功设置 head 的线程都进入这个死循环,直到 tail 也设置完成
for (;;) {
if (tail != null) {
for (;;) {
// 一直循环直到给 tail.next 设置成功
// 之所以用循环,是因为可能多个线程在竞争执行 tailInitNextCompareAndSet
// 当一个线程成功给 tail.next 设置好 node 值后,
// tailToNextCompareAndSet 重置 tail
if (tailInitNextCompareAndSet(node)){
// 这里必须使用 oldHead 往 headCompareAndSet 传值
// 因为这里的 headCompareAndSet 和 unlock 里的 headCompareAndSet 有可能都在等待执行,
// 必须保证有且只有一个执行成功。
Node oldHead = head;
if (oldHead.waitStatus == Node.FINISH && oldHead.next != null) {
// 1.如果 head 线程已经结束,且 unlock 中的 head.next == null,
// 那么在加入一个新的尾节点后,head.next 一定不为 null,节点应往下走一步
// 2.如果 head 线程结束后,unlock 中的 headCompareAndSet 执行成功,
// head 会往前移动一步,那么当这里的 oldHead 是 head 移动前的 Node 时,
// 下面执行不成功;当 oldHead 是新节点时 waitStatus != Node.FINISH,
// 若新的 head 又走到了 unlock 的 headCompareAndSet 时,这就和第一种情况一样了。
if (headCompareAndSet(oldHead, oldHead.next)) {
// 已经执行过的 Node 断开链,在 GC 时进行回收
oldHead.next = null;
LockSupport.unpark(head.thread);
}
}
threadNames.add(node.thread.getName());
tailToNextCompareAndSet();
break;
}
}
// 成功追加到 tail,跳出循环
break;
}
// tail 为 null 时说明非 head 线程还没有加入到 Node 节点中,需要一直循环直到加入
}
}
for (;;) {
if (head != null && thread == head.thread) {
if (compareAndSet(false, true)) {
setExclusiveOwnerThread(thread);
lockCount++;
// CAS 返回 true 则另一个线程调用 lock() 肯定会返回 false,那么就会一直卡在 for 循环中
break;
} else if (thread == getExclusiveOwnerThread()) {
lockCount++;
break;
}
} else {
// 当当前线程不是 head.thread 线程时,在公平锁的机制下不应该获取到锁,所以应该阻塞一下
LockSupport.park();
}
}
}
public void unlock() {
Thread thread = Thread.currentThread();
if (thread != exclusiveOwnerThread) {
System.out.println("before crash exclusiveOwnerThread is "
+ exclusiveOwnerThread);
// 只有获得锁的线程才能释放锁,否则抛出异常
throw new IllegalMonitorStateException();
}
// 因为 unlock 只有在先得到锁的线程中才能正确执行,所以对于 lockCount 的操作,可以保证线程安全
lockCount--;
if (lockCount == 0) {
if (head != null) {
// 这里必须使用 oldHead 往 headCompareAndSet 传值
// 因为这里的 headCompareAndSet 和 tryAcquireFair 里的 headCompareAndSet 有可能都在等待执行,
// 必须保证有且只有一个执行成功。
Node oldHead = head;
oldHead.waitStatus = Node.FINISH;
if (oldHead.next != null) {
// head.next == null 时,并不能说明所有线程已执行完毕,
// 有可能还有线程未加入 Node 节点中
if (headCompareAndSet(oldHead, oldHead.next)) {
// 已经执行过的 Node 断开链,在 GC 时进行回收
oldHead.next = null;
LockSupport.unpark(head.thread);
}
}
}
setExclusiveOwnerThread(null);
// 由于之前 lock 执行成功,所以 isLock 一定为 true,所以这里可以直接置为 false
compareAndSet(true, false);
}
}
public Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
public void setExclusiveOwnerThread(Thread exclusiveOwnerThread) {
this.exclusiveOwnerThread = exclusiveOwnerThread;
}
}
运行上面代码,打印输出结果:
...
total time:66
338 --- Thread 338
之前的总耗时为 84260ms,现在为 66ms,性能提高了约 1277 倍。
在非公平锁模式中线程只抢占 CAS,而不需要判断是否符合执行顺序,理论上耗时应该远小于公平锁。
下面看一下实现阻塞前非公平锁的耗时情况:
...
337 --- Thread 272
total time:14341
338 --- Thread 307
执行结果与预期一致。下面是设置阻塞后的代码:
public class MyLock {
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
private static final long VALUE = U.objectFieldOffset(MyLock.class, "isLock");
// 是否使用公平锁
private boolean fair;
public MyLock() {
this(false);
}
public MyLock(boolean fair) {
this.fair = fair;
if (!this.fair) {
head = Node.HEAD;
tail = head;
}
}
private Thread exclusiveOwnerThread;
// 记录 lock() 执行成功的次数
private int lockCount;
private volatile Node head;
private static final long HEAD = U.objectFieldOffset(MyLock.class, "head");
public final boolean headCompareAndSet(Node oldHead, Node newHead) {
return U.compareAndSetObject(this, HEAD, oldHead, newHead);
}
private volatile Node tail;
private static final long TAIL = U.objectFieldOffset(MyLock.class, "tail");
public final boolean tailInitCompareAndSet() {
return U.compareAndSetObject(this, TAIL, null, head);
}
public final boolean tailToNextCompareAndSet() {
return U.compareAndSetObject(this, TAIL, tail, tail.next);
}
private static final long NODE_NEXT = U.objectFieldOffset(Node.class, "next");
public final boolean tailInitNextCompareAndSet(Node tailNext) {
return U.compareAndSetObject(tail, NODE_NEXT, null, tailNext);
}
public final boolean nodeSetNextCompareAndSet(Node node, Node nextExpected, Node nextNode) {
return U.compareAndSetObject(node, NODE_NEXT, nextExpected, nextNode);
}
private static final class Node {
static final Node HEAD = new Node(null);
static final int FINISH = 1;
volatile int waitStatus;
volatile Node next;
public Node(Thread thread) {
this.thread = thread;
}
// 记录申请锁的线程
volatile Thread thread;
@Override
public String toString() {
return (thread == null ? thread : thread.getName())
+ " next:"
+ (next != null && next.thread != null ? next.thread.getName() : "");
}
}
public final boolean compareAndSet(boolean expectedValue, boolean newValue) {
return U.compareAndSetBoolean(this, VALUE, expectedValue, newValue);
}
private volatile boolean isLock;
public void lock() {
if (fair) {
tryAcquireFair();
} else {
tryAcquireUnfair();
}
}
public void tryAcquireUnfair() {
Thread thread = Thread.currentThread();
Node node = new Node(thread);
for (;;) {
if (tailInitNextCompareAndSet(node)){
threadNames.add(node.thread.getName());
tailToNextCompareAndSet();
break;
}
}
for (;;) {
if (compareAndSet(false, true)) {
setExclusiveOwnerThread(thread);
lockCount++;
// CAS 返回 true 则另一个线程调用 lock() 肯定会返回 false,那么就会一直卡在 for 循环中
break;
} else if (thread == getExclusiveOwnerThread()) {
lockCount++;
break;
}
LockSupport.park();
}
}
/**
* 用于对比线程的执行顺序和调用 lock() 的顺序是否一致,后面可以删掉
*/
private List<String> threadNames = new ArrayList<>();
public void tryAcquireFair() {
Thread thread = Thread.currentThread();
Node node = new Node(thread);
if (headCompareAndSet(null, node)){
threadNames.add(node.thread.getName());
tailInitCompareAndSet();
} else {
// 其它没成功设置 head 的线程都进入这个死循环,直到 tail 也设置完成
for (;;) {
if (tail != null) {
for (;;) {
// 一直循环直到给 tail.next 设置成功
// 之所以用循环,是因为可能多个线程在竞争执行 tailInitNextCompareAndSet
// 当一个线程成功给 tail.next 设置好 node 值后,
// tailToNextCompareAndSet 重置 tail
if (tailInitNextCompareAndSet(node)){
// 这里必须使用 oldHead 往 headCompareAndSet 传值
// 因为这里的 headCompareAndSet 和 unlock 里的 headCompareAndSet 有可能都在等待执行,
// 必须保证有且只有一个执行成功。
Node oldHead = head;
if (oldHead.waitStatus == Node.FINISH && oldHead.next != null) {
// 1.如果 head 线程已经结束,且 unlock 中的 head.next == null,
// 那么在加入一个新的尾节点后,head.next 一定不为 null,节点应往下走一步
// 2.如果 head 线程结束后,unlock 中的 headCompareAndSet 执行成功,
// head 会往前移动一步,那么当这里的 oldHead 是 head 移动前的 Node 时,
// 下面执行不成功;当 oldHead 是新节点时 waitStatus != Node.FINISH,
// 若新的 head 又走到了 unlock 的 headCompareAndSet 时,这就和第一种情况一样了。
if (headCompareAndSet(oldHead, oldHead.next)) {
// 已经执行过的 Node 断开链,在 GC 时进行回收
oldHead.next = null;
LockSupport.unpark(head.thread);
}
}
threadNames.add(node.thread.getName());
tailToNextCompareAndSet();
break;
}
}
// 成功追加到 tail,跳出循环
break;
}
// tail 为 null 时说明非 head 线程还没有加入到 Node 节点中,需要一直循环直到加入
}
}
for (;;) {
if (head != null && thread == head.thread) {
if (compareAndSet(false, true)) {
setExclusiveOwnerThread(thread);
lockCount++;
// CAS 返回 true 则另一个线程调用 lock() 肯定会返回 false,那么就会一直卡在 for 循环中
break;
} else if (thread == getExclusiveOwnerThread()) {
lockCount++;
break;
}
} else {
// 当当前线程不是 head.thread 线程时,在公平锁的机制下不应该获取到锁,所以应该阻塞一下
LockSupport.park();
}
}
}
public void unlock() {
Thread thread = Thread.currentThread();
if (thread != exclusiveOwnerThread) {
System.out.println("before crash exclusiveOwnerThread is "
+ exclusiveOwnerThread);
// 只有获得锁的线程才能释放锁,否则抛出异常
throw new IllegalMonitorStateException();
}
// 因为 unlock 只有在先得到锁的线程中才能正确执行,所以对于 lockCount 的操作,可以保证线程安全
lockCount--;
if (lockCount == 0) {
if (fair) {
Node oldHead = head;
oldHead.waitStatus = Node.FINISH;
oldHead.thread = null;
// 这里必须使用 oldHead 往 headCompareAndSet 传值
// 因为这里的 headCompareAndSet 和 tryAcquireFair 里的 headCompareAndSet 有可能都在等待执行,
// 必须保证有且只有一个执行成功。
if (oldHead.next != null) {
// head.next == null 时,并不能说明所有线程已执行完毕,
// 有可能还有线程未加入 Node 节点中
if (headCompareAndSet(oldHead, oldHead.next)) {
// 已经执行过的 Node 断开链,在 GC 时进行回收
oldHead.next = null;
LockSupport.unpark(head.thread);
}
}
} else {
Node nextNode;
Node currentNode = Node.HEAD.next;
Node prevNode = Node.HEAD;
while (currentNode != null) {
if (currentNode.thread == thread) {
currentNode.waitStatus = Node.FINISH;
currentNode.thread = null;
}
nextNode = currentNode.next;
if (currentNode.thread == null && nextNode != null) {
// 删除已执行的节点,prevNode 不变
prevNode.next = nextNode;
// 将 currentNode.next 置为 null,会报
// Could not load hsdis-amd64.dll; library not loadable; PrintAssembly is disabled
// 错误,所以这里注释掉,因为没有引用 currentNode 的节点了,所以不影响 GC
// currentNode.next = null;
currentNode = nextNode;
continue;
}
prevNode = currentNode;
currentNode = currentNode.next;
}
}
setExclusiveOwnerThread(null);
// 由于之前 lock 执行成功,所以 isLock 一定为 true,所以这里可以直接置为 false
compareAndSet(true, false);
if (fair) {
LockSupport.unpark(head.thread);
} else {
Node currentNode = Node.HEAD.next;
while (currentNode != null) {
// 唤醒所有线程重新竞争锁
LockSupport.unpark(currentNode.thread);
currentNode = currentNode.next;
}
}
}
}
public void printAllNodes() {
Node node;
String str = null;
if (fair) {
node = head;
str = "all fair nodes:";
} else {
node = Node.HEAD.next;
str = "all unfair nodes:";
}
while (node != null) {
System.out.println(str + node.thread + " next:" + node.next);
node = node.next;
}
}
public Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
public void setExclusiveOwnerThread(Thread exclusiveOwnerThread) {
this.exclusiveOwnerThread = exclusiveOwnerThread;
}
}
public class MyLockTest {
private List<String> threadNames = new ArrayList<>();
private int count;
private int threadCount = 338;
private long startTime;
private MyLock myLock = new MyLock(false);
public static void main(String[] args) throws InterruptedException {
MyLockTest myLockTest = new MyLockTest();
Runnable runnable = () -> {
myLockTest.myLock.lock();
myLockTest.threadNames.add(Thread.currentThread().getName());
myLockTest.count++;
// 打印执行的顺序
System.out.println(myLockTest.count + " --- " + Thread.currentThread().getName());
if (myLockTest.count == 1) {
myLockTest.startTime = System.currentTimeMillis();
} else if (myLockTest.count == myLockTest.threadCount) {
System.out.println("total time:" + (System.currentTimeMillis() - myLockTest.startTime));
}
myLockTest.myLock.unlock();
};
Thread thread;
for (int i = 0; i < myLockTest.threadCount; i++) {
thread = new Thread(runnable, "Thread " + (i + 1));
thread.start();
}
Thread.sleep(5000);
// 打印一下各个线程的实际运行次序
System.out.println("thread opera orders:" + myLockTest.threadNames);
System.out.println(myLockTest.count);
myLockTest.myLock.printAllNodes();
}
}
执行代码,打印输出内容:
1 --- Thread 1
2 --- Thread 2
3 --- Thread 40
4 --- Thread 65
...
338 --- Thread 338
total time:33
thread opera orders:[Thread 1 ....
338
all unfair nodes:null next:null
之前的总耗时为 14341ms,现在为 33ms,性能提高了约 435 倍。
经过上面一系列的调整,我们终于实现了自己的可重入锁。虽然上面的代码还有一些待优化的点,例如:
但是到此篇幅太长了,就不往下写了。
最后,欢迎扫码关注公众号,将不定时分享干货内容。