Java BlockingQueue接口
Java中的 BlockingQueue接口 是在Java 1.5中加入的,同时加入的还有其他各种并发实用类,如ConcurrentHashMap、 Counting Semaphore 、 CopyOnWriteArrrayLis t等。BlockingQueue接口支持流量控制(除了队列),如果BlockingQueue是满的或空的,就会引入阻塞。一个试图在一个满的队列中排队的线程会被阻塞,直到其他线程在队列中腾出空间,要么通过取消一个或多个元素的排队,要么完全清除队列。同样地,它也会阻止一个试图从一个空队列中删除的线程,直到其他线程插入一个项目。BlockingQueue不接受一个空值。如果我们试图对空项进行排队,那么它会抛出 NullPointerException
Java提供了几个BlockingQueue的实现,如LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue、 SynchronousQueue 等等。Java BlockingQueue接口的实现是线程安全的。BlockingQueue的所有方法都是原子性质的,并使用内部锁或其他形式的并发控制。Java 5在 java.util.concurrent包中 提供了BlockingQueue的实现。
BlockingQueue的用法
一个由生产者(put)线程和消费者(take)线程访问的BlockingQuee
阻塞队列的层次结构
声明
public interface BlockingQueue<E> extends Queue<E>
这里, E 是存储在集合中的元素的类型。
实现BlockingQueue的类
我们不能直接提供BlockingQueue的实例,因为它是一个接口,所以为了利用BlockingQueue的功能,我们需要利用实现它的类。另外,要在你的代码中使用BlockingQueue,请使用这个导入语句。
import java.util.concurrent.BlockingQueue;
(or)
import java.util.concurrent.*;
- 链接阻塞队列(LinkedBlockingQueen
- ArrayBlockingQueue
BlockingDeque的实现类是LinkedBlockingDeque。这个类是BlockingDeque和链接列表数据结构的实现。LinkedBlockingDeque可以使用构造函数进行约束,然而,如果没有指定容量,则默认为Integer.MAX_VALUE。节点是在插入时动态添加的,遵守容量限制。
创建对象的语法
BlockingQueue<?> objectName = new LinkedBlockingDeque<?>();
(or)
LinkedBlockingDeque<?> objectName = new LinkedBlockingDeque<?>();
例子。在下面的代码中,我们对LinkedBlockingDeque进行了一些基本的操作,比如创建一个对象,添加元素,删除元素,以及使用迭代器来遍历LinkedBlockingDeque。
BlockingQueue类型
BlockingQueue有 两种类型
1.无界队列: 阻塞队列的容量将被设置为Integer.MAX_VALUE。在无界封锁队列的情况下,队列永远不会阻塞,因为它可以增长到一个非常大的尺寸,当你添加元素时,它的尺寸会增长。
语法
BlockingQueue blockingQueue = new LinkedBlockingDeque();
2.有界队列: 第二种队列类型是有界队列。在有界队列的情况下,你可以创建一个队列,在队列构造函数中传递队列的容量。
语法
// Creates a Blocking Queue with capacity 5
BlockingQueue blockingQueue = new LinkedBlockingDeque(5);
使用BlockingQueue实现Bounded Semaphore
// Java program that explains the internal
// implementation of BlockingQueue
import java.io.*;
import java.util.*;
class BlockingQueue<E> {
// BlockingQueue using LinkedList structure
// with a constraint on capacity
private List<E> queue = new LinkedList<E>();
// limit variable to define capacity
private int limit = 10;
// constructor of BlockingQueue
public BlockingQueue(int limit) { this.limit = limit; }
// enqueue method that throws Exception
// when you try to insert after the limit
public synchronized void enqueue(E item)
throws InterruptedException
{
while (this.queue.size() == this.limit) {
wait();
}
if (this.queue.size() == 0) {
notifyAll();
}
this.queue.add(item);
}
// dequeue methods that throws Exception
// when you try to remove element from an
// empty queue
public synchronized E dequeue()
throws InterruptedException
{
while (this.queue.size() == 0) {
wait();
}
if (this.queue.size() == this.limit) {
notifyAll();
}
return this.queue.remove(0);
}
public static void main(String []args)
{
}
}
例子
// Java Program to demonstrate usage of BlockingQueue
import java.util.concurrent.*;
import java.util.*;
public class GFG {
public static void main(String[] args)
throws InterruptedException
{
// define capacity of ArrayBlockingQueue
int capacity = 5;
// create object of ArrayBlockingQueue
BlockingQueue<String> queue
= new ArrayBlockingQueue<String>(capacity);
// Add elements to ArrayBlockingQueue using put
// method
queue.put("StarWars");
queue.put("SuperMan");
queue.put("Flash");
queue.put("BatMan");
queue.put("Avengers");
// print Queue
System.out.println("queue contains " + queue);
// remove some elements
queue.remove();
queue.remove();
// Add elements to ArrayBlockingQueue
// using put method
queue.put("CaptainAmerica");
queue.put("Thor");
System.out.println("queue contains " + queue);
}
}
输出
queue contains [StarWars, SuperMan, Flash, BatMan, Avengers]
queue contains [Flash, BatMan, Avengers, CaptainAmerica, Thor]
基本操作
1.添加元素
可以通过不同的方式将元素添加到LinkedBlockedDeque中,这取决于我们试图将其用作何种结构。最常见的方法是add()方法,我们可以在deque的末端添加元素。我们也可以使用addAll()方法(这是Collection接口的一个方法)来将整个集合添加到LinkedBlockingDeque中。如果我们希望将deque作为一个队列,我们可以使用add()和put()。
// Java Program Demonstrate add()
// method of BlockingQueue
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.*;
public class GFG {
public static void main(String[] args)
throws IllegalStateException
{
// create object of BlockingQueue
BlockingQueue<Integer> BQ
= new LinkedBlockingDeque<Integer>();
// Add numbers to the BlockingQueue
BQ.add(7855642);
BQ.add(35658786);
BQ.add(5278367);
BQ.add(74381793);
// before removing print BlockingQueue
System.out.println("Blocking Queue: " + BQ);
}
}
输出
Blocking Queue: [7855642, 35658786, 5278367, 74381793]
2.访问元素
LinkedBlockingDeque的元素可以使用contains()、element()、peek()、poll()来访问。这些方法也有一些变化,在上表中给出了它们的描述。
// Java Program for Accessing the elements of a
// LinkedBlockingDeque
import java.util.concurrent.*;
public class AccessingElements {
public static void main(String[] args)
{
// Instantiate an object of LinkedBlockingDeque
// named lbdq
BlockingQueue<Integer> lbdq
= new LinkedBlockingDeque<Integer>();
// Add elements using add()
lbdq.add(22);
lbdq.add(125);
lbdq.add(723);
lbdq.add(172);
lbdq.add(100);
// Print the elements of lbdq on the console
System.out.println(
"The LinkedBlockingDeque, lbdq contains:");
System.out.println(lbdq);
// To check if the deque contains 22
if (lbdq.contains(22))
System.out.println(
"The LinkedBlockingDeque, lbdq contains 22");
else
System.out.println("No such element exists");
// Using element() to retrieve the head of the deque
int head = lbdq.element();
System.out.println("The head of lbdq: " + head);
}
}
输出
The LinkedBlockingDeque, lbdq contains:
[22, 125, 723, 172, 100]
The LinkedBlockingDeque, lbdq contains 22
The head of lbdq: 22
3.删除元素
元素可以通过remove()从LinkedBlockingDeque中删除。其他的方法如take()和poll()也可以被用来删除第一个和最后一个元素。
// Java Program for removing elements from a
// LinkedBlockingDeque
import java.util.concurrent.*;
public class RemovingElements {
public static void main(String[] args)
{
// Instantiate an object of LinkedBlockingDeque
// named lbdq
BlockingQueue<Integer> lbdq
= new LinkedBlockingDeque<Integer>();
// Add elements using add()
lbdq.add(75);
lbdq.add(86);
lbdq.add(13);
lbdq.add(44);
lbdq.add(10);
// Print the elements of lbdq on the console
System.out.println(
"The LinkedBlockingDeque, lbdq contains:");
System.out.println(lbdq);
// Remove elements using remove();
lbdq.remove(86);
lbdq.remove(44);
// Trying to remove an element
// that doesn't exist
// in the LinkedBlockingDeque
lbdq.remove(1);
// Print the elements of lbdq on the console
System.out.println(
"\nThe LinkedBlockingDeque, lbdq contains:");
System.out.println(lbdq);
}
}
输出
The LinkedBlockingDeque, lbdq contains:
[75, 86, 13, 44, 10]
The LinkedBlockingDeque, lbdq contains:
[75, 13, 10]
4.遍历元素
为了遍历LinkedBlockingDeque的元素,我们可以创建一个迭代器,并使用Iterable接口的方法,这是Java集合框架的根,来访问这些元素。Iterable的next()方法返回任何集合中的元素。
// Java Program to iterate
// through the LinkedBlockingDeque
import java.util.Iterator;
import java.util.concurrent.*;
public class IteratingThroughElements {
public static void main(String[] args) {
// Instantiate an object of LinkedBlockingDeque named lbdq
BlockingQueue<Integer> lbdq = new LinkedBlockingDeque<Integer>();
// Add elements using add()
lbdq.add(166);
lbdq.add(246);
lbdq.add(66);
lbdq.add(292);
lbdq.add(98);
// Create an iterator to traverse lbdq
Iterator<Integer> lbdqIter = lbdq.iterator();
// Print the elements of lbdq on to the console
System.out.println("The LinkedBlockingDeque, lbdq contains:");
for(int i = 0; i<lbdq.size(); i++)
{
System.out.print(lbdqIter.next() + " ");
}
}
}
输出
The LinkedBlockingDeque, lbdq contains:
166 246 66 292 98
BlockingQueue的方法
方法 | 描述 |
---|---|
add(E e) | 如果可以在不违反容量限制的情况下立即插入指定的元素到这个队列中,成功后返回true,如果当前没有可用空间,则抛出IllegalStateException。 |
contains(Object o) | 如果这个队列包含指定的元素,返回真。 |
drainTo(Collection super E> c) | 从这个队列中移除所有可用的元素,并将它们添加到给定的集合中。 drainTo(Collection super E> c, int maxElements) | 从这个队列中最多移除给定数量的可用元素,并将它们添加到给定的集合中。 offer(E e) | 如果有可能在不违反容量限制的情况下立即插入指定的元素到这个队列中,成功后返回true,如果当前没有可用空间则返回false。 offer(E e, long timeout, TimeUnit unit) | 将指定的元素插入到这个队列中,如果需要的话,在指定的等待时间内等待空间变得可用。 poll(long timeout, TimeUnit unit) | 检索和删除这个队列的头部,如果需要的话,等待到指定的等待时间,以便有一个元素可用。 put(E e) | 将指定的元素插入到这个队列中,如果需要的话,等待空间变得可用。 remainingCapacity() | 返回这个队列在理想情况下(在没有内存或资源限制的情况下)可以接受的额外元素的数量,如果没有内在的限制,则返回Integer.MAX_VALUE。 remove(Object o) | 从这个队列中删除指定元素的一个实例,如果它存在的话。 take() | 检索并删除这个队列的头部,如果有必要,可以等待,直到有一个元素可用。 ### java.util.Collection接口中声明的方法 方法 | 描述 |
如果这个集合包含了指定集合中的所有元素,则返回true。 |
equals(Object o) | 将指定的对象与这个集合进行比较,看是否相等。 |
hashCode() | 返回此集合的哈希代码值。 |
isEmpty() | 如果这个集合不包含任何元素,返回true。 |
iterator() | 返回这个集合中的元素的迭代器。 |
parallelStream() | 返回一个以该集合为源的可能的并行流。 |
removeAll(Collection> c) | 删除这个集合中所有也包含在指定集合中的元素(可选操作)。 removeIf(Predicate super E> filter) | 删除此集合中满足给定谓词的所有元素。 retainAll(Collection> c) |
只保留此集合中包含在指定集合中的元素(可选操作)。 |
size() | 返回这个集合中元素的数量。 |
spliterator() | 在这个集合中的元素上创建一个Spliterator。 |
stream() | 返回一个以这个集合为源的顺序流。 |
toArray() | 返回一个包含此集合中所有元素的数组。 |
toArray(IntFunction<T[]> generator) | 返回一个包含此集合中所有元素的数组,使用提供的生成器函数来分配返回的数组。 |
toArray(T[] a) | 返回一个包含此集合中所有元素的数组;返回的数组的运行时类型是指定数组的类型。 |
java.lang.Iterable接口中声明的方法
方法 | 描述 |
---|---|
forEach(Consumer<? super T> action) | 对Iterable中的每个元素执行给定的动作,直到所有元素都被处理完或者该动作抛出一个异常。 |
接口java.util.Queue中声明的方法
方法 | 描述 |
---|---|
element() | 检索,但不删除这个队列的头部。 |
peek() | 检索但不删除这个队列的头部,如果这个队列是空的,则返回null。 |
poll() | 检索并删除这个队列的头部,如果这个队列是空的,则返回null。 |
remove() | 检索并删除该队列的头部。 |
BlockingQueue方法的行为
下面是 BlockingQueue 提供的用于插入、移除和检查队列操作的方法。如果请求的操作没有立即得到满足,这四组方法中的每一组都有不同的行为。
- 抛出异常: 如果请求的操作没有立即得到满足,就会抛出一个异常。
- 特殊值: 如果操作没有被立即满足,将返回一个特殊值。
- 阻塞: 如果尝试的操作没有立即得到满足,方法调用将被阻塞,并等待它被执行。
- Times out: 返回一个特殊的值,告诉你操作是否成功。如果请求的操作不能立即实现,方法调用就会阻塞,直到它被满足,但等待的时间不会超过给定的超时时间。
操作 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
检验 | element() | peek() | 不适用 | 不适用 |
参考资料 :https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/BlockingQueue.html