上個星期總結(jié)了一下synchronized相關(guān)的知識,這次將Queue相關(guān)的知識總結(jié)一下,和朋友們分享。
在Java多線程應(yīng)用中,隊列的使用率很高,多數(shù)生產(chǎn)消費(fèi)模型的首選數(shù)據(jù)結(jié)構(gòu)就是隊列。Java提供的線程安全的Queue可以分為阻塞隊列和非阻塞隊列,其中阻塞隊列的典型例子是BlockingQueue,非阻塞隊列的典型例子是ConcurrentLinkedQueue,在實(shí)際應(yīng)用中要根據(jù)實(shí)際需要選用阻塞隊列或者非阻塞隊列。
注:什么叫線程安全?這個首先要明確。線程安全的類 ,指的是類內(nèi)共享的全局變量的訪問必須保證是不受多線程形式影響的。如果由于多線程的訪問(比如修改、遍歷、查看)而使這些變量結(jié)構(gòu)被破壞或者針對這些變量操作的原子性被破壞,則這個類就不是線程安全的。
今天就聊聊這兩種Queue,本文分為以下兩個部分,用分割線分開:
BlockingQueue
ConcurrentLinkedQueue,非阻塞算法
首先來看看BlockingQueue:
Queue是什么就不需要多說了吧,一句話:隊列是先進(jìn)先出。相對的,棧是后進(jìn)先出。如果不熟悉的話先找本基礎(chǔ)的數(shù)據(jù)結(jié)構(gòu)的書看看吧。
BlockingQueue,顧名思義,“阻塞隊列”:可以提供阻塞功能的隊列。
首先,看看BlockingQueue提供的常用方法:
從上表可以很明顯看出每個方法的作用,這個不用多說。我想說的是:
add(e) remove() element() 方法不會阻塞線程。當(dāng)不滿足約束條件時,會拋出IllegalStateException 異常。例如:當(dāng)隊列被元素填滿后,再調(diào)用add(e),則會拋出異常。
offer(e) poll() peek() 方法即不會阻塞線程,也不會拋出異常。例如:當(dāng)隊列被元素填滿后,再調(diào)用offer(e),則不會插入元素,函數(shù)返回false。
要想要實(shí)現(xiàn)阻塞功能,需要調(diào)用put(e) take() 方法。當(dāng)不滿足約束條件時,會阻塞線程。
好,上點(diǎn)源碼你就更明白了。以ArrayBlockingQueue類為例:
對于第一類方法,很明顯如果操作不成功就拋異常。而且可以看到其實(shí)調(diào)用的是第二類的方法,為什么?因?yàn)榈诙惙椒ǚ祷豣oolean啊。
Java代碼
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException(“Queue full”);//隊列已滿,拋異常
}
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();//隊列為空,拋異常
}
對于第二類方法,很標(biāo)準(zhǔn)的ReentrantLock使用方式(不熟悉的朋友看一下我上一篇帖子吧),另外對于insert和extract的實(shí)現(xiàn)沒啥好說的。
注:先不看阻塞與否,這ReentrantLock的使用方式就能說明這個類是線程安全類。
Java代碼
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)//隊列已滿,返回false
return false;
else {
insert(e);//insert方法中發(fā)出了notEmpty.signal();
return true;
}
} finally {
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == 0)//隊列為空,返回false
return null;
E x = extract();//extract方法中發(fā)出了notFull.signal();
return x;
} finally {
lock.unlock();
}
}
對于第三類方法,這里面涉及到Condition類,簡要提一下,
await方法指:造成當(dāng)前線程在接到信號或被中斷之前一直處于等待狀態(tài)。
signal方法指:喚醒一個等待線程。
Java代碼
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == items.length)//如果隊列已滿,等待notFull這個條件,這時當(dāng)前線程被阻塞
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); //喚醒受notFull阻塞的當(dāng)前線程
throw ie;
}
insert(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == 0)//如果隊列為空,等待notEmpty這個條件,這時當(dāng)前線程被阻塞
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal();//喚醒受notEmpty阻塞的當(dāng)前線程
throw ie;
}
E x = extract();
return x;
} finally {
lock.unlock();
}
}
第四類方法就是指在有必要時等待指定時間,就不詳細(xì)說了。
再來看看BlockingQueue接口的具體實(shí)現(xiàn)類吧:
ArrayBlockingQueue,其構(gòu)造函數(shù)必須帶一個int參數(shù)來指明其大小
LinkedBlockingQueue,若其構(gòu)造函數(shù)帶一個規(guī)定大小的參數(shù),生成的BlockingQueue有大小限制,若不帶大小參數(shù),所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定
PriorityBlockingQueue,其所含對象的排序不是FIFO,而是依據(jù)對象的自然排序順序或者是構(gòu)造函數(shù)的Comparator決定的順序
上面是用ArrayBlockingQueue舉得例子,下面看看LinkedBlockingQueue:
首先,既然是鏈表,就應(yīng)該有Node節(jié)點(diǎn),它是一個內(nèi)部靜態(tài)類:
Java代碼
static class Node {
/** The item, volatile to ensure barrier separating write and read */
volatile E item;
Node next;
Node(E x) { item = x; }
}
然后,對于鏈表來說,肯定需要兩個變量來標(biāo)示頭和尾:
Java代碼
/** 頭指針 */
private transient Node head; //head.next是隊列的頭元素
/** 尾指針 */
private transient Node last; //last.next是null
那么,對于入隊和出隊就很自然能理解了:
Java代碼
private void enqueue(E x) {
last = last.next = new Node(x); //入隊是為last再找個下家
}
private E dequeue() {
Node first = head.next; //出隊是把head.next取出來,然后將head向后移一位
head = first;
E x = first.item;
first.item = null;
return x;
}
另外,LinkedBlockingQueue相對于ArrayBlockingQueue還有不同是,有兩個ReentrantLock,且隊列現(xiàn)有元素的大小由一個AtomicInteger對象標(biāo)示。
注:AtomicInteger類是以原子的方式操作整型變量。
Java代碼
private final AtomicInteger count = new AtomicInteger(0);
/** 用于讀取的獨(dú)占鎖*/
private final ReentrantLock takeLock = new ReentrantLock();
/** 隊列是否為空的條件 */
private final Condition notEmpty = takeLock.newCondition();
/** 用于寫入的獨(dú)占鎖 */
private final ReentrantLock putLock = new ReentrantLock();
/** 隊列是否已滿的條件 */
private final Condition notFull = putLock.newCondition();
有兩個Condition很好理解,在ArrayBlockingQueue也是這樣做的。但是為什么需要兩個ReentrantLock呢?下面會慢慢道來。
讓我們來看看offer和poll方法的代碼:
Java代碼
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
final ReentrantLock putLock = this.putLock;//入隊當(dāng)然用putLock
putLock.lock();
try {
if (count.get() 《 capacity) {
enqueue(e); //入隊
c = count.getAndIncrement(); //隊長度+1
if (c + 1 《 capacity)
notFull.signal(); //隊列沒滿,當(dāng)然可以解鎖了
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();//這個方法里發(fā)出了notEmpty.signal();
return c 》= 0;
}
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;出隊當(dāng)然用takeLock
takeLock.lock();
try {
if (count.get() 》 0) {
x = dequeue();//出隊
c = count.getAndDecrement();//隊長度-1
if (c 》 1)
notEmpty.signal();//隊列沒空,解鎖
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();//這個方法里發(fā)出了notFull.signal();
return x;
}
看看源代碼發(fā)現(xiàn)和上面ArrayBlockingQueue的很類似,關(guān)鍵的問題在于:為什么要用兩個ReentrantLockputLock和takeLock?
我們仔細(xì)想一下,入隊操作其實(shí)操作的只有隊尾引用last,并且沒有牽涉到head。而出隊操作其實(shí)只針對head,和last沒有關(guān)系。那么就是說入隊和出隊的操作完全不需要公用一把鎖,所以就設(shè)計了兩個鎖,這樣就實(shí)現(xiàn)了多個不同任務(wù)的線程入隊的同時可以進(jìn)行出隊的操作,另一方面由于兩個操作所共同使用的count是AtomicInteger類型的,所以完全不用考慮計數(shù)器遞增遞減的問題。
另外,還有一點(diǎn)需要說明一下:await()和singal()這兩個方法執(zhí)行時都會檢查當(dāng)前線程是否是獨(dú)占鎖的當(dāng)前線程,如果不是則拋出java.lang.IllegalMonitorStateException異常。所以可以看到在源碼中這兩個方法都出現(xiàn)在Lock的保護(hù)塊中。
-------------------------------我是分割線--------------------------------------
下面再來說說ConcurrentLinkedQueue,它是一個無鎖的并發(fā)線程安全的隊列。
以下部分的內(nèi)容參照了這個帖子
對比鎖機(jī)制的實(shí)現(xiàn),使用無鎖機(jī)制的難點(diǎn)在于要充分考慮線程間的協(xié)調(diào)。簡單的說就是多個線程對內(nèi)部數(shù)據(jù)結(jié)構(gòu)進(jìn)行訪問時,如果其中一個線程執(zhí)行的中途因?yàn)橐恍┰虺霈F(xiàn)故障,其他的線程能夠檢測并幫助完成剩下的操作。這就需要把對數(shù)據(jù)結(jié)構(gòu)的操作過程精細(xì)的劃分成多個狀態(tài)或階段,考慮每個階段或狀態(tài)多線程訪問會出現(xiàn)的情況。
ConcurrentLinkedQueue有兩個volatile的線程共享變量:head,tail。要保證這個隊列的線程安全就是保證對這兩個Node的引用的訪問(更新,查看)的原子性和可見性,由于volatile本身能夠保證可見性,所以就是對其修改的原子性要被保證。
下面通過offer方法的實(shí)現(xiàn)來看看在無鎖情況下如何保證原子性:
Java代碼
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
Node n = new Node(e, null);
for (;;) {
Node t = tail;
Node s = t.getNext();
if (t == tail) { //------------------------------a
if (s == null) { //---------------------------b
if (t.casNext(s, n)) { //-------------------c
casTail(t, n); //------------------------d
return true;
}
} else {
casTail(t, s); //----------------------------e
}
}
}
}
此方法的循環(huán)內(nèi)首先獲得尾指針和其next指向的對象,由于tail和Node的next均是volatile的,所以保證了獲得的分別都是最新的值。
代碼a:t==tail是最上層的協(xié)調(diào),如果其他線程改變了tail的引用,則說明現(xiàn)在獲得不是最新的尾指針需要重新循環(huán)獲得最新的值。
代碼b:s==null的判斷。靜止?fàn)顟B(tài)下tail的next一定是指向null的,但是多線程下的另一個狀態(tài)就是中間態(tài):tail的指向沒有改變,但是其next已經(jīng)指向新的結(jié)點(diǎn),即完成tail引用改變前的狀態(tài),這時候s!=null。這里就是協(xié)調(diào)的典型應(yīng)用,直接進(jìn)入代碼e去協(xié)調(diào)參與中間態(tài)的線程去完成最后的更新,然后重新循環(huán)獲得新的tail開始自己的新一次的入隊嘗試。另外值得注意的是a,b之間,其他的線程可能會改變tail的指向,使得協(xié)調(diào)的操作失敗。從這個步驟可以看到無鎖實(shí)現(xiàn)的復(fù)雜性。
代碼c:t.casNext(s, n)是入隊的第一步,因?yàn)槿腙犘枰獌刹剑焊翹ode的next,改變tail的指向。代碼c之前可能發(fā)生tail引用指向的改變或者進(jìn)入更新的中間態(tài),這兩種情況均會使得t指向的元素的next屬性被原子的改變,不再指向null。這時代碼c操作失敗,重新進(jìn)入循環(huán)。
代碼d:這是完成更新的最后一步了,就是更新tail的指向,最有意思的協(xié)調(diào)在這兒又有了體現(xiàn)。從代碼看casTail(t, n)不管是否成功都會接著返回true標(biāo)志著更新的成功。首先如果成功則表明本線程完成了兩步的更新,返回true是理所當(dāng)然的;如果 casTail(t, n)不成功呢?要清楚的是完成代碼c則代表著更新進(jìn)入了中間態(tài),代碼d不成功則是tail的指向被其他線程改變。意味著對于其他的線程而言:它們得到的是中間態(tài)的更新,s!=null,進(jìn)入代碼e幫助本線程執(zhí)行最后一步并且先于本線程成功。這樣本線程雖然代碼d失敗了,但是是由于別的線程的協(xié)助先完成了,所以返回true也就理所當(dāng)然了。
通過分析這個入隊的操作,可以清晰的看到無鎖實(shí)現(xiàn)的每個步驟和狀態(tài)下多線程之間的協(xié)調(diào)和工作。
注:上面這大段文字看起來很累,先能看懂多少看懂多少,現(xiàn)在看不懂先不急,下面還會提到這個算法,并且用示意圖說明,就易懂很多了。
在使用ConcurrentLinkedQueue時要注意,如果直接使用它提供的函數(shù),比如add或者poll方法,這樣我們自己不需要做任何同步。
但如果是非原子操作,比如:
Java代碼
if(!queue.isEmpty()) {
queue.poll(obj);
}
我們很難保證,在調(diào)用了isEmpty()之后,poll()之前,這個queue沒有被其他線程修改。所以對于這種情況,我們還是需要自己同步:
Java代碼
synchronized(queue) {
if(!queue.isEmpty()) {
queue.poll(obj);
}
}
注:這種需要進(jìn)行自己同步的情況要視情況而定,不是任何情況下都需要這樣做。
另外還說一下,ConcurrentLinkedQueue的size()是要遍歷一遍集合的,所以盡量要避免用size而改用isEmpty(),以免性能過慢。
好,最后想說點(diǎn)什么呢,阻塞算法其實(shí)很好理解,簡單點(diǎn)理解就是加鎖,比如在BlockingQueue中看到的那樣,再往前推點(diǎn),那就是synchronized。相比而言,非阻塞算法的設(shè)計和實(shí)現(xiàn)都很困難,要通過低級的原子性來支持并發(fā)。下面就簡要的介紹一下非阻塞算法,以下部分的內(nèi)容參照了一篇很經(jīng)典的文章
注:我覺得可以這樣理解,阻塞對應(yīng)同步,非阻塞對應(yīng)并發(fā)。也可以說:同步是阻塞模式,異步是非阻塞模式
舉個例子來說明什么是非阻塞算法:非阻塞的計數(shù)器
首先,使用同步的線程安全的計數(shù)器代碼如下
Java代碼
public final class Counter {
private long value = 0;
public synchronized long getValue() {
return value;
}
public synchronized long increment() {
return ++value;
}
}
下面的代碼顯示了一種最簡單的非阻塞算法:使用 AtomicInteger的compareAndSet()(CAS方法)的計數(shù)器。compareAndSet()方法規(guī)定“將這個變量更新為新值,但是如果從我上次看到這個變量之后其他線程修改了它的值,那么更新就失敗”
Java代碼
public class NonblockingCounter {
private AtomicInteger value;//前面提到過,AtomicInteger類是以原子的方式操作整型變量。
public int getValue() {
return value.get();
}
public int increment() {
int v;
do {
v = value.get();
while (!value.compareAndSet(v, v + 1));
return v + 1;
}
}
非阻塞版本相對于基于鎖的版本有幾個性能優(yōu)勢。首先,它用硬件的原生形態(tài)代替 JVM 的鎖定代碼路徑,從而在更細(xì)的粒度層次上(獨(dú)立的內(nèi)存位置)進(jìn)行同步,失敗的線程也可以立即重試,而不會被掛起后重新調(diào)度。更細(xì)的粒度降低了爭用的機(jī)會,不用重新調(diào)度就能重試的能力也降低了爭用的成本。即使有少量失敗的 CAS 操作,這種方法仍然會比由于鎖爭用造成的重新調(diào)度快得多。
NonblockingCounter 這個示例可能簡單了些,但是它演示了所有非阻塞算法的一個基本特征——有些算法步驟的執(zhí)行是要冒險的,因?yàn)橹廊绻?CAS 不成功可能不得不重做。非阻塞算法通常叫作樂觀算法,因?yàn)樗鼈兝^續(xù)操作的假設(shè)是不會有干擾。如果發(fā)現(xiàn)干擾,就會回退并重試。在計數(shù)器的示例中,冒險的步驟是遞增——它檢索舊值并在舊值上加一,希望在計算更新期間值不會變化。如果它的希望落空,就會再次檢索值,并重做遞增計算。
再來一個例子,Michael-Scott 非阻塞隊列算法的插入操作,ConcurrentLinkedQueue 就是用這個算法實(shí)現(xiàn)的,現(xiàn)在來結(jié)合示意圖分析一下,很明朗:
Java代碼
public class LinkedQueue {
private static class Node {
final E item;
final AtomicReference》 next;
Node(E item, Node next) {
this.item = item;
this.next = new AtomicReference》(next);
}
}
private AtomicReference》 head
= new AtomicReference》(new Node(null, null));
private AtomicReference》 tail = head;
public boolean put(E item) {
Node newNode = new Node(item, null);
while (true) {
Node curTail = tail.get();
Node residue = curTail.next.get();
if (curTail == tail.get()) {
if (residue == null) /* A */ {
if (curTail.next.compareAndSet(null, newNode)) /* C */ {
tail.compareAndSet(curTail, newNode) /* D */ ;
return true;
}
} else {
tail.compareAndSet(curTail, residue) /* B */;
}
}
}
}
}
看看這代碼完全就是ConcurrentLinkedQueue 源碼啊。
插入一個元素涉及頭指針和尾指針兩個指針更新,這兩個更新都是通過 CAS 進(jìn)行的:從隊列當(dāng)前的最后節(jié)點(diǎn)(C)鏈接到新節(jié)點(diǎn),并把尾指針移動到新的最后一個節(jié)點(diǎn)(D)。如果第一步失敗,那么隊列的狀態(tài)不變,插入線程會繼續(xù)重試,直到成功。一旦操作成功,插入被當(dāng)成生效,其他線程就可以看到修改。還需要把尾指針移動到新節(jié)點(diǎn)的位置上,但是這項(xiàng)工作可以看成是 “清理工作”,因?yàn)槿魏翁幵谶@種情況下的線程都可以判斷出是否需要這種清理,也知道如何進(jìn)行清理。
隊列總是處于兩種狀態(tài)之一:正常狀態(tài)(或稱靜止?fàn)顟B(tài),圖 1 和 圖 3)或中間狀態(tài)(圖 2)。在插入操作之前和第二個 CAS(D)成功之后,隊列處在靜止?fàn)顟B(tài);在第一個 CAS(C)成功之后,隊列處在中間狀態(tài)。在靜止?fàn)顟B(tài)時,尾指針指向的鏈接節(jié)點(diǎn)的 next 字段總為 null,而在中間狀態(tài)時,這個字段為非 null。任何線程通過比較 tail.next 是否為 null,就可以判斷出隊列的狀態(tài),這是讓線程可以幫助其他線程 “完成” 操作的關(guān)鍵。
上圖顯示的是:有兩個元素,處在靜止?fàn)顟B(tài)的隊列
插入操作在插入新元素(A)之前,先檢查隊列是否處在中間狀態(tài)。如果是在中間狀態(tài),那么肯定有其他線程已經(jīng)處在元素插入的中途,在步驟(C)和(D)之間。不必等候其他線程完成,當(dāng)前線程就可以 “幫助” 它完成操作,把尾指針向前移動(B)。如果有必要,它還會繼續(xù)檢查尾指針并向前移動指針,直到隊列處于靜止?fàn)顟B(tài),這時它就可以開始自己的插入了。
第一個 CAS(C)可能因?yàn)閮蓚€線程競爭訪問隊列當(dāng)前的最后一個元素而失敗;在這種情況下,沒有發(fā)生修改,失去 CAS 的線程會重新裝入尾指針并再次嘗試。如果第二個 CAS(D)失敗,插入線程不需要重試 —— 因?yàn)槠渌€程已經(jīng)在步驟(B)中替它完成了這個操作!
上圖顯示的是:處在插入中間狀態(tài)的隊列,在新元素插入之后,尾指針更新之前
上圖顯示的是:在尾指針更新后,隊列重新處在靜止?fàn)顟B(tài)
評論