? ? ? ?📝个人主页:五敷有你? ? ??
?🔥系列专栏:并发编程
??稳中求进,晒太阳
要点:
思想就是封装消息类,然后用LinkList来充当队列表,设置一个上限
当满的时候,生产者queue.wait()阻塞,然后等待消费者queue.notifyAll()唤醒
当空的时候,消费者queue.wait()阻塞,然后等待生产者queue.notifyAll()唤醒
封装消息类
class Message {
private int id;
private Object message;
public Message(int id, Object message) {
this.id = id;
this.message = message;
}
public int getId() {
return id;
}
public Object getMessage() {
return message;
}
}
//建立消息队列
class MessageQueue {
private LinkedList<Message> queue;
//设置容量
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
queue = new LinkedList<>();
}
public Message take() {
synchronized (queue) {
while (queue.isEmpty()) {
log.debug("没货了, wait");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Message message = queue.removeFirst();
queue.notifyAll();
return message;
}
}
public void put(Message message) {
synchronized (queue) {
while (queue.size() == capacity) {
log.debug("库存已达上限, wait");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(message);
queue.notifyAll();
}
}
}
MessageQueue messageQueue=new MessageQueue(2);
// 4 个生产者线程, 下载任务
for(int i=0;i< 4;i++){
int id=i;
new Thread(()->{
try{log.debug("download...");
List<String> response=Downloader.download();
log.debug("try put message({})",id);
messageQueue.put(new Message(id,response));
}catch(IOException e){
e.printStackTrace();
}
},"生产者"+i).start();
}
// 1 个消费者线程, 处理结果
new Thread(()->{
while(true){
Message message=messageQueue.take();
List<String> response=(List<String>)message.getMessage();
log.debug("take message({}): [{}] lines",message.getId(),response.size());
}
},"消费者").start();
某次运行结果