【并发编程】异步模式之生产者消费者

发布时间:2024年01月22日

? ? ? ?📝个人主页:五敷有你? ? ??
?🔥系列专栏:并发编程

??稳中求进,晒太阳

定义

要点:

  • 与之前的保护性暂停中的GuardObject不同,不需要产生结果和消费结果的线程一一对应。
  • 消费队列可以用来平衡生产和消费的线程资源,
  • 生产者仅负责产生结果数据,不关心数据该如何处理。而消费者专心处理结果数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据。
  • JDK中各种阻塞队列,采用的就是这种模式。

实现

思想就是封装消息类,然后用LinkList来充当队列表,设置一个上限

当满的时候,生产者queue.wait()阻塞,然后等待消费者queue.notifyAll()唤醒

当空的时候,消费者queue.wait()阻塞,然后等待生产者queue.notifyAll()唤醒

封装消息类
class Message {
    private int id;
    private Object message;

    public Message(int id, Object message) {
        this.id = id;
        this.message = message;
    }

    public int getId() {
        return id;
    }

    public Object getMessage() {
        return message;
    }
}

//建立消息队列
class MessageQueue {
    private LinkedList<Message> queue;
    //设置容量
    private int capacity;

    public MessageQueue(int capacity) {
        this.capacity = capacity;
        queue = new LinkedList<>();
    }

    public Message take() {
        synchronized (queue) {
            while (queue.isEmpty()) {
                log.debug("没货了, wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Message message = queue.removeFirst();
            queue.notifyAll();
            return message;
        }
    }

    public void put(Message message) {
        synchronized (queue) {
            while (queue.size() == capacity) {
                log.debug("库存已达上限, wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(message);
            queue.notifyAll();
        }
    }
}

应用

MessageQueue messageQueue=new MessageQueue(2);
// 4 个生产者线程, 下载任务
    for(int i=0;i< 4;i++){
        int id=i;
      new Thread(()->{
      try{log.debug("download...");
      List<String> response=Downloader.download();
      log.debug("try put message({})",id);
      messageQueue.put(new Message(id,response));
       }catch(IOException e){
           e.printStackTrace();
     }
  },"生产者"+i).start();
}
        // 1 个消费者线程, 处理结果
   new Thread(()->{
     while(true){
        Message message=messageQueue.take();
        List<String> response=(List<String>)message.getMessage();
       log.debug("take message({}): [{}] lines",message.getId(),response.size());
    }
 },"消费者").start();

某次运行结果

文章来源:https://blog.csdn.net/m0_62645012/article/details/135740472
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。