Java中的BlockingQueue接口

Java中的BlockingQueue接口

Java中的 BlockingQueue接口 是在Java 1.5中添加的,同时还添加了各种其他并发实用类,如ConcurrentHashMap、 计数信号量 、CopyOnWriteArrayList等。BlockingQueue接口通过在BlockingQueue满或为空时引入阻塞来支持流量控制(除队列外)。试图将元素入队到满队列中的线程被阻塞,直到某些其他线程将队列中的元素出队或清空队列,为其腾出空间。同样地,它也会阻塞试图从空队列中删除的线程,直到某些其他线程插入一个项。BlockingQueue不接受null值。如果试图入队null item,则会抛出 NullPointerException
Java提供了几种BlockingQueue实现,例如LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue、 SynchronousQueue 等。Java BlockingQueue接口实现是线程安全的。BlockingQueue的所有方法在本质上都是原子的,并且使用内部锁或其他形式的并发控制。Java 5在 java.util.concurrent包 中提供了BlockingQueue实现。

BlockingQueue的用法

Java中的BlockingQueue接口

使用生产者(put)线程和消费者(take)线程访问的BlockingQueue。

BlockingQueue的层次结构

Java中的BlockingQueue接口

声明

public interface BlockingQueue<E> extends Queue<E>

这里, E 是存储在集合中的元素类型。

实现BlockingQueue的类

由于BlockingQueue是一个接口,我们不能直接提供一个BlockingQueue实例,因此要利用实现它的类的功能。此外,要在您的代码中使用BlockingQueue,请使用以下导入语句。

import java.util.concurrent.BlockingQueue;
                    (或)
import java.util.concurrent.*;
  • LinkedBlockingQueue
  • ArrayBlockingQueue

BlockingDeque的实现类是LinkedBlockingDeque。该类是BlockingDeque和链表数据结构的实现。使用构造函数可以选择性地限制LinkedBlockingDeque的容量,但是如果未指定容量,则默认为Integer.MAX_VALUE。在插入时会动态添加节点以遵守容量约束。

创建对象的语法

BlockingQueue<?> objectName = new LinkedBlockingDeque<?>();   
                         (或)
LinkedBlockingDeque<?> objectName = new LinkedBlockingDeque<?>();

例如:在下面的代码中,我们对LinkedBlockingDeque执行一些基本操作,例如创建对象、添加元素、删除元素,并使用迭代器遍历LinkedBlockingDeque。

BlockingQueue类型

BlockingQueue有 两种类型

1. 无界队列: Blocking队列的Capacity将设置为Integer.MAX_VALUE。在无界阻塞队列的情况下,队列永远不会阻塞,因为它可以增长到非常大的大小。当添加元素时,其大小会增长。

语法:

BlockingQueue blockingQueue = new LinkedBlockingDeque();

2. 有界队列: 第二种队列是有界队列。在有界队列的情况下,您可以通过传递队列容量来创建队列和队列构造函数:

语法:

// 创建一个容量为5的阻塞队列
BlockingQueue blockingQueue = new LinkedBlockingDeque(5);

使用 BlockingQueue 实现有界 Semaphore

//  Java 程序,解释了 BlockingQueue 的内部实现
 
import java.io.*;
import java.util.*;
 
class BlockingQueue<E> {
 
    // 使用 LinkedList 结构的 BlockingQueue
    // 具有容量约束
    private List<E> queue = new LinkedList<E>();
 
    // 限制变量定义容量
    private int limit = 10;
 
    // BlockingQueue 的构造函数
    public BlockingQueue(int limit) {this.limit = limit; }
 
    // 入队方法,当您尝试插入到限制之后时,会抛出异常
    public synchronized void enqueue(E item)
        throws InterruptedException
    {
        while (this.queue.size() == this.limit) {
            wait();
        }
        if (this.queue.size() == 0) {
            notifyAll();
        }
        this.queue.add(item);
    }
 
    // 出队方法,从空队列中删除元素时会抛出异常
    public synchronized E dequeue()
        throws InterruptedException
    {
        while (this.queue.size() == 0) {
            wait();
        }
        if (this.queue.size() == this.limit) {
            notifyAll();
        }
 
        return this.queue.remove(0);
    }
   
    public static void main(String []args)
    {
    }
}

示例:

// Java 程序演示了如何使用 BlockingQueue
//
import java.util.concurrent.*;
import java.util.*;
 
public class GFG {
 
    public static void main(String[] args)
        throws InterruptedException
    {
 
        // 定义 ArrayBlockingQueue 的容量
        int capacity = 5;
 
        // 创建 ArrayBlockingQueue 的对象
        BlockingQueue<String> queue
            = new ArrayBlockingQueue<String>(capacity);
 
        // 使用 put 方法将元素添加到 ArrayBlockingQueue 中
        queue.put("StarWars");
        queue.put("SuperMan");
        queue.put("Flash");
        queue.put("BatMan");
        queue.put("Avengers");
 
        // 打印队列
        System.out.println("queue 包含 " + queue);
 
        // 删除一些元素
        queue.remove();
        queue.remove();
 
        // 使用 put 方法将元素添加到 ArrayBlockingQueue 中
        queue.put("CaptainAmerica");
        queue.put("Thor");
 
        System.out.println("queue 包含 " + queue);
    }
}

输出:

queue 包含 [StarWars, SuperMan, Flash, BatMan, Avengers]
queue 包含 [Flash, BatMan, Avengers, CaptainAmerica, Thor]

基本操作

1. 添加元素

可以通过不同的方式将元素添加到 LinkedBlockedDeque 中,具体取决于我们要将其用作的结构类型。最常见的方法是使用 add() 方法,可以将元素添加到双端队列的末尾。我们也可以使用 addAll() 方法(它是 Collection 接口的一个方法)将整个集合添加到 LinkedBlockingDeque 中。如果我们希望将 deque 用作队列,则可以使用 add() 和 put()。

//Java程序示范BlockingQueue的add()方法
 
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.*;
 
public class GFG {
   
    public static void main(String[] args)
        throws IllegalStateException
    {
 
        //创建BlockingQueue的对象
        BlockingQueue<Integer> BQ
            = new LinkedBlockingDeque<Integer>();
 
        //将数字添加到BlockingQueue中
        BQ.add(7855642);
        BQ.add(35658786);
        BQ.add(5278367);
        BQ.add(74381793);
 
        //在移除数据之前打印BlockingQueue中的数据
        System.out.println("Blocking Queue: " + BQ);
    }
}

输出结果

Blocking Queue: [7855642, 35658786, 5278367, 74381793]

2. 访问元素

可以使用contains()、element()、peek()和poll()方法访问LinkedBlockingDeque的元素。表格中给出了这些方法的变体以及它们的描述。

//Java程序:访问LinkedBlockingDeque的元素
 
import java.util.concurrent.*;
 
public class AccessingElements {
 
    public static void main(String[] args)
    {
 
        //创建LinkedBlockingDeque的对象
        BlockingQueue<Integer> lbdq
            = new LinkedBlockingDeque<Integer>();
 
        //使用add()添加元素
        lbdq.add(22);
        lbdq.add(125);
        lbdq.add(723);
        lbdq.add(172);
        lbdq.add(100);
 
        //在控制台上打印lbdq的元素
        System.out.println(
            "The LinkedBlockingDeque, lbdq contains:");
        System.out.println(lbdq);
 
        //检查deque是否包含22
        if (lbdq.contains(22))
            System.out.println(
                "The LinkedBlockingDeque, lbdq contains 22");
        else
            System.out.println("No such element exists");
 
        //使用element()获取deque的头部
        int head = lbdq.element();
        System.out.println("The head of lbdq: " + head);
    }
}

输出结果

The LinkedBlockingDeque, lbdq contains:
[22, 125, 723, 172, 100]
The LinkedBlockingDeque, lbdq contains 22
The head of lbdq: 22

3. 删除元素

可以使用remove()方法从LinkedBlockingDeque中删除元素。还可以使用take()和poll()等方法以删除第一个和最后一个元素的方式。

// 从LinkedBlockingDeque中移除元素的Java程序
 
import java.util.concurrent.*;
 
public class RemovingElements {
 
    public static void main(String[] args)
    {
 
        // 实例化一个名为lbdq的LinkedBlockingDeque对象
        BlockingQueue<Integer> lbdq
            = new LinkedBlockingDeque<Integer>();
 
        // 使用add()添加元素
        lbdq.add(75);
        lbdq.add(86);
        lbdq.add(13);
        lbdq.add(44);
        lbdq.add(10);
 
        // 在控制台上打印lbdq的元素
        System.out.println(
            "The LinkedBlockingDeque, lbdq contains:");
        System.out.println(lbdq);
 
        // 使用remove()删除元素
        lbdq.remove(86);
        lbdq.remove(44);
 
        // 尝试删除LinkedBlockingDeque中不存在的元素
        lbdq.remove(1);
 
        // 在控制台上打印lbdq的元素
        System.out.println(
            "\nThe LinkedBlockingDeque, lbdq contains:");
        System.out.println(lbdq);
    }
}

输出

The LinkedBlockingDeque, lbdq contains:
[75, 86, 13, 44, 10]

The LinkedBlockingDeque, lbdq contains:
[75, 13, 10]

4. 遍历元素

要遍历LinkedBlockingDeque的元素,我们可以创建一个迭代器并使用Java集合框架的根接口Iterable的方法来访问元素。Iterable的next()方法返回任何集合的元素。

// Java程序用于遍历LinkedBlockingDeque
import java.util.Iterator;
import java.util.concurrent.*;
 
public class IteratingThroughElements {
 
    public static void main(String[] args) {
         
        // 实例化一个名为lbdq的LinkedBlockingDeque对象
        BlockingQueue<Integer> lbdq = new LinkedBlockingDeque<Integer>();
         
        // 使用add()添加元素
        lbdq.add(166);
        lbdq.add(246);
        lbdq.add(66);
        lbdq.add(292);
        lbdq.add(98);
         
        // 创建一个迭代器来遍历lbdq
        Iterator<Integer> lbdqIter = lbdq.iterator();
         
        // 在控制台上打印lbdq的元素
        System.out.println("The LinkedBlockingDeque, lbdq contains:");
         
        for(int i = 0; i<lbdq.size(); i++)
        {
            System.out.print(lbdqIter.next() + " ");
        }       
    }
 
}

输出

The LinkedBlockingDeque, lbdq contains:
166 246 66 292 98 

BlockingQueue的方法

方法 描述
add​(E e) 如果可以立即执行而不违反容量限制,则将指定的元素插入此队列,并在成功时返回 true,在当前没有可用空间时抛出 IllegalStateException。
contains​(Object o) 如果此队列包含指定的元素,则返回 true。
drainTo​(Collection c) | 从此队列中删除所有可用元素并将它们添加到给定的集合中。
drainTo​(Collection c, int maxElements) | 从此队列中最多取出给定数量的可用元素,并将它们添加到给定的集合中。
offer​(E e) | 如果可以立即执行而不违反容量限制,则将指定的元素插入此队列,并在成功时返回 true,在当前没有可用空间时返回 false。
offer​(E e, long timeout, TimeUnit unit) | 如果有必要,插入指定的元素到此队列,并等待指定的等待时间以使空间变得可用。
poll​(long timeout, TimeUnit unit) | 检索并移除此队列的头部,在必要时等待指定的等待时间以使一个元素变得可用。
put​(E e) | 如果有必要,将指定的元素插入此队列,并等待直到有可用空间。
remainingCapacity() | 返回此队列可以理想地(在没有内存或资源限制的情况下)接受的附加元素数量,而不会阻止或 Integer.MAX_VALUE(如果没有固有限制)。
remove​(Object o) | 如果存在,则从此队列中删除指定元素的单个实例。
take() | 检索并移除此队列的头部,在必要时等待直到一个元素变得可用。

### 在java.util.Collection接口中声明的其他方法

方法 | 描述
—|—
addAll​(Collection c) | 将指定集合中的所有元素添加到此集合中(可选操作)。
clear() | 从此集合中移除所有元素(可选操作)。
containsAll​(Collection c)

如果此集合包含指定集合中的所有元素,则返回true。
equals​(Object o) 将指定对象与此集合进行比较,以检查它们是否相等。
hashCode() 返回此集合的哈希码值。
isEmpty() 如果此集合不包含任何元素,则返回true。
iterator() 返回此集合中的元素上的迭代器。
parallelStream() 返回可能并行流,其源为此集合。
removeAll​(Collection c) | 移除此集合中也包含在指定集合中的所有元素(可选操作)。
removeIf​(Predicate filter) | 删除此集合中满足给定谓词的所有元素。
retainAll​(Collection c)
仅保留此集合中包含在指定集合中的元素(可选操作)。
size() 返回此集合中的元素数量。
spliterator() 创建Spliterator,用于此集合中的元素。
stream() 返回带有此集合作为源的顺序流。
toArray() 返回包含此集合中所有元素的数组。
toArray​(IntFunction<T[]> generator) 返回包含此集合中所有元素的数组,使用提供的生成器函数来分配返回的数组。
toArray​(T[] a) 返回包含此集合中所有元素的数组;返回数组的运行时类型与指定数组的类型相同。

在 java.lang.Iterable 中声明的方法

方法 描述
forEach​(Consumer<? super T> action) 对Iterable的每个元素执行给定操作,直到处理完所有元素或操作抛出异常。

在 java.util.Queue 中声明的方法

方法 描述
element() 检索但不删除此队列的头部。
peek() 检索但不删除此队列的头部,如果此队列为空,则返回null。
poll() 检索并删除此队列的头部,如果此队列为空,则返回null。
remove() 检索并删除此队列的头部。

BlockingQueue 方法的行为

下面是队列插入、删除和检查操作提供的 BlockingQueue 方法。如果请求的操作不能立即满足,则每组方法的行为都不同。

  • 抛出异常: 如果请求的操作不能立即满足,则会抛出异常。
  • 特殊值: 如果操作不能立即满足,则会返回特殊值。
  • 阻塞: 如果尝试的操作不能立即满足,则该方法调用会被阻塞,并等待直到它被执行。
  • 超时: 返回一个特殊值,告知操作是否成功。如果请求的操作不能立即执行,则该方法调用会被阻塞,但不会等待超过给定的超时时间。
操作 抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
删除 remove() poll() take() poll(time, unit)
检查 element() peek() 不适用 不适用

参考: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/BlockingQueue.html

Python教程

Java教程

Web教程

数据库教程

图形图像教程

大数据教程

开发工具教程

计算机教程