【JavaEE】多线程(5) -- 阻塞队列

发布时间:2023年12月17日

目录

1.阻塞队列是什么?

2.生产者消费者模型

3.标准库中的阻塞队列

4.阻塞队列的实现


1.阻塞队列是什么?

阻塞队列是?种特殊的队列. 也遵守 "先进先出" 的原则

阻塞队列能是?种线程安全的数据结构, 并且具有以下特性:

当队列满的时候, 继续?队列就会阻塞, 直到有其他线程从队列中取?元素.

?当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插?元素.

阻塞队列的?个典型应?场景就是 "?产者消费者模型". 这是?种?常典型的开发模型.

2.生产者消费者模型

?产者消费者模式就是通过?个容器来解决?产者和消费者的强耦合问题。 ?产者和消费者彼此之间不直接通讯,?通过阻塞队列来进?通讯,所以?产者?产完数据之后不? 等待消费者处理,直接扔给阻塞队列,消费者不找?产者要数据,?是直接从阻塞队列?取.

实际开发中, 经常会涉及到 "分布式系统", 服务器整个功能不是由一个服务器全部完成的. 而是每个服务器负责一部分功能, 通过服务器之间的网络通信, 最终完成整个功能.

上述过程中, A 和 B , A 和 C 之间的耦合性是比较轻的, A中的代码需要设计到一些B相关的操作, B中的代码也涉及到和A相关的操作,? ?A 的代码中也需要涉及和 C 相关的操作, C 的代码也涉及和 A 相关的操作, 另外, 如果 B 或者 C 服务器出现故障, 对 A 的影响就很大.

引用生产者消费者模型, 就可以降低上述的耦合

这样 A B C 之间就不是直接交互了, 而是通过队列在中间经常传递.

3.标准库中的阻塞队列

使用实例:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ThreadDemo28 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        queue.put("hello");
        System.out.println(queue.take());
        System.out.println(queue.take());

    }
}

当队列中的元素为空时, 在执行 take() 方法, 进程就会进入阻塞状态:

此时进程并会不结束.

4.阻塞队列的实现

基于环形队列来简单实现阻塞队列:

import static java.lang.Thread.sleep;

class MyBlockingQueue {
    private String[] elems = null;
    private int head = 0;
    private int tail = 0;
    private int size = 0;
    Object locker = new Object();

    public MyBlockingQueue(int capacity) {
        elems = new String[capacity];
    }

    public void put(String elem) throws InterruptedException {
        synchronized (locker) {
            while(size >= elem.length()) {
                //队列满了就阻塞
                locker.wait();
            }

            elems[tail] = elem;
            tail++;
            if(tail >= elems.length) {
                tail = 0;
            }
            size++;

            // 入队列成功后唤醒
            locker.notify();
        }
    }

    public String take() throws InterruptedException {
        String elem = null;
        synchronized (locker) {
            while(size == 0) {
                // 队列空了
                // 也需要这个代码阻塞
                locker.wait();
            }

            elem = elems[head];
            head++;
            if(head >= elems.length) {
                head = 0;
            }
            size--;

            //元素出队列成功后, 唤醒
            locker.notify();
        }

        return elem;
    }
}

细节注意:

基于阻塞队列实现简单的生产者消费者模型:

public class ThreadDeom29 {
    public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue(1000);
        //生产者
        Thread t1 = new Thread(()->{
            int n = 1;
            while(true) {
                try {
                    queue.put(n + "");
                    System.out.println("生产元素 " + n);
                    sleep(1000);
                    n++;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        //消费者
        Thread t2 = new Thread(()-> {
            while(true) {
                try {
                    String n = queue.take();
                    System.out.println("消费元素 " + n);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        t1.start();
        t2.start();
    }
}

t1线程每隔一秒钟存放一个元素到阻塞队列中, t2 线程则在队列中有元素时将元素取出, 队列没有元素就进入阻塞状态, 直到队列不再为空?

?

文章来源:https://blog.csdn.net/m0_73648729/article/details/135045764
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。