多线程——阻塞队列

发布时间:2024年01月15日

什么是阻塞队列

相比于一般的队列,有两个特点
1.线程安全
2.带有阻塞功能
1)队伍为空时,出队列就会出现阻塞,阻塞到其他线程入队列为止
2)队伍为满时,入队列就会出现阻塞,阻塞到其他线程出队列为止

常用于生产者消费者模型
在这里插入图片描述
作用:
1.解耦合
2.削峰填谷

使用阻塞队列

public class Test12 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        queue.put("qqq");
        String elem = queue.take();
        System.out.println("elem = "+ elem);
        elem = queue.take();
        System.out.println("elem = " + elem);

    }
}

运行结果
在这里插入图片描述
不会结束运行,一直在等待。

使用put和offer一样的都是入队列,但是put是带有阻塞功能,offer没有带阻塞功能(队满了就会返回结果)
take方法用来出队列,也是带有阻塞功能

实现阻塞队列

1)实现普通队列

class MyBlockingQueue{
    private String[] elems = null;
    private int size = 0;
    private int head = 0;
    private int tail = 0;
   
    public MyBlockingQueue(int capacity){
        elems = new String[capacity];
    }
    public void put(String elem) throws InterruptedException {
        
            if (size > elems.length){
                //阻塞功能
            }
            elems[tail] = elem;
            tail++;
            if (tail >= elems.length){
                tail = 0;
            }
            size++;
            
        

    }
    public String take() throws InterruptedException {
        
            if (size == 0){
                //阻塞功能
            }
            String elem = null;
            elem = elems[head];
            head++;
            if (head >= elems.length){
                head = 0;
            }
            size--;
           
            return elem;
        }
    }

}

2)加上线程安全

class MyBlockingQueue{
    private String[] elems = null;
    private int size = 0;
    private int head = 0;
    private int tail = 0;
    private Object locker = new Object();
    public MyBlockingQueue(int capacity){
        elems = new String[capacity];
    }
    public void put(String elem) throws InterruptedException {
        synchronized(locker){
            if (size > elems.length){
                
            }
            elems[tail] = elem;
            tail++;
            if (tail >= elems.length){
                tail = 0;
            }
            size++;
            
        }

    }
    public String take() throws InterruptedException {
        synchronized(locker){
            if (size == 0){
               
            }
            String elem = null;
            elem = elems[head];
            head++;
            if (head >= elems.length){
                head = 0;
            }
            size--;
           
            return elem;
        }
    }

}

3)加上阻塞功能

class MyBlockingQueue{
    private String[] elems = null;
    private int size = 0;
    private int head = 0;
    private int tail = 0;
    private Object locker = new Object();
    public MyBlockingQueue(int capacity){
        elems = new String[capacity];
    }
    public void put(String elem) throws InterruptedException {
        synchronized(locker){
            while (size > elems.length){
                locker.wait();
            }
            elems[tail] = elem;
            tail++;
            if (tail >= elems.length){
                tail = 0;
            }
            size++;
            locker.notify();
        }

    }
    public String take() throws InterruptedException {
        synchronized(locker){
            while (size == 0){
                locker.wait();
            }
            String elem = null;
            elem = elems[head];
            head++;
            if (head >= elems.length){
                head = 0;
            }
            size--;
            locker.notify();
            return elem;
        }
    }

}

代码解释:
最终代码将if改成了while,因为if只能判定一次,如果出现以下情况就会出bug(线程A,线程B都执行到了put中的wait,因为队列已满而停止运行,线程C出队列唤醒了线程A,线程A继续入队列,入队列后就会notify,导致唤醒了线程B,而此时队列已满,无法进行入队操作,就出现了bug),所以就使用while,wait之前判定一次,唤醒之后再进行一次判定,相当于多做一步确定操作

简单的生产者消费者模型

class MyBlockingQueue{
    private String[] elems = null;
    private int size = 0;
    private int head = 0;
    private int tail = 0;
    private Object locker = new Object();
    public MyBlockingQueue(int capacity){
        elems = new String[capacity];
    }
    public void put(String elem) throws InterruptedException {
        synchronized(locker){
            while (size > elems.length){
                locker.wait();
            }
            elems[tail] = elem;
            tail++;
            if (tail >= elems.length){
                tail = 0;
            }
            size++;
            locker.notify();
        }

    }
    public String take() throws InterruptedException {
        synchronized(locker){
            while (size == 0){
                locker.wait();
            }
            String elem = null;
            elem = elems[head];
            head++;
            if (head >= elems.length){
                head = 0;
            }
            size--;
            locker.notify();
            return elem;
        }
    }

}
public class Test11 {
    public static void main(String[] args) throws InterruptedException {
        MyBlockingQueue myBlockingQueue = new MyBlockingQueue(100);
        Thread t1 = new Thread(()->{
            int n = 1;
            while(true){
                try {
                    myBlockingQueue.put(n + "");
                    System.out.println("生产元素 "+n);
                    n++;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }

            }
        });
        Thread t2 = new Thread(()->{
           while(true){
               try {
                   String n = myBlockingQueue.take();
                   System.out.println("消费元素 " + n);
                   Thread.sleep(500);
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               }
           }

        });
        t1.start();
        t2.start();
    }
}

文章来源:https://blog.csdn.net/weixin_65476629/article/details/135581764
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。