在 Java 中容器主要有 2 个大类 Collection 和 Map, 其中 Collection 主用用于数据的直接存储 (Map 则是一种键值对的存储方式, 除了要存储的数据外, 还需要有一个 key 和数据建立一个映射关系)。
Collection 使用的最多的大概就是 List, Set 和 Queue。
而在日常的开发中, 会根据不同数据的特性, 比如是否可重复, 是否需要有序等条件选择不同的集合, 同时还会考虑其他的因素, 比如线程是否安全。
而本文的主要是对基于 Queue 派生的另一个线程安全的接口 – BlockingQueue 做个简单的介绍。
在经典的“生产者-消费者”问题中, 队列通常被视为线程间操作的数据容器。这样一来, 我们能够将各个模块的业务功能解耦。生产者负责将其“生产”出来的数据放置在数据容器中, 而消费者则仅需在“数据容器”中获取数据。这种设计使得生产者线程和消费者线程能够实现解耦, 各自专注于自身的业务功能
而本篇主要对基于 Queue 派生的另一个线程安全的接口 BlockingQueue 做一个简单的介绍。
阻塞队列 (BlockingQueue) 被广泛使用在 “生产者-消费者” 问题中, 作为线程间操作的数据容器, 使得各个模块的业务功能进行解耦。生产者专注生产数据, 然后投递到队列中即可, 而消费者则从队列中获取数据, 然后专注消费数据。
BlockingQueue 能够高频的应用于 “生产者-消费者” 其原因是它提供了可阻塞的插入和移除的方法。
当队列容器已满, 生产者线程会被阻塞, 直到队列未满;当队列容器为空时, 消费者线程会被阻塞, 直至队列非空时为止。
几种不同的场景的新增删除和检查方法列表
Throws Excepiton | Special Value | Blocks | Times Out | |
---|---|---|---|---|
insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
remove | remove() | poll() | take() | poll(time, unit) |
examine | element() | peek() | not applicable | not applicable |
新增数据
- add(E e): 往队列插入数据, 当队列满时, 插入元素时会抛出 IllegalStateException 异常
- offer(E e): 当往队列插入数据时, 插入成功返回 true, 否则返回 false, 当队列满时不会抛出异常
- put(E e): 当阻塞队列容量已经满时, 往阻塞队列插入数据的线程会被阻塞, 直至阻塞队列已经有空余的容量可供使用
- offer(E e, long timeout, TimeUnit unit):若阻塞队列已经满时, 同样会阻塞插入数据的线程, 直至阻塞队列已经有空余的地方, 与 put 方法不同的是, 该方法会有一个超时时间, 若超过当前给定的超时时间, 插入数据的线程会退出
删除元素
- remove(Object o):从队列中删除数据, 成功则返回 true, 否则为 false
- poll(): 删除队列中第一个的数据, 当队列为空时, 返回 null
- take():当阻塞队列为空时, 获取队头数据的线程会被阻塞
- poll(long timeout, TimeUnit unit): 当阻塞队列为空时, 获取数据的线程会被阻塞, 另外, 如果被阻塞的线程超过了给定的时长, 该线程会退出
查看元素
- element(): 获取队头元素, 如果队列为空时则抛出 NoSuchElementException 异常
- peek():获取队头元素, 如果队列为空, 返回 null
实现 BlockingQueue 接口的有
- ArrayBlockingQueue
- LinkedBlockingQueue
- PriorityBlockingQueue
- SynchronousQueue
- LinkedTransferQueue
- DelayQueue
- LinkedBlockingDeque
而这几种常见的阻塞队列也是在实际编程中会常用的, 下面对这几种常见的阻塞队列进行说明。
特点
- ArrayBlockingQueue 一旦创建, 容量不能改变
- ArrayBlockingQueue 是由数组实现的有界阻塞队列, 该队列里面的元素满足 FIFO (先进先出), 因此, 头元素在队列中存在时间最长, 而队尾数据则是当前队列最新的数据元素。
- ArrayBlockingQueue 可作为 “有界数据缓冲区”, 生产者插入数据到队列容器中, 并由消费者提取。
- 当队列容量满时, 尝试将元素放入队列将导致操作阻塞, 同理尝试从一个空队列中取一个元素也会同样阻塞。
ArrayBlockingQueue 默认情况下不能保证线程访问队列的公平性。
所谓公平性是指严格按照线程等待的绝对时间顺序, 即最先等待的线程能够最先访问到 ArrayBlockingQueue 中的数据。
而非公平性则是指访问 ArrayBlockingQueue 的顺序不是遵守严格的时间顺序。有可能存在, ArrayBlockingQueue 从无数据变为有数据, 即可被访问时, 长时间阻塞的线程依然无法访问到 ArrayBlockingQueue。
如果需要保证公平性, 可以在声明 ArrayBlockingQueue 的时候指明公平锁, 但是这通常会降低吞吐量。
ArrayBlockingQueue 的声明:
// true 公平竞争 false 非公平竞争
ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(容量大小,true);
LinkedBlockingQueue 是用链表实现的有界阻塞队列, 同样满足 FIFO 的特性, 与 ArrayBlockingQueue 相比起来具有更高的吞吐量, 为了防止 LinkedBlockingQueue 容量迅速增, 损耗大量内存。
通常在创建 LinkedBlockingQueue 对象时, 会指定其大小, 如果未指定, 容量等于 Integer.MAX_VALUE。
PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。
默认情况下元素采用自然顺序进行排序, 也可以通过自定义类实现 compareTo() 方法来指定元素排序规则, 或者初始化时通过构造器参数 Comparator 来指定排序规则。
SynchronousQueue 每个插入操作必须等待另一个线程进行相应的删除操作, 因此, SynchronousQueue 实际上没有存储任何数据元素, 因为只有线程在删除数据时, 其他线程才能插入数据。
同样的, 如果当前有线程在插入数据时, 线程才能删除数据。SynchronousQueue 也可以通过构造器参数来为其指定公平性。
LinkedTransferQueue 是一个由链表数构成的无界阻塞队列, 由于该队列实现了 TransferQueue 接口, 与其他阻塞队列相比主要有以下不同的方法
transfer(E e): 如果当前有线程 (消费者) 正在调用 take() 方法或者可延时的 poll() 方法进行消费数据时, 生产者线程可以调用这个方法将数据传递给消费者线程。
如果当前没有消费者线程的话, 生产者线程就会将数据插入到队尾, 直到有消费者能够进行消费才能退出
tryTransfer(E e): 如果当前有线程 (消费者) 正在调用 take() 方法或者可延时的 poll() 方法进行消费数据时, 生产者线程可以调用这个方法将数据立即传送给消费者线程, 如果当前没有消费者线程消费数据的话, 就立即返回 false。
因此, 与 transfer 方法相比, transfer 方法是必须等到有消费者线程消费数据时, 生产者线程才能够返回。而 tryTransfer 方法能够立即返回结果退出。
tryTransfer(E e,long timeout,imeUnit unit): 与 transfer 基本功能一样, 只是增加了超时特性, 如果数据到了规定的超时时间内没有消费者进行消费的话, 就返回 false。
DelayQueue 是一个存放实现 Delayed 接口的数据的无界阻塞队列, 只有当数据对象的延时时间达到时才能插入到队列进行存储。如果当前所有的数据都还没有达到创建时所指定的延时期,
则队列没有队头, 并且线程通过 poll() 等方法获取数据元素则返回 null。
所谓数据延时期满时, 则是通过 Delayed 接口的 long getDelay(TimeUnit unit) 进行判定,
如果该方法返回的是小于等于 0 则说明该数据元素的延时期已满。
LinkedBlockingDeque 是基于链表实现的有界阻塞双端队列, 如果在创建对象时为指定大小时, 其默认大小为 Integer.MAX_VALUE。
与 LinkedBlockingQueue 相比, 主要的不同点在于, LinkedBlockingDeque 具有双端队列的特性。 所以其提供的增删操作的方法有些不同。
头部操作
Throws Excepiton | Special Value | Blocks | Times Out | |
---|---|---|---|---|
insert | addFirst(e) | offerFirst(e) | putFirst(e) | offerFirst(e, time, unit) |
remove | removeFirst() | pollFirst() | takeFirst() | pollFirst(time, unit) |
examine | getFirst() | peekFirst() | not applicable | not applicable |
尾部操作
Throws Excepiton | Special Value | Blocks | Times Out | |
---|---|---|---|---|
insert | addLast(e) | offerLast(e) | putLast(e) | offerLast(e, time, unit) |
remove | removeLast() | pollLast() | takeLast() | pollLast(time, unit) |
examine | getLast() | peekLast() | not applicable | not applicable |
LinkedBlockingDeque 的基本操作可以分为四种类型:
- 特殊情况, 抛出异常
- 特殊情况, 返回特殊值如 null 或者 false
- 当线程不满足操作条件时, 线程会被阻塞直至条件满足
- 操作具有超时特性
另外, LinkedBlockingDeque 实现了 BlockingDueue 接口, 而 LinkedBlockingQueue 实现的是 BlockingQueue, 这两个接口的主要区别如下
Insert
Throws Excepiton | Special Value | Blocks | Times Out | |
---|---|---|---|---|
BlockingDeque | addLast(e) | offerLast(e) | putLast(e) | offerLast(e, time, unit) |
BlockingQueue | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove
Throws Excepiton | Special Value | Blocks | Times Out | |
---|---|---|---|---|
BlockingDeque | removeFist() | pollFirst() | takeFirst() | pollFirst(time, unit) |
BlockingQueue | remove() | poll() | take(e) | poll(e, time, unit) |
Examine
Throws Excepiton | Special Value | Blocks | Times Out | |
---|---|---|---|---|
BlockingDeque | getFirst() | peeKFirst() | not applicable | not applicable |
BlockingQueue | element() | peek() | not applicable | not applicable |
从上可以看出, 两个接口的很多功能是可以等价使用的, 比如 BlockingQueue 的 add 方法和 BlockingDeque 的 addLast 方法的功能是一样的。