Java中
Condition
接口为多线程编程带来了显著的优势,它不仅能够实现线程间的精准协调,让等待和通知机制更加灵活,还能有效避免“惊群效应”带来的性能损耗。与Object
的wait
和notify
相比,Condition
提供了更好控制粒度,允许多个等待集合独立管理,使得多线程间的交互更加高效且易于维护,此外,Condition
的await
方法能够响应中断,增强了程序的健壮性。
在Java中,Condition是一个接口,它提供了线程之间的协调机制,可以将它看作是一个更加灵活、更加强大的wait()和notify()机制,通常与Lock接口(比如ReentrantLock)一起使用。它的核心作用体现在两个方面,如下:
举个生活中的实际案例:假设,张三正在经营一个智能化的餐厅,顾客可以通过一个电子点餐系统来点餐,这个系统有一个显示屏,显示各种菜品的准备状态(如“准备中”、“已就绪”、“已售出”等),在这个场景中,Condition
可以发挥巨大的作用:
在这个例子中,Condition
提供了一种有效的方式来协调不同角色(厨师、服务员、顾客)之间的交互,确保每个人都在正确的时间得到他们需要的信息或资源。这比简单地使用wait()
和notify()
要强大得多,因为它允许更细粒度的控制和更复杂的交互模式。
Condition
接口常常与Lock
接口一起使用,提供了一种更加灵活的线程同步机制,相比于使用Object
的wait()
、notify()
和notifyAll()
方法,Condition
允许多个等待队列(即可以有多个Condition
对象),并且提供了更高的控制精度。
下面是一个使用Condition
的示例场景:生产者-消费者问题。在这个场景中,有一个共享的数据缓冲区,生产者向缓冲区中放置数据,而消费者从缓冲区中取出数据,当缓冲区满时,生产者需要等待;当缓冲区空时,消费者需要等待,使用Condition
可以很容易地实现这一点,如下代码案例:
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumerExample {
// 共享的缓冲区
private final Queue<Integer> buffer = new LinkedList<>();
// 缓冲区最大容量
private static final int MAX_SIZE = 5;
// 锁对象
private final Lock lock = new ReentrantLock();
// 生产者条件
private final Condition producerCondition = lock.newCondition();
// 消费者条件
private final Condition consumerCondition = lock.newCondition();
// 生产者方法
public void produce() throws InterruptedException {
lock.lock();
try {
while (buffer.size() == MAX_SIZE) {
// 缓冲区满,生产者等待
producerCondition.await();
}
// 生产一个数据
int data = (int) (Math.random() * 100);
buffer.offer(data);
System.out.println("Produced: " + data);
// 通知消费者
consumerCondition.signalAll();
} finally {
lock.unlock();
}
}
// 消费者方法
public void consume() throws InterruptedException {
lock.lock();
try {
while (buffer.isEmpty()) {
// 缓冲区空,消费者等待
consumerCondition.await();
}
// 消费一个数据
int data = buffer.poll();
System.out.println("Consumed: " + data);
// 通知生产者
producerCondition.signalAll();
} finally {
lock.unlock();
}
}
// 客户端调用示例
public static void main(String[] args) {
ProducerConsumerExample pc = new ProducerConsumerExample();
// 创建生产者线程
Thread producerThread = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
pc.produce();
Thread.sleep(100); // 模拟生产耗时
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 创建消费者线程
Thread consumerThread = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
pc.consume();
Thread.sleep(150); // 模拟消费耗时
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 启动线程
producerThread.start();
consumerThread.start();
}
}
在上面代码中,定义了ProducerConsumerExample
类,它包含了共享缓冲区、锁和条件变量,生产者线程调用produce()
方法,而消费者线程调用consume()
方法,当缓冲区满时,生产者通过调用producerCondition.await()
等待;当缓冲区空时,消费者通过调用consumerCondition.await()
等待,当条件满足时(即缓冲区不满或不为空),相应的线程会被唤醒,并通过调用signalAll()
方法通知其他等待的线程。
详见官方文档:https://docx.iamqiang.com/jdk11/api/java.base/java/util/concurrent/locks/Condition.html
Modifier and Type | Method | Description |
---|---|---|
void | await() | 使当前线程等待,直到被其他线程通过 signal() 或 signalAll() 方法唤醒,或者线程被中断,或者发生了其他不可预知的情况(如假唤醒)。该方法会在等待之前释放当前线程所持有的锁,在被唤醒后会再次尝试获取锁。 |
boolean | await(long time, TimeUnit unit) | 使当前线程等待指定的时间,或者直到被其他线程通过 signal() 或 signalAll() 方法唤醒,或者线程被中断。如果在指定的时间内没有被唤醒,该方法将返回。在等待之前会释放当前线程所持有的锁,在被唤醒或超时后会再次尝试获取锁。 |
long | awaitNanos(long nanosTimeout) | 使当前线程等待指定的纳秒数,或者直到被其他线程通过 signal() 或 signalAll() 方法唤醒。如果在指定的时间内没有被唤醒,该方法将返回。 |
void | awaitUninterruptibly() | 与 await() 类似,但不会响应线程的中断。即使线程在等待期间被中断,它也会继续等待,直到被 signal() 或 signalAll() 方法唤醒。 |
boolean | awaitUntil(Date deadline) | 使当前线程等待,直到达到指定的截止日期,或者直到被其他线程通过 signal() 或 signalAll() 方法唤醒,或者线程被中断。 |
void | signal() | 唤醒等待在此 Condition 上的一个线程。如果有多个线程正在等待,则选择其中的一个进行唤醒。被唤醒的线程将从其 await() 调用中返回,并重新尝试获取与此 Condition 关联的锁。 |
void | signalAll() | 唤醒等待在此 Condition 上的所有线程。每个被唤醒的线程都将从其 await() 调用中返回,并重新尝试获取与此 Condition 关联的锁。 |
下面是一个使用Condition的await()方法的示例场景:有界缓冲区。在这个场景中,有一个固定大小的缓冲区,生产者向其中添加元素,而消费者从中移除元素,如果缓冲区已满,生产者需要等待;如果缓冲区为空,消费者需要等待,如下代码:
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedBuffer {
private final Queue<Integer> buffer;
private final int maxSize;
private final Lock lock;
// Condition for producers
private final Condition notFull;
// Condition for consumers
private final Condition notEmpty;
public BoundedBuffer(int maxSize) {
this.maxSize = maxSize;
this.buffer = new LinkedList<>();
this.lock = new ReentrantLock();
this.notFull = lock.newCondition();
this.notEmpty = lock.newCondition();
}
// Produce an item
public void produce(int item) throws InterruptedException {
lock.lock();
try {
while (buffer.size() == maxSize) {
// Buffer is full, wait for space
notFull.await();
}
buffer.offer(item);
// Signal a waiting consumer
notEmpty.signal();
} finally {
lock.unlock();
}
}
// Consume an item
public Integer consume() throws InterruptedException {
lock.lock();
try {
while (buffer.isEmpty()) {
// Buffer is empty, wait for items
notEmpty.await();
}
// Signal a waiting producer
notFull.signal();
return buffer.poll();
} finally {
lock.unlock();
}
}
// Client code to demonstrate usage
public static void main(String[] args) {
BoundedBuffer buffer = new BoundedBuffer(5);
// Producer thread
Thread producer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
buffer.produce(i);
System.out.println("Produced: " + i);
Thread.sleep(100); // Simulate work
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
// Consumer thread
Thread consumer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
Integer item = buffer.consume();
System.out.println("Consumed: " + item);
Thread.sleep(200); // Simulate work
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
producer.start();
consumer.start();
}
}
在上面代码中,BoundedBuffer
类使用了Lock
和Condition
来实现一个线程安全的有界缓冲区,produce()
方法在缓冲区满时等待,而consume()
方法在缓冲区空时等待,当生产者生产了一个元素后,它会通知等待的消费者;同样地,当消费者消费了一个元素后,它会通知等待的生产者。
Condition接口提供了signal()和signalAll()方法,用于唤醒等待在Condition对象上的线程,这两个方法通常与await()方法一起使用,以实现线程间的协调。
以下是Condition的signal()和signalAll()方法的使用场景示例:生产者-消费者问题。在这个场景中,生产者生产数据并放入缓冲区,消费者从缓冲区中取出数据。当缓冲区为空时,消费者需要等待;当缓冲区满时,生产者需要等待,如下代码:
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumerQueue {
private final Queue<Integer> queue;
private final int maxSize;
private final Lock lock;
// Condition for producers
private final Condition notFull;
// Condition for consumers
private final Condition notEmpty;
public ProducerConsumerQueue(int maxSize) {
this.maxSize = maxSize;
this.queue = new LinkedList<>();
this.lock = new ReentrantLock();
this.notFull = lock.newCondition();
this.notEmpty = lock.newCondition();
}
public void produce(int item) throws InterruptedException {
lock.lock();
try {
while (queue.size() == maxSize) {
// Buffer is full, wait for space
notFull.await();
}
queue.add(item);
// Signal a waiting consumer (or all with signalAll())
notEmpty.signal(); // Change to notEmpty.signalAll() to wake up all consumers
} finally {
lock.unlock();
}
}
public Integer consume() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
// Buffer is empty, wait for items
notEmpty.await();
}
// Remove an item from the buffer
Integer item = queue.remove();
// Signal a waiting producer (or all with signalAll())
notFull.signal(); // Change to notFull.signalAll() to wake up all producers
return item;
} finally {
lock.unlock();
}
}
// Client code to demonstrate usage
public static void main(String[] args) {
ProducerConsumerQueue queue = new ProducerConsumerQueue(5);
// Producer thread
Thread producer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
queue.produce(i);
System.out.println("Produced: " + i);
Thread.sleep(100); // Simulate work
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// Consumer thread
Thread consumer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
Integer item = queue.consume();
System.out.println("Consumed: " + item);
Thread.sleep(200); // Simulate work
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
consumer.start();
}
}
在这个例子中,ProducerConsumerQueue
类使用了Lock
和Condition
来实现一个线程安全的生产者-消费者队列,当生产者尝试向已满的队列中添加元素时,它会通过调用notFull.await()
进入等待状态,同样地,当消费者尝试从空队列中移除元素时,它会通过调用notEmpty.await()
进入等待状态。
当生产者成功地向队列中添加了一个元素后,它会通过调用notEmpty.signal()
唤醒一个等待的消费者(如果有的话),类似地,当消费者成功地从队列中移除了一个元素后,它会通过调用notFull.signal()
唤醒一个等待的生产者。
注意:如果想唤醒所有等待的线程,可以将signal()
调用替换为signalAll()
,例如,notEmpty.signalAll()
将唤醒所有等待在notEmpty
条件上的消费者线程,而notFull.signalAll()
将唤醒所有等待在notFull
条件上的生产者线程,在实际应用中,使用signal()
通常更高效,因为它只唤醒一个线程,而signalAll()
可能会唤醒不必要的线程,导致额外的上下文切换开销。
Condition接口和Object类中监视器方法有什么区别?
Condition
接口和Object
类中的监视器方法(wait()
, notify()
, notifyAll()
)都可以实现多线程同步和通信,但它们在使用方式和灵活性上有所不同,如下:
Object
类中的wait()
, notify()
, 和 notifyAll()
方法是与每个对象内置的锁(也称为监视器锁)紧密关联的,这意味着,只有当一个线程获得了对象的锁(通过synchronized
关键字)时,它才能调用该对象上的这些方法。
wait()
: 释放当前线程持有的对象锁,并使线程进入等待状态,直到其他线程调用同一个对象的notify()
或notifyAll()
方法来唤醒它。notify()
: 唤醒等待在对象锁上的一个线程(如果有的话)。选择哪个线程是不确定的。notifyAll()
: 唤醒所有等待在对象锁上的线程。这些方法的一个限制是,它们只能与Object
的内置锁一起使用,而且你不能为同一个锁创建多个等待集合(也就是等待不同条件的线程集合)。
相比之下,Condition
接口提供了一种更加灵活和高级的线程间通信机制,它是通过Lock
接口(比如ReentrantLock
)提供的,允许为一个锁创建多个Condition
对象,每个Condition
对象都可以有自己的等待集合。
await()
: 类似于Object.wait()
,释放当前线程持有的锁,并使线程进入等待状态,直到其他线程调用signal()
或signalAll()
方法来唤醒它。signal()
: 类似于Object.notify()
,唤醒等待在Condition
上的一个线程。signalAll()
: 类似于Object.notifyAll()
,唤醒所有等待在Condition
上的线程。Condition
的一个主要优势是,可以为同一个锁创建多个Condition
对象,每个对象管理自己的等待线程集合,这意味着,可以更加精细地控制哪些线程应该在特定条件下被唤醒。
举个生活中的实际案例:想象一下有一个仓库,里面有不同类型的商品,如果使用Object
的监视器方法,仓库的门(锁)就只有一个等待区域(等待集合),所有人(线程)都挤在一起等待,当说“开门了!”(notify()
或notifyAll()
),所有人都会一拥而上,不管他们是不是在等自己需要的商品。
而如果使用Condition
,可以为每种商品都设一个等待区域,这样,当某种商品到货时,只需唤醒等待那种商品的顾客,而不是所有人。