生产者-消费者模式

发布时间:2023年12月25日

生产者-消费者模式是一个经典的多线程设计模式。
它为多线程间的协作提供了良好的解决方案。在生产者-消费模式中,通常有两类线程,即若干个生产线程
和若干个消费者线程。生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务。
生产者和消费者之间则通过共享内存缓冲区进行通讯。

生产者-消费者模式的核心组件是共享内存缓冲区。
它作为生产者消费者的桥梁,避免了生产者和消费者直接通讯,
从而将生产者和消费者进行解耦。
生产者不需要知道消费者存在,消费者不需要知道生产者的存在。
同时,由于内存缓冲区的存在,
允许生产者和消费者在执行速度上存在时间差,
无论是生产者在某一局部时间内的速度高于消费者的速度。
还是消费者在局部时间内的速度高于生产者的速度。
都可用通过共享内存缓冲区得到缓解,确保系统正常运行。

public class Main{
    public static void main(String[] args) {
        BlockingQueue<PCData> queue = new LinkedBlockingQueue<PCData>(10);
        Producer producer1 = new Producer(queue);
        Producer producer2 = new Producer(queue);
        Producer producer3 = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);
        Consumer consumer3 = new Consumer(queue);
        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(producer1);
        service.execute(producer2);
        service.execute(producer3);

        service.execute(consumer1);
        service.execute(consumer2);
        service.execute(consumer3);

        try {
            Thread.sleep(10*1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        producer1.stop();
        producer2.stop();
        producer3.stop();
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        service.shutdown();

    }

}
public class Consumer implements Runnable{
    private BlockingQueue<PCData> queue;
    private static final int SLEEPTIME = 1000;

    public Consumer(BlockingQueue<PCData> queue){
        this.queue = queue;
    }
    @Override
    public void run() {
        System.out.println("start Consumer id="+ Thread.currentThread().getId());
        Random r = new Random();
        try{
            while (true){
                PCData data = queue.take();
                if(null != data){
                    int re = data.getData() * data.getData();
                    System.out.println(MessageFormat.format("{0}*{1}={2}",
                            data.getData(),data.getData(),re));
                    Thread.sleep(r.nextInt(SLEEPTIME));
                }
            }

        }catch (InterruptedException e){
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}
public class PCData {
    private final int intData;
    public PCData(int d){
        intData = d;
    }
    public PCData(String d){
        intData = Integer.valueOf(d);
    }
    public int getData(){
        return intData;
    }
    @Override
    public String toString(){
        return "data:"+intData;
    }
}
public class Producer implements Runnable{
    private volatile boolean isRunning = true;
    private BlockingQueue<PCData> queue;
    private static AtomicInteger count = new AtomicInteger();
    private static final int SLEEPTIME = 1000;
    public Producer(BlockingQueue<PCData> queue){
        this.queue = queue;
    }
    @Override
    public void run() {
        PCData data = null;
        Random r = new Random();
        System.out.println("start producer id="+Thread.currentThread().getId());
        try {
            while(isRunning){
                Thread.sleep(r.nextInt(SLEEPTIME));
                data = new PCData(count.incrementAndGet());
                System.out.println(data+" is put into queue");
                if(!queue.offer(data,2, TimeUnit.SECONDS)) {
                    System.err.println("failed to put data: "+ data);
                }
            }
        }catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
    public void stop(){
        isRunning = false;
    }

}

输出平方结果。

data:46 is put into queue
data:47 is put into queue
44*44=1,936
45*45=2,025
data:48 is put into queue
data:49 is put into queue
data:50 is put into queue

文章来源:https://blog.csdn.net/wcg_jishuo/article/details/135187794
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。