阻塞隊(duì)列通過添加兩組方法來擴(kuò)展隊(duì)列:
• 一組方法無限期地阻塞
• 另一組方法允許您指定要阻止的時(shí)間段。
BlockingQueue 接口的實(shí)例表示阻塞隊(duì)列。 BlockingQueue 接口繼承自 Queue 接口。
put()和 offer()方法在阻塞隊(duì)列的尾部添加一個(gè)元素。如果阻塞隊(duì)列已滿,則put()方法將無限期阻塞,直到隊(duì)列中的空間可用。offer()方法允許您指定等待空間可用的時(shí)間段。 如果指定的元素添加成功,則返回true; 否則為假。
take()和poll()方法檢索和刪除阻塞隊(duì)列的頭。如果阻塞隊(duì)列為空,take()方法將無限期阻塞。poll()方法允許您指定在阻塞隊(duì)列為空時(shí)要等待的時(shí)間段; 如果在元素可用之前過去了指定的時(shí)間,則返回null。
來自 BlockingQueue 中 Queue 接口的方法就像使用 Queue 。
BlockingQueue 被設(shè)計(jì)為線程安全的并且可以使用在生產(chǎn)者/消費(fèi)者的情況下。
阻塞隊(duì)列不允許空元素和可以是有界的或無界的。
BlockingQueue 中的 remainingCapacity()返回可以添加到阻止隊(duì)列中而不阻塞的元素?cái)?shù)。
BlockingQueue 可以控制多個(gè)線程被阻塞時(shí)的公平性。 如果阻塞隊(duì)列是公平的,它可以選擇最長的等待線程來執(zhí)行操作。如果阻塞隊(duì)列不公平,則不指定選擇的順序。
BlockingQueue 接口及其所有實(shí)現(xiàn)類都在 java.util.concurrent 包中。 以下是 BlockingQueue 接口的實(shí)現(xiàn)類:
由數(shù)組支持的 ArrayBlockingQueue 是 BlockingQueue 的有界實(shí)現(xiàn)類。 我們可以在其構(gòu)造函數(shù)中指定阻塞隊(duì)列的公平性。 默認(rèn)情況下,它不公平。
LinkedBlockingQueue 可以用作有界或無界阻塞隊(duì)列。 它不允許為阻塞隊(duì)列指定公平規(guī)則。
PriorityBlockingQueue 是 BlockingQueue 的無界實(shí)現(xiàn)類。 它的工作方式與 PriortyQueue 相同,用于排序阻塞隊(duì)列中的元素,并將阻塞特性添加到 PriorityQueue 中。
SynchronousQueue 實(shí)現(xiàn) BlockingQueue ,沒有任何容量。 put操作等待take操作以獲取元素。 它可以在兩個(gè)線程之間進(jìn)行握手,并在兩個(gè)線程之間交換對象。 它的isEmpty()方法總是返回true。
DelayQueue是BlockingQueue的無界實(shí)現(xiàn)類。它保持一個(gè)元素,直到該元素經(jīng)過指定的延遲。 如果有超過一個(gè)元素的延遲已經(jīng)過去,那么其延遲最早傳遞的元素將被放置在隊(duì)列的頭部。
例子
以下代碼顯示了如何在生產(chǎn)者/消費(fèi)者應(yīng)用程序中使用阻塞隊(duì)列。
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
class BQProducer extends Thread {
private final BlockingQueue queue;
private final String name;
public BQProducer(BlockingQueue queue, String name) {
this.queue = queue;
this.name = name;
}
@Override
public void run() {
while (true) {
try {
this.queue.put(UUID.randomUUID().toString());
Thread.sleep(4000);
}
catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}
}
class BQConsumer extends Thread {
private final BlockingQueue queue;
private final String name;
public BQConsumer(BlockingQueue queue, String name) {
this.queue = queue;
this.name = name;
}
@Override
public void run() {
while (true) {
try {
String str = this.queue.take();
System.out.println(name + " took: " + str);
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}
}
public class Main {
public static void main(String[] args) {
int capacity = 5;
boolean fair = true;
BlockingQueue queue = new ArrayBlockingQueue<>(capacity, fair);
new BQProducer(queue, "Producer1").start();
new BQProducer(queue, "Producer2").start();
new BQProducer(queue, "Producer3").start();
new BQConsumer(queue, "Consumer1").start();
new BQConsumer(queue, "Consumer2").start();
}
}
上面的代碼生成以下結(jié)果。