目录
阻塞队列是?种特殊的队列. 也遵守 "先进先出" 的原则
阻塞队列能是?种线程安全的数据结构, 并且具有以下特性:
当队列满的时候, 继续?队列就会阻塞, 直到有其他线程从队列中取?元素.
?当队列空的时候, 继续出队列也会阻塞, 直到有其他线程往队列中插?元素.
阻塞队列的?个典型应?场景就是 "?产者消费者模型". 这是?种?常典型的开发模型.
?产者消费者模式就是通过?个容器来解决?产者和消费者的强耦合问题。 ?产者和消费者彼此之间不直接通讯,?通过阻塞队列来进?通讯,所以?产者?产完数据之后不? 等待消费者处理,直接扔给阻塞队列,消费者不找?产者要数据,?是直接从阻塞队列?取.
实际开发中, 经常会涉及到 "分布式系统", 服务器整个功能不是由一个服务器全部完成的. 而是每个服务器负责一部分功能, 通过服务器之间的网络通信, 最终完成整个功能.
上述过程中, A 和 B , A 和 C 之间的耦合性是比较轻的, A中的代码需要设计到一些B相关的操作, B中的代码也涉及到和A相关的操作,? ?A 的代码中也需要涉及和 C 相关的操作, C 的代码也涉及和 A 相关的操作, 另外, 如果 B 或者 C 服务器出现故障, 对 A 的影响就很大.
引用生产者消费者模型, 就可以降低上述的耦合
这样 A B C 之间就不是直接交互了, 而是通过队列在中间经常传递.
使用实例:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ThreadDemo28 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
queue.put("hello");
System.out.println(queue.take());
System.out.println(queue.take());
}
}
当队列中的元素为空时, 在执行 take() 方法, 进程就会进入阻塞状态:
此时进程并会不结束.
基于环形队列来简单实现阻塞队列:
import static java.lang.Thread.sleep;
class MyBlockingQueue {
private String[] elems = null;
private int head = 0;
private int tail = 0;
private int size = 0;
Object locker = new Object();
public MyBlockingQueue(int capacity) {
elems = new String[capacity];
}
public void put(String elem) throws InterruptedException {
synchronized (locker) {
while(size >= elem.length()) {
//队列满了就阻塞
locker.wait();
}
elems[tail] = elem;
tail++;
if(tail >= elems.length) {
tail = 0;
}
size++;
// 入队列成功后唤醒
locker.notify();
}
}
public String take() throws InterruptedException {
String elem = null;
synchronized (locker) {
while(size == 0) {
// 队列空了
// 也需要这个代码阻塞
locker.wait();
}
elem = elems[head];
head++;
if(head >= elems.length) {
head = 0;
}
size--;
//元素出队列成功后, 唤醒
locker.notify();
}
return elem;
}
}
细节注意:
基于阻塞队列实现简单的生产者消费者模型:
public class ThreadDeom29 {
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue(1000);
//生产者
Thread t1 = new Thread(()->{
int n = 1;
while(true) {
try {
queue.put(n + "");
System.out.println("生产元素 " + n);
sleep(1000);
n++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
//消费者
Thread t2 = new Thread(()-> {
while(true) {
try {
String n = queue.take();
System.out.println("消费元素 " + n);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
t2.start();
}
}
t1线程每隔一秒钟存放一个元素到阻塞队列中, t2 线程则在队列中有元素时将元素取出, 队列没有元素就进入阻塞状态, 直到队列不再为空?
?