Java中的BlockingQueue接口
Java中的 BlockingQueue接口 是在Java 1.5中添加的,同时还添加了各种其他并发实用类,如ConcurrentHashMap、 计数信号量 、CopyOnWriteArrayList等。BlockingQueue接口通过在BlockingQueue满或为空时引入阻塞来支持流量控制(除队列外)。试图将元素入队到满队列中的线程被阻塞,直到某些其他线程将队列中的元素出队或清空队列,为其腾出空间。同样地,它也会阻塞试图从空队列中删除的线程,直到某些其他线程插入一个项。BlockingQueue不接受null值。如果试图入队null item,则会抛出 NullPointerException 。
Java提供了几种BlockingQueue实现,例如LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue、 SynchronousQueue 等。Java BlockingQueue接口实现是线程安全的。BlockingQueue的所有方法在本质上都是原子的,并且使用内部锁或其他形式的并发控制。Java 5在 java.util.concurrent包 中提供了BlockingQueue实现。
BlockingQueue的用法

使用生产者(put)线程和消费者(take)线程访问的BlockingQueue。
BlockingQueue的层次结构

声明
public interface BlockingQueue<E> extends Queue<E>
这里, E 是存储在集合中的元素类型。
实现BlockingQueue的类
由于BlockingQueue是一个接口,我们不能直接提供一个BlockingQueue实例,因此要利用实现它的类的功能。此外,要在您的代码中使用BlockingQueue,请使用以下导入语句。
import java.util.concurrent.BlockingQueue;
(或)
import java.util.concurrent.*;
- LinkedBlockingQueue
- ArrayBlockingQueue
BlockingDeque的实现类是LinkedBlockingDeque。该类是BlockingDeque和链表数据结构的实现。使用构造函数可以选择性地限制LinkedBlockingDeque的容量,但是如果未指定容量,则默认为Integer.MAX_VALUE。在插入时会动态添加节点以遵守容量约束。
创建对象的语法
BlockingQueue<?> objectName = new LinkedBlockingDeque<?>();
(或)
LinkedBlockingDeque<?> objectName = new LinkedBlockingDeque<?>();
例如:在下面的代码中,我们对LinkedBlockingDeque执行一些基本操作,例如创建对象、添加元素、删除元素,并使用迭代器遍历LinkedBlockingDeque。
BlockingQueue类型
BlockingQueue有 两种类型 :
1. 无界队列: Blocking队列的Capacity将设置为Integer.MAX_VALUE。在无界阻塞队列的情况下,队列永远不会阻塞,因为它可以增长到非常大的大小。当添加元素时,其大小会增长。
语法:
BlockingQueue blockingQueue = new LinkedBlockingDeque();
2. 有界队列: 第二种队列是有界队列。在有界队列的情况下,您可以通过传递队列容量来创建队列和队列构造函数:
语法:
// 创建一个容量为5的阻塞队列
BlockingQueue blockingQueue = new LinkedBlockingDeque(5);
使用 BlockingQueue 实现有界 Semaphore
// Java 程序,解释了 BlockingQueue 的内部实现
import java.io.*;
import java.util.*;
class BlockingQueue<E> {
// 使用 LinkedList 结构的 BlockingQueue
// 具有容量约束
private List<E> queue = new LinkedList<E>();
// 限制变量定义容量
private int limit = 10;
// BlockingQueue 的构造函数
public BlockingQueue(int limit) {this.limit = limit; }
// 入队方法,当您尝试插入到限制之后时,会抛出异常
public synchronized void enqueue(E item)
throws InterruptedException
{
while (this.queue.size() == this.limit) {
wait();
}
if (this.queue.size() == 0) {
notifyAll();
}
this.queue.add(item);
}
// 出队方法,从空队列中删除元素时会抛出异常
public synchronized E dequeue()
throws InterruptedException
{
while (this.queue.size() == 0) {
wait();
}
if (this.queue.size() == this.limit) {
notifyAll();
}
return this.queue.remove(0);
}
public static void main(String []args)
{
}
}
示例:
// Java 程序演示了如何使用 BlockingQueue
//
import java.util.concurrent.*;
import java.util.*;
public class GFG {
public static void main(String[] args)
throws InterruptedException
{
// 定义 ArrayBlockingQueue 的容量
int capacity = 5;
// 创建 ArrayBlockingQueue 的对象
BlockingQueue<String> queue
= new ArrayBlockingQueue<String>(capacity);
// 使用 put 方法将元素添加到 ArrayBlockingQueue 中
queue.put("StarWars");
queue.put("SuperMan");
queue.put("Flash");
queue.put("BatMan");
queue.put("Avengers");
// 打印队列
System.out.println("queue 包含 " + queue);
// 删除一些元素
queue.remove();
queue.remove();
// 使用 put 方法将元素添加到 ArrayBlockingQueue 中
queue.put("CaptainAmerica");
queue.put("Thor");
System.out.println("queue 包含 " + queue);
}
}
输出:
queue 包含 [StarWars, SuperMan, Flash, BatMan, Avengers]
queue 包含 [Flash, BatMan, Avengers, CaptainAmerica, Thor]
基本操作
1. 添加元素
可以通过不同的方式将元素添加到 LinkedBlockedDeque 中,具体取决于我们要将其用作的结构类型。最常见的方法是使用 add() 方法,可以将元素添加到双端队列的末尾。我们也可以使用 addAll() 方法(它是 Collection 接口的一个方法)将整个集合添加到 LinkedBlockingDeque 中。如果我们希望将 deque 用作队列,则可以使用 add() 和 put()。
//Java程序示范BlockingQueue的add()方法
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.*;
public class GFG {
public static void main(String[] args)
throws IllegalStateException
{
//创建BlockingQueue的对象
BlockingQueue<Integer> BQ
= new LinkedBlockingDeque<Integer>();
//将数字添加到BlockingQueue中
BQ.add(7855642);
BQ.add(35658786);
BQ.add(5278367);
BQ.add(74381793);
//在移除数据之前打印BlockingQueue中的数据
System.out.println("Blocking Queue: " + BQ);
}
}
输出结果
Blocking Queue: [7855642, 35658786, 5278367, 74381793]
2. 访问元素
可以使用contains()、element()、peek()和poll()方法访问LinkedBlockingDeque的元素。表格中给出了这些方法的变体以及它们的描述。
//Java程序:访问LinkedBlockingDeque的元素
import java.util.concurrent.*;
public class AccessingElements {
public static void main(String[] args)
{
//创建LinkedBlockingDeque的对象
BlockingQueue<Integer> lbdq
= new LinkedBlockingDeque<Integer>();
//使用add()添加元素
lbdq.add(22);
lbdq.add(125);
lbdq.add(723);
lbdq.add(172);
lbdq.add(100);
//在控制台上打印lbdq的元素
System.out.println(
"The LinkedBlockingDeque, lbdq contains:");
System.out.println(lbdq);
//检查deque是否包含22
if (lbdq.contains(22))
System.out.println(
"The LinkedBlockingDeque, lbdq contains 22");
else
System.out.println("No such element exists");
//使用element()获取deque的头部
int head = lbdq.element();
System.out.println("The head of lbdq: " + head);
}
}
输出结果
The LinkedBlockingDeque, lbdq contains:
[22, 125, 723, 172, 100]
The LinkedBlockingDeque, lbdq contains 22
The head of lbdq: 22
3. 删除元素
可以使用remove()方法从LinkedBlockingDeque中删除元素。还可以使用take()和poll()等方法以删除第一个和最后一个元素的方式。
// 从LinkedBlockingDeque中移除元素的Java程序
import java.util.concurrent.*;
public class RemovingElements {
public static void main(String[] args)
{
// 实例化一个名为lbdq的LinkedBlockingDeque对象
BlockingQueue<Integer> lbdq
= new LinkedBlockingDeque<Integer>();
// 使用add()添加元素
lbdq.add(75);
lbdq.add(86);
lbdq.add(13);
lbdq.add(44);
lbdq.add(10);
// 在控制台上打印lbdq的元素
System.out.println(
"The LinkedBlockingDeque, lbdq contains:");
System.out.println(lbdq);
// 使用remove()删除元素
lbdq.remove(86);
lbdq.remove(44);
// 尝试删除LinkedBlockingDeque中不存在的元素
lbdq.remove(1);
// 在控制台上打印lbdq的元素
System.out.println(
"\nThe LinkedBlockingDeque, lbdq contains:");
System.out.println(lbdq);
}
}
输出
The LinkedBlockingDeque, lbdq contains:
[75, 86, 13, 44, 10]
The LinkedBlockingDeque, lbdq contains:
[75, 13, 10]
4. 遍历元素
要遍历LinkedBlockingDeque的元素,我们可以创建一个迭代器并使用Java集合框架的根接口Iterable的方法来访问元素。Iterable的next()方法返回任何集合的元素。
// Java程序用于遍历LinkedBlockingDeque
import java.util.Iterator;
import java.util.concurrent.*;
public class IteratingThroughElements {
public static void main(String[] args) {
// 实例化一个名为lbdq的LinkedBlockingDeque对象
BlockingQueue<Integer> lbdq = new LinkedBlockingDeque<Integer>();
// 使用add()添加元素
lbdq.add(166);
lbdq.add(246);
lbdq.add(66);
lbdq.add(292);
lbdq.add(98);
// 创建一个迭代器来遍历lbdq
Iterator<Integer> lbdqIter = lbdq.iterator();
// 在控制台上打印lbdq的元素
System.out.println("The LinkedBlockingDeque, lbdq contains:");
for(int i = 0; i<lbdq.size(); i++)
{
System.out.print(lbdqIter.next() + " ");
}
}
}
输出
The LinkedBlockingDeque, lbdq contains:
166 246 66 292 98
BlockingQueue的方法
| 方法 | 描述 |
|---|---|
| add(E e) | 如果可以立即执行而不违反容量限制,则将指定的元素插入此队列,并在成功时返回 true,在当前没有可用空间时抛出 IllegalStateException。 |
| contains(Object o) | 如果此队列包含指定的元素,则返回 true。 |
| drainTo(Collection super E> c) | 从此队列中删除所有可用元素并将它们添加到给定的集合中。 drainTo(Collection super E> c, int maxElements) | 从此队列中最多取出给定数量的可用元素,并将它们添加到给定的集合中。 offer(E e) | 如果可以立即执行而不违反容量限制,则将指定的元素插入此队列,并在成功时返回 true,在当前没有可用空间时返回 false。 offer(E e, long timeout, TimeUnit unit) | 如果有必要,插入指定的元素到此队列,并等待指定的等待时间以使空间变得可用。 poll(long timeout, TimeUnit unit) | 检索并移除此队列的头部,在必要时等待指定的等待时间以使一个元素变得可用。 put(E e) | 如果有必要,将指定的元素插入此队列,并等待直到有可用空间。 remainingCapacity() | 返回此队列可以理想地(在没有内存或资源限制的情况下)接受的附加元素数量,而不会阻止或 Integer.MAX_VALUE(如果没有固有限制)。 remove(Object o) | 如果存在,则从此队列中删除指定元素的单个实例。 take() | 检索并移除此队列的头部,在必要时等待直到一个元素变得可用。 ### 在java.util.Collection接口中声明的其他方法 方法 | 描述 |
如果此集合包含指定集合中的所有元素,则返回true。 |
| equals(Object o) | 将指定对象与此集合进行比较,以检查它们是否相等。 |
| hashCode() | 返回此集合的哈希码值。 |
| isEmpty() | 如果此集合不包含任何元素,则返回true。 |
| iterator() | 返回此集合中的元素上的迭代器。 |
| parallelStream() | 返回可能并行流,其源为此集合。 |
| removeAll(Collection> c) | 移除此集合中也包含在指定集合中的所有元素(可选操作)。 removeIf(Predicate super E> filter) | 删除此集合中满足给定谓词的所有元素。 retainAll(Collection> c) |
仅保留此集合中包含在指定集合中的元素(可选操作)。 |
| size() | 返回此集合中的元素数量。 |
| spliterator() | 创建Spliterator,用于此集合中的元素。 |
| stream() | 返回带有此集合作为源的顺序流。 |
| toArray() | 返回包含此集合中所有元素的数组。 |
| toArray(IntFunction<T[]> generator) | 返回包含此集合中所有元素的数组,使用提供的生成器函数来分配返回的数组。 |
| toArray(T[] a) | 返回包含此集合中所有元素的数组;返回数组的运行时类型与指定数组的类型相同。 |
在 java.lang.Iterable 中声明的方法
| 方法 | 描述 |
|---|---|
| forEach(Consumer<? super T> action) | 对Iterable的每个元素执行给定操作,直到处理完所有元素或操作抛出异常。 |
在 java.util.Queue 中声明的方法
| 方法 | 描述 |
|---|---|
| element() | 检索但不删除此队列的头部。 |
| peek() | 检索但不删除此队列的头部,如果此队列为空,则返回null。 |
| poll() | 检索并删除此队列的头部,如果此队列为空,则返回null。 |
| remove() | 检索并删除此队列的头部。 |
BlockingQueue 方法的行为
下面是队列插入、删除和检查操作提供的 BlockingQueue 方法。如果请求的操作不能立即满足,则每组方法的行为都不同。
- 抛出异常: 如果请求的操作不能立即满足,则会抛出异常。
- 特殊值: 如果操作不能立即满足,则会返回特殊值。
- 阻塞: 如果尝试的操作不能立即满足,则该方法调用会被阻塞,并等待直到它被执行。
- 超时: 返回一个特殊值,告知操作是否成功。如果请求的操作不能立即执行,则该方法调用会被阻塞,但不会等待超过给定的超时时间。
| 操作 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
|---|---|---|---|---|
| 插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| 删除 | remove() | poll() | take() | poll(time, unit) |
| 检查 | element() | peek() | 不适用 | 不适用 |
参考: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/BlockingQueue.html
极客教程