Java内置锁:深度解析Condition接口

发布时间:2024年01月12日

Java内置锁:深度解析Condition接口 - 程序员古德

Java中Condition接口为多线程编程带来了显著的优势,它不仅能够实现线程间的精准协调,让等待和通知机制更加灵活,还能有效避免“惊群效应”带来的性能损耗。与Objectwaitnotify相比,Condition提供了更好控制粒度,允许多个等待集合独立管理,使得多线程间的交互更加高效且易于维护,此外,Conditionawait方法能够响应中断,增强了程序的健壮性。

定义

Java内置锁:深度解析Condition接口 - 程序员古德

在Java中,Condition是一个接口,它提供了线程之间的协调机制,可以将它看作是一个更加灵活、更加强大的wait()和notify()机制,通常与Lock接口(比如ReentrantLock)一起使用。它的核心作用体现在两个方面,如下:

  1. 等待/通知机制:它允许线程等待某个条件成立,或者通知其他线程某个条件已经满足,这与使用Object的wait()和notify()方法相似,但Condition提供了更高的灵活性和更多的控制
  2. 多条件协调:与每个Object只有一个内置的等待/通知机制不同,一个Lock可以对应多个Condition对象,这意味着可以为不同的等待条件创建不同的Condition,从而实现对多个等待线程集合的独立控制。

举个生活中的实际案例:假设,张三正在经营一个智能化的餐厅,顾客可以通过一个电子点餐系统来点餐,这个系统有一个显示屏,显示各种菜品的准备状态(如“准备中”、“已就绪”、“已售出”等),在这个场景中,Condition可以发挥巨大的作用:

  1. 厨师的视角:厨师负责准备食物,当一份食物订单被下单但还未准备好时,厨师需要等待食材或烹饪工具变得可用,这时,厨师可以“等待”在一个Condition上,这个Condition代表了“食材就绪”或“烹饪工具空闲”的条件。
  2. 服务员的视角:服务员负责将准备好的食物送给顾客,服务员可能在等待某个菜品变成“已就绪”状态,他们可以“等待”在另一个Condition上,这个Condition代表了“菜品就绪”的条件。
  3. 顾客的视角:顾客在下单后,可能希望知道他们的食物什么时候准备好,系统可以通过更新显示屏上的状态来通知顾客,但这背后实际上也可以通过Condition来实现,当食物准备好时,系统可以“通知”等待在“菜品就绪”Condition上的服务员。
  4. 多条件协调:重要的是,这个系统可以同时处理多个顾客的订单,每个订单可能有不同的准备时间和要求,通过使用多个Condition,系统可以独立地管理每个订单的状态,确保正确的厨师和服务员在正确的时间被通知。

在这个例子中,Condition提供了一种有效的方式来协调不同角色(厨师、服务员、顾客)之间的交互,确保每个人都在正确的时间得到他们需要的信息或资源。这比简单地使用wait()notify()要强大得多,因为它允许更细粒度的控制和更复杂的交互模式。

核心案例

Java内置锁:深度解析Condition接口 - 程序员古德

Condition接口常常与Lock接口一起使用,提供了一种更加灵活的线程同步机制,相比于使用Objectwait()notify()notifyAll()方法,Condition允许多个等待队列(即可以有多个Condition对象),并且提供了更高的控制精度。

下面是一个使用Condition的示例场景:生产者-消费者问题。在这个场景中,有一个共享的数据缓冲区,生产者向缓冲区中放置数据,而消费者从缓冲区中取出数据,当缓冲区满时,生产者需要等待;当缓冲区空时,消费者需要等待,使用Condition可以很容易地实现这一点,如下代码案例:

import java.util.LinkedList;  
import java.util.Queue;  
import java.util.concurrent.locks.Condition;  
import java.util.concurrent.locks.Lock;  
import java.util.concurrent.locks.ReentrantLock;  
  
public class ProducerConsumerExample {  
  
    // 共享的缓冲区  
    private final Queue<Integer> buffer = new LinkedList<>();  
    // 缓冲区最大容量  
    private static final int MAX_SIZE = 5;  
    // 锁对象  
    private final Lock lock = new ReentrantLock();  
    // 生产者条件  
    private final Condition producerCondition = lock.newCondition();  
    // 消费者条件  
    private final Condition consumerCondition = lock.newCondition();  
  
    // 生产者方法  
    public void produce() throws InterruptedException {  
        lock.lock();  
        try {  
            while (buffer.size() == MAX_SIZE) {  
                // 缓冲区满,生产者等待  
                producerCondition.await();  
            }  
            // 生产一个数据  
            int data = (int) (Math.random() * 100);  
            buffer.offer(data);  
            System.out.println("Produced: " + data);  
            // 通知消费者  
            consumerCondition.signalAll();  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    // 消费者方法  
    public void consume() throws InterruptedException {  
        lock.lock();  
        try {  
            while (buffer.isEmpty()) {  
                // 缓冲区空,消费者等待  
                consumerCondition.await();  
            }  
            // 消费一个数据  
            int data = buffer.poll();  
            System.out.println("Consumed: " + data);  
            // 通知生产者  
            producerCondition.signalAll();  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    // 客户端调用示例  
    public static void main(String[] args) {  
        ProducerConsumerExample pc = new ProducerConsumerExample();  
  
        // 创建生产者线程  
        Thread producerThread = new Thread(() -> {  
            try {  
                for (int i = 0; i < 10; i++) {  
                    pc.produce();  
                    Thread.sleep(100); // 模拟生产耗时  
                }  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        });  
  
        // 创建消费者线程  
        Thread consumerThread = new Thread(() -> {  
            try {  
                for (int i = 0; i < 10; i++) {  
                    pc.consume();  
                    Thread.sleep(150); // 模拟消费耗时  
                }  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        });  
  
        // 启动线程  
        producerThread.start();  
        consumerThread.start();  
    }  
}

在上面代码中,定义了ProducerConsumerExample类,它包含了共享缓冲区、锁和条件变量,生产者线程调用produce()方法,而消费者线程调用consume()方法,当缓冲区满时,生产者通过调用producerCondition.await()等待;当缓冲区空时,消费者通过调用consumerCondition.await()等待,当条件满足时(即缓冲区不满或不为空),相应的线程会被唤醒,并通过调用signalAll()方法通知其他等待的线程。

核心方法

Java内置锁:深度解析Condition接口 - 程序员古德

详见官方文档:https://docx.iamqiang.com/jdk11/api/java.base/java/util/concurrent/locks/Condition.html

Modifier and TypeMethodDescription
voidawait()使当前线程等待,直到被其他线程通过 signal() 或 signalAll() 方法唤醒,或者线程被中断,或者发生了其他不可预知的情况(如假唤醒)。该方法会在等待之前释放当前线程所持有的锁,在被唤醒后会再次尝试获取锁。
booleanawait(long time, TimeUnit unit)使当前线程等待指定的时间,或者直到被其他线程通过 signal() 或 signalAll() 方法唤醒,或者线程被中断。如果在指定的时间内没有被唤醒,该方法将返回。在等待之前会释放当前线程所持有的锁,在被唤醒或超时后会再次尝试获取锁。
longawaitNanos(long nanosTimeout)使当前线程等待指定的纳秒数,或者直到被其他线程通过 signal() 或 signalAll() 方法唤醒。如果在指定的时间内没有被唤醒,该方法将返回。
voidawaitUninterruptibly()与 await() 类似,但不会响应线程的中断。即使线程在等待期间被中断,它也会继续等待,直到被 signal() 或 signalAll() 方法唤醒。
booleanawaitUntil(Date deadline)使当前线程等待,直到达到指定的截止日期,或者直到被其他线程通过 signal() 或 signalAll() 方法唤醒,或者线程被中断。
voidsignal()唤醒等待在此 Condition 上的一个线程。如果有多个线程正在等待,则选择其中的一个进行唤醒。被唤醒的线程将从其 await() 调用中返回,并重新尝试获取与此 Condition 关联的锁。
voidsignalAll()唤醒等待在此 Condition 上的所有线程。每个被唤醒的线程都将从其 await() 调用中返回,并重新尝试获取与此 Condition 关联的锁。

await()

下面是一个使用Condition的await()方法的示例场景:有界缓冲区。在这个场景中,有一个固定大小的缓冲区,生产者向其中添加元素,而消费者从中移除元素,如果缓冲区已满,生产者需要等待;如果缓冲区为空,消费者需要等待,如下代码:

import java.util.LinkedList;  
import java.util.Queue;  
import java.util.concurrent.locks.Condition;  
import java.util.concurrent.locks.Lock;  
import java.util.concurrent.locks.ReentrantLock;  
  
public class BoundedBuffer {  
    private final Queue<Integer> buffer;  
    private final int maxSize;  
    private final Lock lock;  
    // Condition for producers  
    private final Condition notFull;  
    // Condition for consumers  
    private final Condition notEmpty;  
  
    public BoundedBuffer(int maxSize) {  
        this.maxSize = maxSize;  
        this.buffer = new LinkedList<>();  
        this.lock = new ReentrantLock();  
        this.notFull = lock.newCondition();  
        this.notEmpty = lock.newCondition();  
    }  
  
    // Produce an item  
    public void produce(int item) throws InterruptedException {  
        lock.lock();  
        try {  
            while (buffer.size() == maxSize) {  
                // Buffer is full, wait for space  
                notFull.await();  
            }  
            buffer.offer(item);  
            // Signal a waiting consumer  
            notEmpty.signal();  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    // Consume an item  
    public Integer consume() throws InterruptedException {  
        lock.lock();  
        try {  
            while (buffer.isEmpty()) {  
                // Buffer is empty, wait for items  
                notEmpty.await();  
            }  
            // Signal a waiting producer  
            notFull.signal();  
            return buffer.poll();  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    // Client code to demonstrate usage  
    public static void main(String[] args) {  
        BoundedBuffer buffer = new BoundedBuffer(5);  
  
        // Producer thread  
        Thread producer = new Thread(() -> {  
            for (int i = 0; i < 10; i++) {  
                try {  
                    buffer.produce(i);  
                    System.out.println("Produced: " + i);  
                    Thread.sleep(100); // Simulate work  
                } catch (InterruptedException e) {  
                    Thread.currentThread().interrupt();  
                }  
            }  
        });  
  
        // Consumer thread  
        Thread consumer = new Thread(() -> {  
            for (int i = 0; i < 10; i++) {  
                try {  
                    Integer item = buffer.consume();  
                    System.out.println("Consumed: " + item);  
                    Thread.sleep(200); // Simulate work  
                } catch (InterruptedException e) {  
                    Thread.currentThread().interrupt();  
                }  
            }  
        });  
  
        producer.start();  
        consumer.start();  
    }  
}

在上面代码中,BoundedBuffer类使用了LockCondition来实现一个线程安全的有界缓冲区,produce()方法在缓冲区满时等待,而consume()方法在缓冲区空时等待,当生产者生产了一个元素后,它会通知等待的消费者;同样地,当消费者消费了一个元素后,它会通知等待的生产者。

signal()和signalall()的使用场景

Condition接口提供了signal()和signalAll()方法,用于唤醒等待在Condition对象上的线程,这两个方法通常与await()方法一起使用,以实现线程间的协调。

  • signal():唤醒一个在此Condition上等待的线程(如果有的话)。
  • signalAll():唤醒所有在此Condition上等待的线程。

以下是Condition的signal()和signalAll()方法的使用场景示例:生产者-消费者问题。在这个场景中,生产者生产数据并放入缓冲区,消费者从缓冲区中取出数据。当缓冲区为空时,消费者需要等待;当缓冲区满时,生产者需要等待,如下代码:

import java.util.LinkedList;  
import java.util.Queue;  
import java.util.concurrent.locks.Condition;  
import java.util.concurrent.locks.Lock;  
import java.util.concurrent.locks.ReentrantLock;  
  
public class ProducerConsumerQueue {  
    private final Queue<Integer> queue;  
    private final int maxSize;  
    private final Lock lock;  
    // Condition for producers  
    private final Condition notFull;  
    // Condition for consumers  
    private final Condition notEmpty;  
  
    public ProducerConsumerQueue(int maxSize) {  
        this.maxSize = maxSize;  
        this.queue = new LinkedList<>();  
        this.lock = new ReentrantLock();  
        this.notFull = lock.newCondition();  
        this.notEmpty = lock.newCondition();  
    }  
  
    public void produce(int item) throws InterruptedException {  
        lock.lock();  
        try {  
            while (queue.size() == maxSize) {  
                // Buffer is full, wait for space  
                notFull.await();  
            }  
            queue.add(item);  
            // Signal a waiting consumer (or all with signalAll())  
            notEmpty.signal(); // Change to notEmpty.signalAll() to wake up all consumers  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    public Integer consume() throws InterruptedException {  
        lock.lock();  
        try {  
            while (queue.isEmpty()) {  
                // Buffer is empty, wait for items  
                notEmpty.await();  
            }  
            // Remove an item from the buffer  
            Integer item = queue.remove();  
            // Signal a waiting producer (or all with signalAll())  
            notFull.signal(); // Change to notFull.signalAll() to wake up all producers  
            return item;  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    // Client code to demonstrate usage  
    public static void main(String[] args) {  
        ProducerConsumerQueue queue = new ProducerConsumerQueue(5);  
  
        // Producer thread  
        Thread producer = new Thread(() -> {  
            for (int i = 0; i < 10; i++) {  
                try {  
                    queue.produce(i);  
                    System.out.println("Produced: " + i);  
                    Thread.sleep(100); // Simulate work  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
        });  
  
        // Consumer thread  
        Thread consumer = new Thread(() -> {  
            for (int i = 0; i < 10; i++) {  
                try {  
                    Integer item = queue.consume();  
                    System.out.println("Consumed: " + item);  
                    Thread.sleep(200); // Simulate work  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
        });  
  
        producer.start();  
        consumer.start();  
    }  
}

在这个例子中,ProducerConsumerQueue类使用了LockCondition来实现一个线程安全的生产者-消费者队列,当生产者尝试向已满的队列中添加元素时,它会通过调用notFull.await()进入等待状态,同样地,当消费者尝试从空队列中移除元素时,它会通过调用notEmpty.await()进入等待状态。

当生产者成功地向队列中添加了一个元素后,它会通过调用notEmpty.signal()唤醒一个等待的消费者(如果有的话),类似地,当消费者成功地从队列中移除了一个元素后,它会通过调用notFull.signal()唤醒一个等待的生产者。

注意:如果想唤醒所有等待的线程,可以将signal()调用替换为signalAll(),例如,notEmpty.signalAll()将唤醒所有等待在notEmpty条件上的消费者线程,而notFull.signalAll()将唤醒所有等待在notFull条件上的生产者线程,在实际应用中,使用signal()通常更高效,因为它只唤醒一个线程,而signalAll()可能会唤醒不必要的线程,导致额外的上下文切换开销。


个人思考

Condition接口和Object类中监视器方法有什么区别?

Condition接口和Object类中的监视器方法(wait(), notify(), notifyAll())都可以实现多线程同步和通信,但它们在使用方式和灵活性上有所不同,如下:

Object类的监视器方法

Object类中的wait(), notify(), 和 notifyAll() 方法是与每个对象内置的锁(也称为监视器锁)紧密关联的,这意味着,只有当一个线程获得了对象的锁(通过synchronized关键字)时,它才能调用该对象上的这些方法。

  1. wait(): 释放当前线程持有的对象锁,并使线程进入等待状态,直到其他线程调用同一个对象的notify()notifyAll()方法来唤醒它。
  2. notify(): 唤醒等待在对象锁上的一个线程(如果有的话)。选择哪个线程是不确定的。
  3. notifyAll(): 唤醒所有等待在对象锁上的线程。

这些方法的一个限制是,它们只能与Object的内置锁一起使用,而且你不能为同一个锁创建多个等待集合(也就是等待不同条件的线程集合)。

Condition接口

相比之下,Condition接口提供了一种更加灵活和高级的线程间通信机制,它是通过Lock接口(比如ReentrantLock)提供的,允许为一个锁创建多个Condition对象,每个Condition对象都可以有自己的等待集合。

  1. await(): 类似于Object.wait(),释放当前线程持有的锁,并使线程进入等待状态,直到其他线程调用signal()signalAll()方法来唤醒它。
  2. signal(): 类似于Object.notify(),唤醒等待在Condition上的一个线程。
  3. signalAll(): 类似于Object.notifyAll(),唤醒所有等待在Condition上的线程。

Condition的一个主要优势是,可以为同一个锁创建多个Condition对象,每个对象管理自己的等待线程集合,这意味着,可以更加精细地控制哪些线程应该在特定条件下被唤醒。

举个生活中的实际案例:想象一下有一个仓库,里面有不同类型的商品,如果使用Object的监视器方法,仓库的门(锁)就只有一个等待区域(等待集合),所有人(线程)都挤在一起等待,当说“开门了!”(notify()notifyAll()),所有人都会一拥而上,不管他们是不是在等自己需要的商品。

而如果使用Condition,可以为每种商品都设一个等待区域,这样,当某种商品到货时,只需唤醒等待那种商品的顾客,而不是所有人。

关注我,每天学习互联网编程技术 - 程序员古德

文章来源:https://blog.csdn.net/qusikao/article/details/135538048
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。