靠比較好的軟件下載網(wǎng)站社交媒體營(yíng)銷(xiāo)三種方式
Java JUC(三) AQS與同步工具詳解
一. ReentrantLock 概述
ReentrantLock
是 java.util.concurrent.locks
包下的一個(gè)同步工具類,它實(shí)現(xiàn)了 Lock
接口,提供了一種相比synchronized
關(guān)鍵字更靈活的鎖機(jī)制。ReentrantLock
是一種獨(dú)占式且可重入的鎖,并且支持可中斷、公平鎖/非公平鎖、超時(shí)等待、條件變量等高級(jí)特性。其特點(diǎn)如下:
- 獨(dú)占式: 一把鎖在同一時(shí)間只能被一個(gè)線程所獲取;
- 可重入: 可重入意味著同一個(gè)線程如果已持有某個(gè)鎖,則可以繼續(xù)多次獲得該鎖(注意釋放同樣次之后才算完全釋放成功);
- 可中斷: 在線程獲取鎖的等待過(guò)程中可以中斷獲取,放棄等待而轉(zhuǎn)去執(zhí)行其他邏輯;
- 公平性:
ReentrantLock
支持公平鎖和非公平鎖(默認(rèn))兩種模式。其中,公平鎖會(huì)按照線程請(qǐng)求鎖的順序來(lái)分配鎖(降低性能),而非公平鎖允許線程搶占已等待的線程的鎖(可能存在饑餓現(xiàn)象); - 條件變量: 通過(guò)
Condition
接口的實(shí)現(xiàn),允許線程在某些條件下等待或喚醒,即可以實(shí)現(xiàn)選擇性通知;
Type | Method | Description |
---|---|---|
/ | ReentrantLock() | 無(wú)參構(gòu)造方法,默認(rèn)為非公平鎖 |
/ | ReentrantLock(boolean fair) | 帶參構(gòu)造方法,其中fair 表示鎖的公平性策略:- true : 公平鎖- false : 非公平鎖 |
void | lock() | 不可中斷式獲取鎖。若當(dāng)前鎖已被其他線程持有,則阻塞等待;**注意:**該獲鎖過(guò)程不可被中斷 |
void | lockInterruptibly() throws InterruptedException | 可中斷式獲取鎖。若當(dāng)前鎖已被其他線程持有,則阻塞等待;**注意:**該獲鎖等待過(guò)程可被中斷,拋出InterruptedException ,并清除當(dāng)前線程的中斷狀態(tài) |
boolean | tryLock() | 嘗試獲取鎖,該方法會(huì)立即返回。若獲鎖成功,則返回true ,否則將返回false ;**注意:**該方法會(huì)破壞公平鎖配置,即在公平鎖策略下,該方法也會(huì)立即嘗試獲取可用鎖 |
boolean | tryLock(long timeout,TimeUnit unit) throws InterruptedException | 在給定時(shí)間timeout 內(nèi)嘗試獲取鎖。若獲鎖成功,則返回true ,否則將阻塞等待直到timeout 過(guò)期,返回false ;注意: - 獲鎖等待過(guò)程可被中斷,拋出 InterruptedException ,并清除當(dāng)前線程的中斷狀態(tài)- 遵循公平鎖配置策略,即在公平鎖策略下,該方法會(huì)按順序等待獲取鎖 |
void | unlock() | 當(dāng)前線程嘗試釋放該鎖。若當(dāng)前線程未持有該鎖,則拋出IllegalMonitorStateException 異常 |
Condition | newCondition() | 返回一個(gè)與當(dāng)前Lock 實(shí)例綁定的條件變量集合對(duì)象(默認(rèn)返回AQS 內(nèi)部實(shí)現(xiàn)類ConditionObject ),用于實(shí)現(xiàn)線程的條件等待/喚醒(詳見(jiàn)后文) |
1. 鎖的基本使用
相比synchronized
關(guān)鍵字來(lái)說(shuō),ReentrantLock
屬于顯式鎖,其鎖機(jī)制都是針對(duì)Lock
實(shí)例對(duì)象本身進(jìn)行加鎖,并且在使用過(guò)程中需要手動(dòng)釋放,即鎖的獲取與釋放是成對(duì)出現(xiàn)的;除此之外,ReentrantLock
屬于JDK API層面實(shí)現(xiàn)的互斥鎖,其通過(guò)方法調(diào)用實(shí)現(xiàn)鎖功能,可以跨方法從而更加靈活。為了避免出現(xiàn)死鎖問(wèn)題,官方建議的開(kāi)發(fā)方式如下:
// new lock object
ReentrantLock lock = new ReentrantLock();
lock.lock(); // block until condition holds
try {// ... method body
} finally {lock.unlock(); // release
}
問(wèn)題背景: 假設(shè)當(dāng)前有一個(gè)賣(mài)票系統(tǒng),一共有100張票,有4個(gè)窗口同時(shí)售賣(mài),請(qǐng)模擬該賣(mài)票過(guò)程,注意保證出票的正確性。
public class Test_01 {public static void main(String[] args) {ReentrantLock lock = new ReentrantLock();for (int i = 0; i < 4; i++){new Thread(new TicketSeller(lock), "Thread_" + i).start();}}
}
// 售票類實(shí)現(xiàn)
class TicketSeller implements Runnable{private static int ticketCount = 100;private ReentrantLock windowLock;public TicketSeller(ReentrantLock lock) {this.windowLock = lock;}@Overridepublic void run() {while (true) {try {Thread.sleep(10);}catch(InterruptedException e){e.printStackTrace();}windowLock.lock();try{if(ticketCount > 0){System.out.println(Thread.currentThread().getName() + ": sale and left " + --ticketCount);}else{System.out.println(Thread.currentThread().getName() + ": sold out...");break;}}finally {windowLock.unlock();}}}
}
2. 條件變量機(jī)制
在synchronized
關(guān)鍵字中,我們可以通過(guò)wait¬ify
實(shí)現(xiàn)線程的等待與喚醒,但在此場(chǎng)景下存在虛假喚醒問(wèn)題,根本原因就是其等待/喚醒機(jī)制只支持單條件(等待線程未作區(qū)分,只能全部喚醒)。相比之下,ReentrantLock
基于Condition
接口也實(shí)現(xiàn)了相同的機(jī)制,并提供了更細(xì)的粒度和更高級(jí)的功能;每個(gè)Condition
實(shí)例都對(duì)應(yīng)一個(gè)條件隊(duì)列,用于維護(hù)在該條件場(chǎng)景下等待通知的線程,并且ReentrantLock
支持多條件變量,即一個(gè)ReentrantLock
可以關(guān)聯(lián)多個(gè)Condition
實(shí)例。其常用方法如下:
Type | Method | Description |
---|---|---|
void | await() throws InterruptedException | 使當(dāng)前線程阻塞等待(進(jìn)入該條件隊(duì)列),并釋放與此條件變量所關(guān)聯(lián)的鎖。注意: 若在等待期間被中斷,則拋出InterruptedException ,并清除當(dāng)前線程中斷狀態(tài) |
boolean | await(long time,TimeUnit unit) throws InterruptedException | 使當(dāng)前線程阻塞等待,并釋放與此條件變量所關(guān)聯(lián)的鎖,直到被喚醒、被中斷或等待time 時(shí)間過(guò)期。其返回值表示:- true : 在等待時(shí)間之內(nèi),條件被喚醒;- false : 等待時(shí)間過(guò)期,條件未被喚醒;注意: 若在等待期間被中斷,則拋出 InterruptedException ,并清除當(dāng)前線程中斷狀態(tài) |
void | signal() | 喚醒一個(gè)等待在Condition 上的線程。如果有多個(gè)線程在此條件變量下等待,則選擇任意一個(gè)線程喚醒;注意: 從等待方法返回前必須重新獲得Condition 相關(guān)聯(lián)的鎖 |
void | signalAll() | 喚醒所有等待在Condition 上的線程。如果有多個(gè)線程在此條件變量下等待,則全部喚醒 ;注意: 從等待方法返回前必須重新獲得Condition 相關(guān)聯(lián)的鎖 |
在使用時(shí)需要注意:
- 調(diào)用
await
相關(guān)方法前需要先獲得對(duì)應(yīng)條件變量所關(guān)聯(lián)的鎖,否則會(huì)拋出IllegalMonitorStateException
異常; - 調(diào)用
signal
相關(guān)方法前需要先獲得對(duì)應(yīng)條件變量所關(guān)聯(lián)的鎖,否則會(huì)拋出IllegalMonitorStateException
異常; await
線程被喚醒(或等待時(shí)間過(guò)期、被中斷)后會(huì)重新參與鎖的競(jìng)爭(zhēng),若成功拿到鎖則將從await
處恢復(fù)繼續(xù)向下執(zhí)行;
/*** 場(chǎng)景模擬:奶茶店和咖啡店共用一個(gè)窗口(window)出餐,等待顧客點(diǎn)單...* - 奶茶店(teaWithMilk):顧客需要奶茶,則奶茶店開(kāi)始工作;* - 咖啡店(coffee):顧客需要咖啡,則咖啡店開(kāi)始工作;*/
public class Test {// 窗口鎖(ReentrantLock實(shí)現(xiàn))static final ReentrantLock window = new ReentrantLock();// 奶茶點(diǎn)單條件變量static Condition teaWithMilk = window.newCondition();// 咖啡點(diǎn)單條件變量static Condition coffee = window.newCondition();public static void main(String[] args) throws InterruptedException {// 奶茶店監(jiān)控線程new Thread(new Runnable() {@Overridepublic void run() {while (true) {window.lock();try {System.out.println("[奶茶店] 等待接單...");try {teaWithMilk.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("[奶茶店] 接到訂單...");} finally {window.unlock();}System.out.println("[奶茶店] 開(kāi)始工作...");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("[奶茶店] 工作完成...");}}}).start();Thread.sleep(1000);// 咖啡店監(jiān)控線程new Thread(new Runnable() {@Overridepublic void run() {while (true) {window.lock();try {System.out.println("[咖啡店] 等待接單...");try {coffee.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("[咖啡店] 接到訂單...");} finally {window.unlock();}System.out.println("[咖啡店] 開(kāi)始工作...");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("[咖啡店] 工作完成...");}}}).start();Thread.sleep(1000);// 顧客點(diǎn)單線程: mainwindow.lock();try{System.out.println("[顧客] 點(diǎn)了咖啡!!");coffee.signal(); // 喚醒咖啡條件等待線程}finally {window.unlock();}}
}
二. 從 ReentrantLock 分析 AQS 的原理
1. AQS 框架
AQS
全稱為 AbstractQueuedSynchronizer
,即抽象隊(duì)列同步器;AQS
是 java.util.concurrent.locks
包下的一個(gè)抽象類,其為構(gòu)建鎖和同步器提供了一系列通用模板與框架的實(shí)現(xiàn),大部分JUC
包下的并發(fā)工具都是基于AQS
來(lái)構(gòu)建的,比如ReentrantLock
、Semaphore
、CountDownLatch
等。其核心源碼如下:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {// 同步隊(duì)列的節(jié)點(diǎn)static final class Node {...}// 指向同步隊(duì)列頭部private transient volatile Node head;// 指向同步隊(duì)列尾部private transient volatile Node tail;// 同步狀態(tài)private volatile int state;// 提供一系列并發(fā)、同步隊(duì)列的基本操作方法// 比如: 掛起、取消、節(jié)點(diǎn)插入、節(jié)點(diǎn)替換等...// 交由子類實(shí)現(xiàn)的模板方法(鉤子方法): 自定義同步器的核心實(shí)現(xiàn)目標(biāo)protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();}protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();}protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();}protected boolean isHeldExclusively() {throw new UnsupportedOperationException();}// 條件變量 Condition 接口的內(nèi)部實(shí)現(xiàn)類public class ConditionObject implements Condition {...}}
由上可知,AQS內(nèi)部實(shí)現(xiàn)了一個(gè)核心內(nèi)部類Node
,該內(nèi)部類表示對(duì)等待獲取鎖的線程的封裝節(jié)點(diǎn);在AQS中,基于Node
維護(hù)了一個(gè)雙向鏈表(模擬同步隊(duì)列),其中head
節(jié)點(diǎn)指向同步隊(duì)列的頭部,而tail
節(jié)點(diǎn)指向同步隊(duì)列的尾部。Node
類的核心源碼如下:
static final class Node {// 共享模式(共享鎖、比如Semaphore)static final Node SHARED = new Node();// 獨(dú)占模式(獨(dú)占鎖、比如ReentrantLock)static final Node EXCLUSIVE = null;// 標(biāo)識(shí)節(jié)點(diǎn)線程獲取鎖的請(qǐng)求已取消、已結(jié)束static final int CANCELLED = 1;// 標(biāo)識(shí)節(jié)點(diǎn)線程已準(zhǔn)備就緒,等待被喚醒獲取資源static final int SIGNAL = -1;// 標(biāo)識(shí)節(jié)點(diǎn)線程在條件變量Condition中等待static final int CONDITION = -2;// 在共享模式下啟用: 標(biāo)識(shí)獲得的同步狀態(tài)會(huì)被傳播static final int PROPAGATE = -3;/*** waitStatus 標(biāo)識(shí)節(jié)點(diǎn)線程在同步隊(duì)列中的狀態(tài),共存在以下幾種情況:* (1)SIGNAL: 被標(biāo)記為SIGNAL的節(jié)點(diǎn)處于等待喚醒獲取資源的狀態(tài),只要前驅(qū)節(jié)點(diǎn)釋放鎖就會(huì)通知該狀態(tài)的后續(xù)節(jié)點(diǎn)線程執(zhí)行* (2)CANCELLED: 在同步隊(duì)列中等待超時(shí)、被中斷的線程會(huì)進(jìn)入取消狀態(tài),不再響應(yīng)并會(huì)在遍歷過(guò)程中被移除* (3)CONDITION: 標(biāo)識(shí)當(dāng)前節(jié)點(diǎn)線程在Condition下等待,被喚醒后將重新從等待隊(duì)列轉(zhuǎn)移到同步隊(duì)列* (4)PROPAGATE: 與共享模式有關(guān) * (5)0: 默認(rèn)初始值狀態(tài),代表節(jié)點(diǎn)初始化*/volatile int waitStatus;// 同步隊(duì)列中的前驅(qū)節(jié)點(diǎn)volatile Node prev;// 同步隊(duì)列中的后繼節(jié)點(diǎn)volatile Node next;// 等待獲取鎖資源的線程volatile Thread thread;// Condition 等待隊(duì)列中的后繼節(jié)點(diǎn)(單向鏈表)Node nextWaiter;// 判斷是否為共享模式final boolean isShared() {return nextWaiter == SHARED;}// 獲取當(dāng)前節(jié)點(diǎn)在同步隊(duì)列中的前驅(qū)節(jié)點(diǎn)final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}// 省略構(gòu)造方法...}
在了解Node
的基本數(shù)據(jù)結(jié)構(gòu)與狀態(tài)之后,AQS
還有一個(gè)核心狀態(tài)變量即state
,該全局變量用于表示同步鎖的狀態(tài),其具體含義一般在子類實(shí)現(xiàn)中進(jìn)行定義和維護(hù)(獨(dú)占鎖和共享鎖不一樣)。但不管獨(dú)占模式還是共享模式,簡(jiǎn)單來(lái)說(shuō)都是使用一個(gè)volatile
的全局變量來(lái)表示資源同步狀態(tài)(state
),并通過(guò)CAS完成對(duì)state
值的修改(修改成功則表示獲鎖成功),當(dāng)持有鎖的線程數(shù)量超過(guò)當(dāng)前模式(獨(dú)占模式一般限制為1)時(shí),則通過(guò)內(nèi)置的FIFO同步隊(duì)列來(lái)完成資源獲取線程的排隊(duì)工作(通過(guò)LockSupport park/unpark
方法實(shí)現(xiàn)掛起與喚醒)。其核心原理圖如下:
綜上,AQS采用了模板方法模式來(lái)構(gòu)建同步框架,并提供了一系列并發(fā)操作的公共基礎(chǔ)方法,支持共享模式和獨(dú)占模式兩種實(shí)現(xiàn);但AQS并不負(fù)責(zé)對(duì)外提供具體的加鎖/解鎖邏輯,因?yàn)殒i是千變?nèi)f化的,AQS只關(guān)注基礎(chǔ)組件、頂層模板這些總的概念,具體的鎖邏輯將通過(guò)”鉤子“的方式下放給子類實(shí)現(xiàn)。也就是說(shuō),獨(dú)占模式只需要實(shí)現(xiàn)tryAcquire-tryRelease
方法、共享模式只需要實(shí)現(xiàn)tryAcquireShared-tryReleaseShared
方法,搭配AQS提供的框架和基礎(chǔ)組件就能輕松實(shí)現(xiàn)自定義的同步工具。
2. ReentrantLock 源碼分析
由ReentrantLock
的類結(jié)構(gòu)圖可以看出,ReentrantLock
實(shí)現(xiàn)了Lock
接口,其內(nèi)部包含一個(gè)內(nèi)部類Sync
,該內(nèi)部類繼承了AQS
(AbstractQueuedSynchronizer
),ReentrantLock
大部分的鎖操作都是通過(guò)Sync
實(shí)現(xiàn)的。除此之外,ReentrantLock
有公平鎖和非公平鎖兩種模式,分別對(duì)應(yīng)Sync
的FairSync
和NonfairSync
兩個(gè)子類實(shí)現(xiàn)。
public class ReentrantLock implements Lock, java.io.Serializable {private final Sync sync;// 默認(rèn)構(gòu)造函數(shù): 默認(rèn)創(chuàng)建非公平鎖public ReentrantLock() {sync = new NonfairSync();}// 帶參構(gòu)造函數(shù): true公平鎖/false非公平鎖public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}// 加鎖操作public void lock() {sync.lock();}//...
}
接下來(lái),本節(jié)將首先以ReentrantLock
的非公平鎖為例進(jìn)行分析,然后再介紹公平鎖與非公平鎖的主要區(qū)別。
2.1 非公平鎖lock加鎖原理
非公平鎖NonfairSync
的源碼如下:
static final class NonfairSync extends Sync {// 加鎖操作final void lock() {// CAS修改state狀態(tài)以獲取鎖資源if (compareAndSetState(0, 1))// 成功則將獨(dú)占鎖線程設(shè)置為當(dāng)前線程setExclusiveOwnerThread(Thread.currentThread());else// 否則再次請(qǐng)求同步狀態(tài)(AQS的模板方法)acquire(1);}// AQS 獲鎖的鉤子方法實(shí)現(xiàn)protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}
在調(diào)用lock
方法獲取鎖的過(guò)程中,當(dāng)前線程會(huì)先通過(guò)CAS操作嘗試修改state
從0
(表示無(wú)鎖)到1
(表示占有鎖),若修改成功則將AQS
中保存的獨(dú)占線程exclusiveOwnerThread
修改為當(dāng)前線程;若失敗則執(zhí)行acquire(1)
方法,該方法是AQS
中的一個(gè)模板方法,其源碼如下:
public final void acquire(int arg) {// tryAcquire -> addWaiter -> acquireQueuedif (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
可以看到該方法首先調(diào)用了鉤子方法tryAcquire
,該方法是交由子類NonfairSync
實(shí)現(xiàn)的,在上面的源碼中我們已經(jīng)給出了tryAcquire
的實(shí)現(xiàn)代碼,其直接調(diào)用了父類Sync
中的nonfairTryAcquire
方法,其源碼如下:
abstract static class Sync extends AbstractQueuedSynchronizer {abstract void lock();// nonfairTryAcquire 方法實(shí)現(xiàn)final boolean nonfairTryAcquire(int acquires) {// 獲取當(dāng)前線程及同步隊(duì)列狀態(tài)statefinal Thread current = Thread.currentThread();int c = getState();// 若狀態(tài)為0表示鎖已釋放: 重新嘗試獲取鎖if (c == 0) {// CAS嘗試修改state的值if (compareAndSetState(0, acquires)) {// 若成功則設(shè)置獨(dú)占線程為當(dāng)前線程setExclusiveOwnerThread(current);return true;}}// 若獨(dú)占線程即當(dāng)前線程,則屬于重入鎖else if (current == getExclusiveOwnerThread()) {// 修改state為重入值int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}//省略代碼...
}
由上述代碼可知,該方法首先再次判斷鎖是否已釋放,這是為了避免之前持鎖的線程在這段時(shí)間內(nèi)又重新釋放了鎖,若state==0
則會(huì)嘗試再次CAS修改同步狀態(tài)以獲取鎖資源;否則的話,則判斷當(dāng)前線程是否是鎖重入的情況,若兩個(gè)判斷都不滿足則返回false
;到目前為止,我們回過(guò)頭來(lái)想一下非公平鎖的非公平性是在哪體現(xiàn)的?
很明顯,在上述代碼分析中,當(dāng)有任何線程嘗試獲取鎖時(shí)(調(diào)用lock
方法),不論當(dāng)前同步隊(duì)列中是否已有線程排隊(duì)等待,NonfairSync
的lock()
方法以及Sync
的nonfairTryAcquire()
方法都沒(méi)有對(duì)同步隊(duì)列中的等待情況進(jìn)行判斷,而是直接通過(guò)CAS嘗試修改state
的值來(lái)為當(dāng)前線程直接占有鎖;這就是非公平性的體現(xiàn),搶占線程可以直接與等待線程競(jìng)爭(zhēng)鎖資源,而不用按照順序加入隊(duì)列。
分析完這部分之后,我們?cè)倩氐?code>acquire(1)方法,若tryAcquire(arg)
方法返回false
即獲取不到鎖時(shí)會(huì)繼續(xù)向下執(zhí)行到addWaiter(Node.EXCLUSIVE)
方法,該方法用于封裝線程入隊(duì),其源碼如下:
private Node addWaiter(Node mode) {// 將請(qǐng)求占鎖失敗的線程封裝為Node節(jié)點(diǎn)Node node = new Node(Thread.currentThread(), mode);Node pred = tail;// 若同步隊(duì)列不為空,則嘗試CAS在尾部插入當(dāng)前節(jié)點(diǎn)(FIFO)if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 若隊(duì)列為空或CAS插入節(jié)點(diǎn)失敗則執(zhí)行enq()方法處理入隊(duì)enq(node);return node;}
接著,我們繼續(xù)分析enq(node)
方法的實(shí)現(xiàn):
private Node enq(final Node node) {// 開(kāi)啟自旋(循環(huán))for (;;) {Node t = tail;// 若隊(duì)列為空if (t == null) { // Must initialize// 則嘗試CAS創(chuàng)建頭節(jié)點(diǎn)(不存儲(chǔ)數(shù)據(jù))// 原因: 隊(duì)列為空可能是因?yàn)槠渌€程非公平占有了鎖(當(dāng)前線程試過(guò)沒(méi)搶到),因此這里需要先斬后奏,即再次創(chuàng)建頭節(jié)點(diǎn)表示已占鎖線程的占位,來(lái)維護(hù)同步隊(duì)列if (compareAndSetHead(new Node()))tail = head;} else {// 否則嘗試CAS添加尾節(jié)點(diǎn)node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}// 若CAS失敗(可能有多線程并發(fā)操作),則不斷自旋重試直到插入成功}
可以看到頭節(jié)點(diǎn)head
其實(shí)是不存儲(chǔ)數(shù)據(jù)的,它只表示一個(gè)線程占位(占位了鎖資源),因?yàn)槲挥陬^節(jié)點(diǎn)的線程肯定已經(jīng)獲取到了鎖,頭節(jié)點(diǎn)只存儲(chǔ)后繼節(jié)點(diǎn)指向,用于當(dāng)前線程釋放鎖資源時(shí)喚醒后繼節(jié)點(diǎn);那么到此這個(gè)方法也就分析完成了,在節(jié)點(diǎn)入隊(duì)成功之后會(huì)返回當(dāng)前節(jié)點(diǎn)node
,然后會(huì)繼續(xù)執(zhí)行到acquireQueued(addWaiter(Node.EXCLUSIVE),arg)
方法,其源碼如下:
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false; //阻塞掛起標(biāo)志// 開(kāi)啟自旋(循環(huán))for (;;) {// 獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)pfinal Node p = node.predecessor();// 若p是頭節(jié)點(diǎn)則當(dāng)前節(jié)點(diǎn)嘗試獲取鎖資源if (p == head && tryAcquire(arg)) {// 占鎖成功則設(shè)置當(dāng)前節(jié)點(diǎn)node為頭節(jié)點(diǎn): 頭節(jié)點(diǎn)狀態(tài)保持SIGNAL狀態(tài)setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 若p不是頭節(jié)點(diǎn)或獲取鎖資源失敗,則判斷是否阻塞掛起線程來(lái)等待if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {// 若最終無(wú)法獲取鎖,則取消該線程的請(qǐng)求if (failed)cancelAcquire(node);}}// 將傳遞的節(jié)點(diǎn)設(shè)置為同步隊(duì)列的頭節(jié)點(diǎn)private void setHead(Node node) {head = node;// 清空當(dāng)前節(jié)點(diǎn)存儲(chǔ)的線程信息node.thread = null;node.prev = null;}
在acquireQueued
方法中,線程會(huì)開(kāi)啟自旋,若發(fā)現(xiàn)(或被喚醒后發(fā)現(xiàn))當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)變?yōu)轭^節(jié)點(diǎn),則說(shuō)明當(dāng)前節(jié)點(diǎn)能夠嘗試獲取鎖資源,并嘗試通過(guò)tryAcquire
方法獲取同步狀態(tài);需要注意的是,head
頭節(jié)點(diǎn)表示當(dāng)前占有鎖的線程節(jié)點(diǎn),只有當(dāng)head
節(jié)點(diǎn)對(duì)應(yīng)的線程釋放鎖資源并喚醒后繼節(jié)點(diǎn)時(shí),后繼節(jié)點(diǎn)線程才會(huì)自旋去嘗試占有鎖資源,因此:在同步隊(duì)列中,只有前驅(qū)節(jié)點(diǎn)變?yōu)轭^節(jié)點(diǎn)時(shí),當(dāng)前節(jié)點(diǎn)才有資格嘗試獲取鎖資源,其他時(shí)候都將被掛起等待,避免空轉(zhuǎn)CPU
。
除此之外,若在自旋過(guò)程中,當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn)或者節(jié)點(diǎn)嘗試tryAcquire
獲取鎖資源失敗,則會(huì)執(zhí)行shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt()
邏輯;需要注意的是在前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)但嘗試獲取鎖資源失敗這種特殊情況發(fā)生時(shí)(比如非公平鎖模式下被新到來(lái)的請(qǐng)求線程搶占),頭節(jié)點(diǎn)head
此時(shí)可能有兩種狀態(tài):
-
waitStatus==0
: 處于該狀態(tài)一種情況是初始同步隊(duì)列為空時(shí),默認(rèn)頭節(jié)點(diǎn)狀態(tài)初始化為0;另一種情況是鎖釋放時(shí)(見(jiàn)后文)被unparkSuccessor
重置頭節(jié)點(diǎn)狀態(tài); -
waitStatus==SIGNAL
: 處于該狀態(tài)一種情況是waitStatus==0
時(shí)更新?tīng)顟B(tài)后又自旋回來(lái)但仍未獲取到鎖(可能釋放鎖后被非公平搶占);另一種情況是釋放鎖時(shí)unparkSuccessor
重置失敗;
其源碼如下:
// 判斷節(jié)點(diǎn)線程是否掛起等待private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 獲取前驅(qū)節(jié)點(diǎn)的等待狀態(tài)int ws = pred.waitStatus;// 若是SIGNAL狀態(tài),則說(shuō)明前驅(qū)節(jié)點(diǎn)就緒,當(dāng)前節(jié)點(diǎn)正常需要繼續(xù)等待即返回trueif (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;// 若等待狀態(tài)>0則說(shuō)明前驅(qū)節(jié)點(diǎn)是結(jié)束狀態(tài),需要遍歷前驅(qū)節(jié)點(diǎn)直到找到非結(jié)束狀態(tài)的有效節(jié)點(diǎn)if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {// 若等待狀態(tài)<=0且非SIGNAL,則嘗試將前驅(qū)節(jié)點(diǎn)設(shè)置為SIGNAL/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}// false 則返回去繼續(xù)自旋return false;}// 執(zhí)行掛起阻塞操作 LockSupport.parkprivate final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
shouldParkAfterFailedAcquire
方法的執(zhí)行邏輯是判斷前驅(qū)節(jié)點(diǎn)的等待狀態(tài)waitStatus
,用于掛起當(dāng)前節(jié)點(diǎn)線程等待,結(jié)合上述分析,包括以下幾種情況:
- 前驅(qū)節(jié)點(diǎn)狀態(tài)
waitStatus==SIGNAL
: 若前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn),則說(shuō)明占鎖線程還未執(zhí)行結(jié)束,當(dāng)前節(jié)點(diǎn)線程仍需掛起等待;若前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn),則說(shuō)明前驅(qū)節(jié)點(diǎn)就緒/擁有更高的優(yōu)先級(jí),下次執(zhí)行還輪不到當(dāng)前節(jié)點(diǎn),所以也可以安全掛起,直接返回true
; - 前驅(qū)節(jié)點(diǎn)狀態(tài)
waitStatus>0
: 說(shuō)明前驅(qū)節(jié)點(diǎn)已處于結(jié)束/取消狀態(tài),應(yīng)該從同步隊(duì)列中移除,并遍歷所有前驅(qū)節(jié)點(diǎn)直到找到非結(jié)束狀態(tài)的有效節(jié)點(diǎn)作為前驅(qū); - 前驅(qū)節(jié)點(diǎn)狀態(tài)
waitStatus<0
且非SIGNAL: 前驅(qū)節(jié)點(diǎn)剛從Condition
的條件等待隊(duì)列被喚醒,從而轉(zhuǎn)移到同步隊(duì)列,需要轉(zhuǎn)換為SIGNAL
狀態(tài)等待; - 前驅(qū)節(jié)點(diǎn)狀態(tài)
waitStatus==0
:若前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn),則說(shuō)明同步隊(duì)列剛初始化(0)或鎖剛被釋放重置,鎖資源可能未被其他線程持有,需判斷能否占有鎖(不管當(dāng)前線程能否占有,該鎖一定會(huì)被占有,都需要轉(zhuǎn)換狀態(tài)為SIGNAL
);若前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn),則說(shuō)明該線程節(jié)點(diǎn)剛初始化并被插入隊(duì)列,需要轉(zhuǎn)換為SIGNAL
狀態(tài);
綜上,當(dāng)shouldParkAfterFailedAcquire()
方法返回true
時(shí)會(huì)調(diào)用parkAndCheckInterrupt
方法掛起線程等待被喚醒,返回false
時(shí)則會(huì)繼續(xù)自旋判斷;至此,ReetrantLock
內(nèi)部間接依靠AQS
的FIFO
同步隊(duì)列,就完成了lock()
加鎖操作。
2.2 公平鎖lock加鎖原理
公平鎖FairSync
的源碼如下:
static final class FairSync extends Sync {// 加鎖操作final void lock() {acquire(1);}// AQS 獲鎖的鉤子方法實(shí)現(xiàn)protected final boolean tryAcquire(int acquires) {// 獲取當(dāng)前線程final Thread current = Thread.currentThread();// 獲取同步狀態(tài)int c = getState();// 若當(dāng)前沒(méi)有線程持有鎖資源if (c == 0) {// 首先判斷同步隊(duì)列是否存在等待節(jié)點(diǎn)if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 鎖重入else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}
與非公平鎖唯一不同的是:公平鎖的tryAcquire
實(shí)現(xiàn)中,在嘗試修改state
之前,會(huì)先調(diào)用hasQueuedPredecessors()
判斷AQS
內(nèi)部同步隊(duì)列中是否已存在等待節(jié)點(diǎn)。如果存在,則說(shuō)明在此之前,已經(jīng)有線程提交了獲取鎖的請(qǐng)求,那么當(dāng)前線程就會(huì)直接被封裝成Node
節(jié)點(diǎn),追加到隊(duì)尾等待。
2.3 釋放鎖原理
ReetrantLock
顯式鎖需要手動(dòng)釋放鎖資源,其unlock()
方法直接調(diào)用了Sync
中的release(1)
方法,而該方法又是在其父類AQS
中直接實(shí)現(xiàn)的,其源碼如下:
public final boolean release(int arg) {// 嘗試釋放鎖if (tryRelease(arg)) {// 進(jìn)入該代碼塊則說(shuō)明鎖已完全釋放(state=0)// 獲取頭節(jié)點(diǎn)Node h = head;if (h != null && h.waitStatus != 0)// 喚醒head頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)線程unparkSuccessor(h);return true;}return false;}// 喚醒node的后繼節(jié)點(diǎn)線程private void unparkSuccessor(Node node) {// 獲取節(jié)點(diǎn)狀態(tài)int ws = node.waitStatus;if (ws < 0) // 重置節(jié)點(diǎn)狀態(tài),允許失敗compareAndSetWaitStatus(node, ws, 0);// 獲取后繼節(jié)點(diǎn)Node s = node.next;// 若后繼節(jié)點(diǎn)為空或已結(jié)束if (s == null || s.waitStatus > 0) {s = null;// 尋找后繼可被喚醒的有效等待節(jié)點(diǎn)for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 執(zhí)行線程喚醒(繼續(xù)去自旋) LockSupport.unparkif (s != null)LockSupport.unpark(s.thread);}
release
方法能否釋放鎖并喚醒后繼節(jié)點(diǎn)線程依賴于tryRelease
鉤子方法,而該方法又下放到了Sync
中實(shí)現(xiàn),其源碼如下:
// ReentrantLock -> Sync -> tryRelease(1)protected final boolean tryRelease(int releases) {// 計(jì)算釋放鎖后的同步更新?tīng)顟B(tài)int c = getState() - releases;// 如果當(dāng)前釋放鎖的線程不為持有鎖的線程則拋出異常if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 判斷更新?tīng)顟B(tài)是否為0,如果是則說(shuō)明已完全釋放鎖if (c == 0) {free = true;// 完全釋放鎖才清空當(dāng)前占有線程setExclusiveOwnerThread(null);}// 更新state值setState(c);// 完全釋放鎖才返回truereturn free;}
注意:tryRelease
方法執(zhí)行完成返回true
之后,就說(shuō)明當(dāng)前線程持有的鎖已被釋放(非公平鎖中就已經(jīng)可以被搶占了),后續(xù)unparkSuccessor
方法只是進(jìn)行一些善后工作,其中重置頭節(jié)點(diǎn)狀態(tài)的目的是表示邏輯上從持鎖到無(wú)鎖的轉(zhuǎn)換,鎖資源目前可能并沒(méi)有線程持有,因此在后續(xù)線程喚醒后執(zhí)行acquireQueued
自旋時(shí)waitStatus==0
狀態(tài)會(huì)再一次判斷并嘗試獲取鎖,而修改為SIGNAL
就表示占鎖線程正在執(zhí)行,其他線程需要掛起等待。至此,整個(gè)流程可以結(jié)合起來(lái)理解:s
節(jié)點(diǎn)的線程被喚醒后,會(huì)繼續(xù)執(zhí)行acquireQueued()
方法中的自旋,判斷if (p == head && tryAcquire(arg))
代碼是否成立,從而執(zhí)行判斷操作。
三. 其他同步工具類
1. Semaphore
1.1 基本概述
Semaphore
是java.util.concurrent
包下的一種計(jì)數(shù)信號(hào)量,它同樣也是基于AQS
實(shí)現(xiàn)的同步工具類。相比ReentrantLock
來(lái)說(shuō),它應(yīng)該屬于共享鎖,即允許多個(gè)線程同時(shí)訪問(wèn)某個(gè)共享資源,但會(huì)限制同時(shí)訪問(wèn)特定資源的線程數(shù)量;Semaphore
同樣也支持公平模式和非公平模式兩種方式,其構(gòu)造方法如下:
public Semaphore(int permits) {sync = new NonfairSync(permits);
}public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Semaphore
默認(rèn)為非公平模式(搶占式),在構(gòu)造信號(hào)量對(duì)象時(shí)都必須提供permits
參數(shù);permits
可以理解為許可證數(shù)量,只有拿到許可證的線程才能執(zhí)行,該參數(shù)限制了能同時(shí)獲取或訪問(wèn)到共享資源的線程數(shù)量,其他超出線程都將阻塞等待。基于此,Semaphore
通常用于實(shí)現(xiàn)資源有明確訪問(wèn)數(shù)量限制的場(chǎng)景,比如限流、池化等。Semaphore
的常用方法介紹如下:
Type | Method | Description |
---|---|---|
void | acquire() throws InterruptedException | 當(dāng)前線程請(qǐng)求獲取該信號(hào)量的一個(gè)許可證。若有可用許可證,則獲得許可證并返回執(zhí)行同步代碼,同時(shí)可用許可證數(shù)量將減少一個(gè);若沒(méi)有可用許可證,則阻塞等待直到有許可被釋放,或線程中斷。注意: 若當(dāng)前線程在等待過(guò)程中被中斷,則會(huì)拋出InterruptedException ,并清除當(dāng)前線程的中斷狀態(tài)。 |
void | acquire(int permits) throws InterruptedException | 當(dāng)前線程請(qǐng)求獲取該信號(hào)量的permits 個(gè)許可證。若有可用數(shù)量的許可證,則獲得許可證并返回執(zhí)行同步代碼,同時(shí)可用許可證數(shù)量將減少permits 個(gè);若沒(méi)有可用數(shù)量的許可證,則阻塞等待直到可用許可達(dá)到指定數(shù)量,或線程中斷。**注意: **若當(dāng)前線程在等待過(guò)程中被中斷,則會(huì)拋出InterruptedException ,并清除當(dāng)前線程的中斷狀態(tài)。 |
void | release() | 釋放該信號(hào)量的一個(gè)許可證,并使可用許可證數(shù)量增加一個(gè)。注意: 沒(méi)有通過(guò)acquire() 獲取許可的線程甚至也可以直接調(diào)用release() 來(lái)為信號(hào)量增加許可證數(shù)量,并且可用許可有可能會(huì)超出構(gòu)造時(shí)限制的permits 值,因此信號(hào)量的正確使用必須是通過(guò)應(yīng)用程序中的編程約束來(lái)建立。 |
void | release(int permits) | 釋放該信號(hào)量的permits 個(gè)許可證,并使可用許可證數(shù)量增加permits 個(gè)。注意: 同release() 方法,信號(hào)量的正確使用必須是通過(guò)應(yīng)用程序中的編程約束來(lái)建立。 |
boolean | tryAcquire() | 嘗試獲取該信號(hào)量的一個(gè)許可證,但該方法會(huì)立即返回。若有可用許可證,則獲得許可證并返回true ,同時(shí)可用許可證數(shù)量將減少一個(gè);若沒(méi)有可用許可證,則返回false 。注意: 該方法會(huì)破壞公平策略,對(duì)該方法的調(diào)用會(huì)進(jìn)行搶占式獲取(不管是否有線程在等待)。 |
boolean | tryAcquire(long timeout, TimeUnit unit) throws InterruptedException | 嘗試在指定時(shí)間timeout 內(nèi)獲取該信號(hào)量的一個(gè)許可證(遵循公平策略)。若有可用許可證,則獲得許可證并返回true ,同時(shí)可用許可證數(shù)量將減少一個(gè);若沒(méi)有可用許可證,則阻塞等待直到timeout 過(guò)期,等待時(shí)間過(guò)期則返回false 。注意: 若當(dāng)前線程在等待過(guò)程中被中斷,則會(huì)拋出InterruptedException ,并清除當(dāng)前線程的中斷狀態(tài)。 |
int | availablePermits() | 獲取當(dāng)前信號(hào)量中的可用許可證數(shù)量。 |
可以看出,許可證是Semaphore
的核心概念,Semaphore
信號(hào)量對(duì)許可證的獲取是強(qiáng)限制,但對(duì)許可證的釋放是弱限制的,即請(qǐng)求線程在執(zhí)行時(shí)必須獲取acquire
到指定數(shù)量的許可證,但在釋放release
時(shí)并不會(huì)對(duì)先前是否獲取進(jìn)行檢查,因此可用許可有時(shí)可能會(huì)超出構(gòu)造時(shí)限制的permits
值。換句話說(shuō),構(gòu)造時(shí)傳入的permits
參數(shù)只表示信號(hào)量的初始許可數(shù)量,并且許可證只決定了線程執(zhí)行的門(mén)檻,但并不會(huì)對(duì)線程作全程限制;當(dāng)前線程一旦獲取到指定數(shù)量的許可便開(kāi)始執(zhí)行,即使中途釋放許可也不會(huì)影響后續(xù)執(zhí)行過(guò)程,這也就是為什么說(shuō)信號(hào)量的正確使用必須是通過(guò)應(yīng)用程序中的編程約束來(lái)建立。舉例如下:
public class test {public static void main(String[] args) {// 初始化許可數(shù)量 = 5Semaphore semaphore = new Semaphore(5);new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + ": 等待許可...");semaphore.acquire(3); // acquire permits = 3System.out.println(Thread.currentThread().getName() + ": 拿到許可,剩余許可 = " + semaphore.availablePermits());Thread.sleep(3000);semaphore.release(); // release = 1System.out.println(Thread.currentThread().getName() + ": 釋放許可...");} catch (InterruptedException e) {e.printStackTrace();}},"thread-1").start();try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + ": 等待許可...");semaphore.acquire(3);// acquire permits = 3System.out.println(Thread.currentThread().getName() + ": 拿到許可,剩余許可 = " + semaphore.availablePermits());Thread.sleep(3000);semaphore.release();// release = 1System.out.println(Thread.currentThread().getName() + ": 釋放許可,剩余許可 = " + semaphore.availablePermits());} catch (InterruptedException e) {e.printStackTrace();}},"thread-2").start();}
}
thread-1: 等待許可...
thread-1: 拿到許可,剩余許可 = 2
thread-2: 等待許可...
thread-1: 釋放許可...
thread-2: 拿到許可,剩余許可 = 0
thread-2: 釋放許可,剩余許可 = 1
前文說(shuō)過(guò),Semaphore
通常用于實(shí)現(xiàn)資源有明確訪問(wèn)數(shù)量限制的場(chǎng)景,比如限流、池化等;此處通過(guò)Semaphore
模擬一個(gè)請(qǐng)求限流的場(chǎng)景,其中限制最大并發(fā)數(shù)為3,實(shí)現(xiàn)代碼如下:
public class test {public static void main(String[] args) {// 自定義線程池ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2*2, 8,60, TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(1024),new ThreadPoolExecutor.AbortPolicy());// 流量控制: 限制最大并發(fā)數(shù)為3final Semaphore semaphore = new Semaphore(3);// 模擬10個(gè)客戶端任務(wù)請(qǐng)求for(int index = 0;index < 10;index++){final int serial = index;threadPool.execute(() -> {try {// 請(qǐng)求獲取許可semaphore.acquire();System.out.println(Thread.currentThread().getName() + ": 請(qǐng)求成功!訪問(wèn)編號(hào) = " + serial);// 模擬IO操作Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {// 釋放許可semaphore.release();}});}// 等待線程池執(zhí)行結(jié)束關(guān)閉: 不再接受新任務(wù)提交,對(duì)已經(jīng)提交了的任務(wù)不會(huì)產(chǎn)生影響threadPool.shutdown();}
}
1.2 原理分析
Semaphore
的大部分方法都是基于內(nèi)部類Sync
實(shí)現(xiàn)的,而該類又繼承了 AbstractQueuedSynchronizer
即AQS
,并且Sync
對(duì)應(yīng)的還有兩個(gè)子類 NonfairSync
(非公平模式實(shí)現(xiàn)) 和 FairSync
(公平模式實(shí)現(xiàn))。在Semaphore
中,AQS
的 state
被定義為 permits
(許可證數(shù)量),對(duì)象創(chuàng)建時(shí)傳入的參數(shù)permits
實(shí)際是在對(duì)AQS內(nèi)部的state
進(jìn)行初始化,初始化完成后state
代表著當(dāng)前信號(hào)量對(duì)象的可用許可數(shù)(state>0
)。
以非公平模式為例,當(dāng)線程調(diào)用Semaphore.acquire(arg)
請(qǐng)求獲取許可時(shí),會(huì)首先判斷remaining = getState() - arg
是否大于0,如果是則代表還有滿足可用的許可數(shù),并嘗試對(duì)state
進(jìn)行CAS操作使state=remaining
,若CAS成功則代表獲取許可成功;否則線程需要封裝成Node節(jié)點(diǎn)并加入同步隊(duì)列阻塞等待,直到許可釋放被喚醒。
// Semaphore類 -> acquire()方法
public void acquire() throws InterruptedException {// Sync類繼承AQS,此處直接調(diào)用AQS內(nèi)部的acquireSharedInterruptibly()方法sync.acquireSharedInterruptibly(1);}// AbstractQueuedSynchronizer類 -> acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {// 判斷是否出現(xiàn)線程中斷信號(hào)(標(biāo)志)if (Thread.interrupted())throw new InterruptedException();// 如果tryAcquireShared(arg)執(zhí)行結(jié)果不小于0,則線程獲取同步狀態(tài)成功if (tryAcquireShared(arg) < 0)// 未獲取成功加入同步隊(duì)列阻塞等待doAcquireSharedInterruptibly(arg);
}
// Semaphore類 -> NofairSync內(nèi)部類 -> tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {// 調(diào)用了父類Sync中的實(shí)現(xiàn)方法return nonfairTryAcquireShared(acquires);
}// Syn類 -> nonfairTryAcquireShared()方法
abstract static class Sync extends AbstractQueuedSynchronizer {final int nonfairTryAcquireShared(int acquires) {// 開(kāi)啟自旋死循環(huán)for (;;) {int available = getState();int remaining = available - acquires;// 判斷信號(hào)量中可用許可數(shù)是否已<0或者CAS執(zhí)行是否成功if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}
}
釋放邏輯對(duì)比獲取許可的邏輯相對(duì)來(lái)說(shuō)要簡(jiǎn)單許多,只需要更新state
值增加后調(diào)用doReleaseShared()
方法喚醒后繼節(jié)點(diǎn)線程即可;需要注意的是,而在共享模式中可能會(huì)存在多條線程同時(shí)釋放許可/鎖資源,所以在此處使用了CAS+自旋
的方式保證線程安全問(wèn)題。
2. CountDownLatch
2.1 基本概述
CountDownLatch
同樣是java.util.concurrent
包下的基于AQS
實(shí)現(xiàn)的同步工具類。類似于Semaphore
,CountDownLatch
在初始化時(shí)也會(huì)傳入一個(gè)參數(shù)count
來(lái)間接賦值給AQS
的state
,用于表示一個(gè)線程計(jì)數(shù)值;不過(guò)CountDownLatch
并沒(méi)有構(gòu)建公平模式和非公平模式(內(nèi)部Sync
沒(méi)有子類實(shí)現(xiàn)),其構(gòu)造方法如下:
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);
}
CountDownLatch
的主要作用是等待計(jì)數(shù)值count
歸零后,喚醒所有的等待線程?;谠撎匦?#xff0c;CountDownLatch
常被用于控制多線程之間的等待與協(xié)作(多線程條件喚醒);相比join
來(lái)說(shuō),CountDownLatch
更加靈活且粒度更細(xì),join
是以線程執(zhí)行結(jié)束為條件,而CountDownLatch
是以方法的主動(dòng)調(diào)用為條件。其常用方法如下:
Type | Method | Description |
---|---|---|
void | await() throws InterruptedException | 使當(dāng)前線程阻塞等待,直到計(jì)數(shù)器count 歸零或線程被中斷。若當(dāng)前計(jì)數(shù)已為零,則此方法立即返回。注意: 若在等待過(guò)程中被中斷,則會(huì)拋出InterruptedException,并清除當(dāng)前線程的中斷狀態(tài)。 |
void | countDown() | 使當(dāng)前計(jì)數(shù)器count 遞減。如果新計(jì)數(shù)歸零,則喚醒所有await() 等待線程;注意: 若當(dāng)前計(jì)數(shù)已為零,則無(wú)事發(fā)生。 |
long | getCount() | 獲取當(dāng)前計(jì)數(shù)器count 的值。 |
需要注意的是,CountDownLatch
是一次性的,即計(jì)數(shù)器的值count
只能在構(gòu)造方法中初始化,此外再?zèng)]有任何設(shè)置值的方法,當(dāng) CountDownLatch
使用完畢后(計(jì)數(shù)歸零)將不能重復(fù)被使用;若需要重置計(jì)數(shù)的版本,可以考慮使用CyclicBarrier
。CountDownLatch
的常用方法有兩種:
- 多等一:初始化
count=1
,多條線程await()
阻塞等待一條線程調(diào)用countDown()
喚醒所有線程。比如模擬并發(fā)安全、死鎖等; - 一等多:初始化
count=N
,一條線程await()
阻塞等待N條線程調(diào)用countDown()
歸零后喚醒。比如多接口調(diào)用的數(shù)據(jù)合并、多操作完成后的數(shù)據(jù)檢查、主服務(wù)啟動(dòng)后等待多個(gè)組件加載完畢等(注意線程間的通信與數(shù)據(jù)傳遞需結(jié)合Future
實(shí)現(xiàn));
public class test {public static void main(String[] args) {// 模擬10人拼團(tuán)活動(dòng)final CountDownLatch countDownLatch = new CountDownLatch(10);// 固定數(shù)量線程池ExecutorService threadPool = Executors.newFixedThreadPool(50);// 拼團(tuán)人員ID集合List<String> ids = new ArrayList<>();// 模擬30人開(kāi)始搶單拼團(tuán)for (int i = 0; i < 30; i++) {threadPool.execute(() -> {boolean orderSucess = false;System.out.println(Thread.currentThread().getName() + ": 請(qǐng)求拼團(tuán)...");if (countDownLatch.getCount() > 0) {synchronized (ids) {if (countDownLatch.getCount() > 0) {ids.add(Thread.currentThread().getName());System.out.println(Thread.currentThread().getName() + ": 拼團(tuán)成功!");countDownLatch.countDown();orderSucess = true;}}}if (!orderSucess) {System.out.println(Thread.currentThread().getName() + ": 拼團(tuán)失敗!已無(wú)名額...");}});}// 訂單生成線程new Thread(() -> {try {countDownLatch.await();System.out.println(Thread.currentThread().getName() + ": 拼團(tuán)結(jié)束, 訂單已生成...");System.out.println(Thread.currentThread().getName() + ": 拼團(tuán)人員id = " + ids);} catch (InterruptedException e) {e.printStackTrace();}}, "拼團(tuán)").start();// 釋放線程池threadPool.shutdown();}
}
2.2 原理分析
CountDownLatch
的底層實(shí)現(xiàn)原理也非常簡(jiǎn)單,當(dāng)線程調(diào)用 await()
的時(shí)候,如果 state
不為 0 則證明任務(wù)還沒(méi)有執(zhí)行結(jié)束,await()
就會(huì)進(jìn)入阻塞等待,其源碼如下:
// CountDownLatch -> await()
public void await() throws InterruptedException {// 調(diào)用內(nèi)部類sync的acquireSharedInterruptibly方法sync.acquireSharedInterruptibly(1);
}
// CountDownLatch -> Sync -> acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {// 被中斷拋出異常if (Thread.interrupted())throw new InterruptedException();// tryAcquireShared -> 判斷是否阻塞等待if (tryAcquireShared(arg) < 0)// 自旋+阻塞(AQS實(shí)現(xiàn))doAcquireSharedInterruptibly(arg);
}
// CountDownLatch -> Sync -> tryAcquireShared
protected int tryAcquireShared(int acquires) {// 判斷當(dāng)前state是否歸零return (getState() == 0) ? 1 : -1;
}
當(dāng)線程調(diào)用 countDown()
時(shí),其實(shí)最終是調(diào)用了Sync
中重寫(xiě)的tryReleaseShared
方法,該方法以 CAS 的操作來(lái)減少 state
;若更新后state
歸零,則表示所有的計(jì)數(shù)任務(wù)線程都執(zhí)行完畢,那么在 CountDownLatch
上等待的線程就會(huì)被AQS
的doReleaseShared
方法喚醒并繼續(xù)向下執(zhí)行。
// CountDownLatch -> countDown()
public void countDown() {sync.releaseShared(1);
}
// CountDownLatch -> Sync -> AQS -> releaseShared
public final boolean releaseShared(int arg) {// 判斷遞減后計(jì)數(shù)器是否歸零if (tryReleaseShared(arg)) {// 喚醒所有等待線程(AQS實(shí)現(xiàn))doReleaseShared();return true;}return false;
}
// CountDownLatch -> Sync -> tryReleaseShared
protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {// 獲取當(dāng)前stateint c = getState();// 若計(jì)數(shù)器歸零則返回false,其他什么也不做if (c == 0)return false;// CAS更新state遞減int nextc = c-1;if (compareAndSetState(c, nextc))// 若更新成功則判斷新計(jì)數(shù)值是否歸零return nextc == 0;}
}
3. CyclicBarrier
//