国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當(dāng)前位置: 首頁(yè) > news >正文

靠比較好的軟件下載網(wǎng)站社交媒體營(yíng)銷(xiāo)三種方式

靠比較好的軟件下載網(wǎng)站,社交媒體營(yíng)銷(xiāo)三種方式,公司網(wǎng)站 制作,武漢剛剛突然宣布新聞Java JUC(三) AQS與同步工具詳解 一. ReentrantLock 概述 ReentrantLock 是 java.util.concurrent.locks 包下的一個(gè)同步工具類,它實(shí)現(xiàn)了 Lock 接口,提供了一種相比synchronized關(guān)鍵字更靈活的鎖機(jī)制。ReentrantLock 是一種獨(dú)占…

Java JUC(三) AQS與同步工具詳解

一. ReentrantLock 概述

ReentrantLockjava.util.concurrent.locks 包下的一個(gè)同步工具類,它實(shí)現(xiàn)了 Lock 接口,提供了一種相比synchronized關(guān)鍵字更靈活的鎖機(jī)制。ReentrantLock 是一種獨(dú)占式且可重入的鎖,并且支持可中斷、公平鎖/非公平鎖、超時(shí)等待、條件變量等高級(jí)特性。其特點(diǎn)如下:

  • 獨(dú)占式: 一把鎖在同一時(shí)間只能被一個(gè)線程所獲取;
  • 可重入: 可重入意味著同一個(gè)線程如果已持有某個(gè)鎖,則可以繼續(xù)多次獲得該鎖(注意釋放同樣次之后才算完全釋放成功);
  • 可中斷: 在線程獲取鎖的等待過(guò)程中可以中斷獲取,放棄等待而轉(zhuǎn)去執(zhí)行其他邏輯;
  • 公平性: ReentrantLock支持公平鎖和非公平鎖(默認(rèn))兩種模式。其中,公平鎖會(huì)按照線程請(qǐng)求鎖的順序來(lái)分配鎖(降低性能),而非公平鎖允許線程搶占已等待的線程的鎖(可能存在饑餓現(xiàn)象);
  • 條件變量: 通過(guò) Condition接口的實(shí)現(xiàn),允許線程在某些條件下等待或喚醒,即可以實(shí)現(xiàn)選擇性通知;
TypeMethodDescription
/ReentrantLock()無(wú)參構(gòu)造方法,默認(rèn)為非公平鎖
/ReentrantLock(boolean fair)帶參構(gòu)造方法,其中fair表示鎖的公平性策略:
- true: 公平鎖
- false: 非公平鎖
voidlock()不可中斷式獲取鎖。若當(dāng)前鎖已被其他線程持有,則阻塞等待;**注意:**該獲鎖過(guò)程不可被中斷
voidlockInterruptibly() throws InterruptedException可中斷式獲取鎖。若當(dāng)前鎖已被其他線程持有,則阻塞等待;**注意:**該獲鎖等待過(guò)程可被中斷,拋出InterruptedException,并清除當(dāng)前線程的中斷狀態(tài)
booleantryLock()嘗試獲取鎖,該方法會(huì)立即返回。若獲鎖成功,則返回true,否則將返回false;**注意:**該方法會(huì)破壞公平鎖配置,即在公平鎖策略下,該方法也會(huì)立即嘗試獲取可用鎖
booleantryLock(long timeout,TimeUnit unit) throws InterruptedException在給定時(shí)間timeout內(nèi)嘗試獲取鎖。若獲鎖成功,則返回true,否則將阻塞等待直到timeout過(guò)期,返回false注意:
- 獲鎖等待過(guò)程可被中斷,拋出InterruptedException,并清除當(dāng)前線程的中斷狀態(tài)
- 遵循公平鎖配置策略,即在公平鎖策略下,該方法會(huì)按順序等待獲取鎖
voidunlock()當(dāng)前線程嘗試釋放該鎖。若當(dāng)前線程未持有該鎖,則拋出IllegalMonitorStateException異常
ConditionnewCondition()返回一個(gè)與當(dāng)前Lock實(shí)例綁定的條件變量集合對(duì)象(默認(rèn)返回AQS內(nèi)部實(shí)現(xiàn)類ConditionObject),用于實(shí)現(xiàn)線程的條件等待/喚醒(詳見(jiàn)后文)

1. 鎖的基本使用

相比synchronized關(guān)鍵字來(lái)說(shuō),ReentrantLock屬于顯式鎖,其鎖機(jī)制都是針對(duì)Lock實(shí)例對(duì)象本身進(jìn)行加鎖,并且在使用過(guò)程中需要手動(dòng)釋放,即鎖的獲取與釋放是成對(duì)出現(xiàn)的;除此之外,ReentrantLock屬于JDK API層面實(shí)現(xiàn)的互斥鎖,其通過(guò)方法調(diào)用實(shí)現(xiàn)鎖功能,可以跨方法從而更加靈活。為了避免出現(xiàn)死鎖問(wèn)題,官方建議的開(kāi)發(fā)方式如下:

// new lock object
ReentrantLock lock = new ReentrantLock();
lock.lock();  // block until condition holds
try {// ... method body
} finally {lock.unlock(); // release
}

問(wèn)題背景: 假設(shè)當(dāng)前有一個(gè)賣(mài)票系統(tǒng),一共有100張票,有4個(gè)窗口同時(shí)售賣(mài),請(qǐng)模擬該賣(mài)票過(guò)程,注意保證出票的正確性。

public class Test_01 {public static void main(String[] args) {ReentrantLock lock = new ReentrantLock();for (int i = 0; i < 4; i++){new Thread(new TicketSeller(lock), "Thread_" + i).start();}}
}
// 售票類實(shí)現(xiàn)
class TicketSeller implements Runnable{private static int ticketCount = 100;private ReentrantLock windowLock;public TicketSeller(ReentrantLock lock) {this.windowLock = lock;}@Overridepublic void run() {while (true) {try {Thread.sleep(10);}catch(InterruptedException e){e.printStackTrace();}windowLock.lock();try{if(ticketCount > 0){System.out.println(Thread.currentThread().getName() + ": sale and left " + --ticketCount);}else{System.out.println(Thread.currentThread().getName() + ": sold out...");break;}}finally {windowLock.unlock();}}}
}

2. 條件變量機(jī)制

synchronized關(guān)鍵字中,我們可以通過(guò)wait&notify實(shí)現(xiàn)線程的等待與喚醒,但在此場(chǎng)景下存在虛假喚醒問(wèn)題,根本原因就是其等待/喚醒機(jī)制只支持單條件(等待線程未作區(qū)分,只能全部喚醒)。相比之下,ReentrantLock基于Condition接口也實(shí)現(xiàn)了相同的機(jī)制,并提供了更細(xì)的粒度和更高級(jí)的功能;每個(gè)Condition實(shí)例都對(duì)應(yīng)一個(gè)條件隊(duì)列,用于維護(hù)在該條件場(chǎng)景下等待通知的線程,并且ReentrantLock支持多條件變量,即一個(gè)ReentrantLock可以關(guān)聯(lián)多個(gè)Condition實(shí)例。其常用方法如下:

TypeMethodDescription
voidawait() throws InterruptedException使當(dāng)前線程阻塞等待(進(jìn)入該條件隊(duì)列),并釋放與此條件變量所關(guān)聯(lián)的鎖。注意: 若在等待期間被中斷,則拋出InterruptedException,并清除當(dāng)前線程中斷狀態(tài)
booleanawait(long time,TimeUnit unit) throws InterruptedException使當(dāng)前線程阻塞等待,并釋放與此條件變量所關(guān)聯(lián)的鎖,直到被喚醒、被中斷或等待time時(shí)間過(guò)期。其返回值表示:
- true: 在等待時(shí)間之內(nèi),條件被喚醒;
- false: 等待時(shí)間過(guò)期,條件未被喚醒;
注意: 若在等待期間被中斷,則拋出InterruptedException,并清除當(dāng)前線程中斷狀態(tài)
voidsignal()喚醒一個(gè)等待在Condition上的線程。如果有多個(gè)線程在此條件變量下等待,則選擇任意一個(gè)線程喚醒;注意: 從等待方法返回前必須重新獲得Condition相關(guān)聯(lián)的鎖
voidsignalAll()喚醒所有等待在Condition上的線程。如果有多個(gè)線程在此條件變量下等待,則全部喚醒 ;注意: 從等待方法返回前必須重新獲得Condition相關(guān)聯(lián)的鎖

在使用時(shí)需要注意

  • 調(diào)用await相關(guān)方法前需要先獲得對(duì)應(yīng)條件變量所關(guān)聯(lián)的鎖,否則會(huì)拋出IllegalMonitorStateException異常;
  • 調(diào)用signal相關(guān)方法前需要先獲得對(duì)應(yīng)條件變量所關(guān)聯(lián)的鎖,否則會(huì)拋出IllegalMonitorStateException異常;
  • await線程被喚醒(或等待時(shí)間過(guò)期、被中斷)后會(huì)重新參與鎖的競(jìng)爭(zhēng),若成功拿到鎖則將從await處恢復(fù)繼續(xù)向下執(zhí)行;
/*** 場(chǎng)景模擬:奶茶店和咖啡店共用一個(gè)窗口(window)出餐,等待顧客點(diǎn)單...*  - 奶茶店(teaWithMilk):顧客需要奶茶,則奶茶店開(kāi)始工作;*  - 咖啡店(coffee):顧客需要咖啡,則咖啡店開(kāi)始工作;*/
public class Test {// 窗口鎖(ReentrantLock實(shí)現(xiàn))static final ReentrantLock window = new ReentrantLock();// 奶茶點(diǎn)單條件變量static Condition teaWithMilk = window.newCondition();// 咖啡點(diǎn)單條件變量static Condition coffee = window.newCondition();public static void main(String[] args) throws InterruptedException {// 奶茶店監(jiān)控線程new Thread(new Runnable() {@Overridepublic void run() {while (true) {window.lock();try {System.out.println("[奶茶店] 等待接單...");try {teaWithMilk.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("[奶茶店] 接到訂單...");} finally {window.unlock();}System.out.println("[奶茶店] 開(kāi)始工作...");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("[奶茶店] 工作完成...");}}}).start();Thread.sleep(1000);// 咖啡店監(jiān)控線程new Thread(new Runnable() {@Overridepublic void run() {while (true) {window.lock();try {System.out.println("[咖啡店] 等待接單...");try {coffee.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("[咖啡店] 接到訂單...");} finally {window.unlock();}System.out.println("[咖啡店] 開(kāi)始工作...");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("[咖啡店] 工作完成...");}}}).start();Thread.sleep(1000);// 顧客點(diǎn)單線程: mainwindow.lock();try{System.out.println("[顧客] 點(diǎn)了咖啡!!");coffee.signal(); // 喚醒咖啡條件等待線程}finally {window.unlock();}}
}

二. 從 ReentrantLock 分析 AQS 的原理

1. AQS 框架

AQS 全稱為 AbstractQueuedSynchronizer ,即抽象隊(duì)列同步器;AQSjava.util.concurrent.locks 包下的一個(gè)抽象類,其為構(gòu)建鎖和同步器提供了一系列通用模板與框架的實(shí)現(xiàn),大部分JUC包下的并發(fā)工具都是基于AQS來(lái)構(gòu)建的,比如ReentrantLock、Semaphore、CountDownLatch等。其核心源碼如下:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {// 同步隊(duì)列的節(jié)點(diǎn)static final class Node {...}// 指向同步隊(duì)列頭部private transient volatile Node head;// 指向同步隊(duì)列尾部private transient volatile Node tail;// 同步狀態(tài)private volatile int state;// 提供一系列并發(fā)、同步隊(duì)列的基本操作方法// 比如: 掛起、取消、節(jié)點(diǎn)插入、節(jié)點(diǎn)替換等...// 交由子類實(shí)現(xiàn)的模板方法(鉤子方法): 自定義同步器的核心實(shí)現(xiàn)目標(biāo)protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();}protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();}protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();}protected boolean isHeldExclusively() {throw new UnsupportedOperationException();}// 條件變量 Condition 接口的內(nèi)部實(shí)現(xiàn)類public class ConditionObject implements Condition {...}}

由上可知,AQS內(nèi)部實(shí)現(xiàn)了一個(gè)核心內(nèi)部類Node,該內(nèi)部類表示對(duì)等待獲取鎖的線程的封裝節(jié)點(diǎn);在AQS中,基于Node維護(hù)了一個(gè)雙向鏈表(模擬同步隊(duì)列),其中head節(jié)點(diǎn)指向同步隊(duì)列的頭部,而tail節(jié)點(diǎn)指向同步隊(duì)列的尾部。Node類的核心源碼如下:

	static final class Node {// 共享模式(共享鎖、比如Semaphore)static final Node SHARED = new Node();// 獨(dú)占模式(獨(dú)占鎖、比如ReentrantLock)static final Node EXCLUSIVE = null;// 標(biāo)識(shí)節(jié)點(diǎn)線程獲取鎖的請(qǐng)求已取消、已結(jié)束static final int CANCELLED =  1;// 標(biāo)識(shí)節(jié)點(diǎn)線程已準(zhǔn)備就緒,等待被喚醒獲取資源static final int SIGNAL    = -1;// 標(biāo)識(shí)節(jié)點(diǎn)線程在條件變量Condition中等待static final int CONDITION = -2;// 在共享模式下啟用: 標(biāo)識(shí)獲得的同步狀態(tài)會(huì)被傳播static final int PROPAGATE = -3;/*** waitStatus 標(biāo)識(shí)節(jié)點(diǎn)線程在同步隊(duì)列中的狀態(tài),共存在以下幾種情況:* (1)SIGNAL: 被標(biāo)記為SIGNAL的節(jié)點(diǎn)處于等待喚醒獲取資源的狀態(tài),只要前驅(qū)節(jié)點(diǎn)釋放鎖就會(huì)通知該狀態(tài)的后續(xù)節(jié)點(diǎn)線程執(zhí)行* (2)CANCELLED: 在同步隊(duì)列中等待超時(shí)、被中斷的線程會(huì)進(jìn)入取消狀態(tài),不再響應(yīng)并會(huì)在遍歷過(guò)程中被移除* (3)CONDITION: 標(biāo)識(shí)當(dāng)前節(jié)點(diǎn)線程在Condition下等待,被喚醒后將重新從等待隊(duì)列轉(zhuǎn)移到同步隊(duì)列* (4)PROPAGATE: 與共享模式有關(guān) * (5)0: 默認(rèn)初始值狀態(tài),代表節(jié)點(diǎn)初始化*/volatile int waitStatus;// 同步隊(duì)列中的前驅(qū)節(jié)點(diǎn)volatile Node prev;// 同步隊(duì)列中的后繼節(jié)點(diǎn)volatile Node next;// 等待獲取鎖資源的線程volatile Thread thread;// Condition 等待隊(duì)列中的后繼節(jié)點(diǎn)(單向鏈表)Node nextWaiter;// 判斷是否為共享模式final boolean isShared() {return nextWaiter == SHARED;}// 獲取當(dāng)前節(jié)點(diǎn)在同步隊(duì)列中的前驅(qū)節(jié)點(diǎn)final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}// 省略構(gòu)造方法...}

在了解Node的基本數(shù)據(jù)結(jié)構(gòu)與狀態(tài)之后,AQS還有一個(gè)核心狀態(tài)變量即state,該全局變量用于表示同步鎖的狀態(tài),其具體含義一般在子類實(shí)現(xiàn)中進(jìn)行定義和維護(hù)(獨(dú)占鎖和共享鎖不一樣)。但不管獨(dú)占模式還是共享模式,簡(jiǎn)單來(lái)說(shuō)都是使用一個(gè)volatile的全局變量來(lái)表示資源同步狀態(tài)(state),并通過(guò)CAS完成對(duì)state值的修改(修改成功則表示獲鎖成功),當(dāng)持有鎖的線程數(shù)量超過(guò)當(dāng)前模式(獨(dú)占模式一般限制為1)時(shí),則通過(guò)內(nèi)置的FIFO同步隊(duì)列來(lái)完成資源獲取線程的排隊(duì)工作(通過(guò)LockSupport park/unpark方法實(shí)現(xiàn)掛起與喚醒)。其核心原理圖如下:

在這里插入圖片描述

綜上,AQS采用了模板方法模式來(lái)構(gòu)建同步框架,并提供了一系列并發(fā)操作的公共基礎(chǔ)方法,支持共享模式和獨(dú)占模式兩種實(shí)現(xiàn);但AQS并不負(fù)責(zé)對(duì)外提供具體的加鎖/解鎖邏輯,因?yàn)殒i是千變?nèi)f化的,AQS只關(guān)注基礎(chǔ)組件、頂層模板這些總的概念,具體的鎖邏輯將通過(guò)”鉤子“的方式下放給子類實(shí)現(xiàn)。也就是說(shuō),獨(dú)占模式只需要實(shí)現(xiàn)tryAcquire-tryRelease方法、共享模式只需要實(shí)現(xiàn)tryAcquireShared-tryReleaseShared方法,搭配AQS提供的框架和基礎(chǔ)組件就能輕松實(shí)現(xiàn)自定義的同步工具。

2. ReentrantLock 源碼分析

在這里插入圖片描述

ReentrantLock的類結(jié)構(gòu)圖可以看出,ReentrantLock實(shí)現(xiàn)了Lock接口,其內(nèi)部包含一個(gè)內(nèi)部類Sync,該內(nèi)部類繼承了AQSAbstractQueuedSynchronizer),ReentrantLock大部分的鎖操作都是通過(guò)Sync實(shí)現(xiàn)的。除此之外,ReentrantLock有公平鎖和非公平鎖兩種模式,分別對(duì)應(yīng)SyncFairSyncNonfairSync兩個(gè)子類實(shí)現(xiàn)。

public class ReentrantLock implements Lock, java.io.Serializable {private final Sync sync;// 默認(rèn)構(gòu)造函數(shù): 默認(rèn)創(chuàng)建非公平鎖public ReentrantLock() {sync = new NonfairSync();}// 帶參構(gòu)造函數(shù): true公平鎖/false非公平鎖public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}// 加鎖操作public void lock() {sync.lock();}//...
}

接下來(lái),本節(jié)將首先以ReentrantLock的非公平鎖為例進(jìn)行分析,然后再介紹公平鎖與非公平鎖的主要區(qū)別。

2.1 非公平鎖lock加鎖原理

非公平鎖NonfairSync的源碼如下:

    static final class NonfairSync extends Sync {// 加鎖操作final void lock() {// CAS修改state狀態(tài)以獲取鎖資源if (compareAndSetState(0, 1))// 成功則將獨(dú)占鎖線程設(shè)置為當(dāng)前線程setExclusiveOwnerThread(Thread.currentThread());else// 否則再次請(qǐng)求同步狀態(tài)(AQS的模板方法)acquire(1);}// AQS 獲鎖的鉤子方法實(shí)現(xiàn)protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}

在調(diào)用lock方法獲取鎖的過(guò)程中,當(dāng)前線程會(huì)先通過(guò)CAS操作嘗試修改state0(表示無(wú)鎖)到1(表示占有鎖),若修改成功則將AQS中保存的獨(dú)占線程exclusiveOwnerThread修改為當(dāng)前線程;若失敗則執(zhí)行acquire(1)方法,該方法是AQS中的一個(gè)模板方法,其源碼如下:

    public final void acquire(int arg) {// tryAcquire -> addWaiter -> acquireQueuedif (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}

可以看到該方法首先調(diào)用了鉤子方法tryAcquire,該方法是交由子類NonfairSync實(shí)現(xiàn)的,在上面的源碼中我們已經(jīng)給出了tryAcquire的實(shí)現(xiàn)代碼,其直接調(diào)用了父類Sync中的nonfairTryAcquire方法,其源碼如下:

abstract static class Sync extends AbstractQueuedSynchronizer {abstract void lock();// nonfairTryAcquire 方法實(shí)現(xiàn)final boolean nonfairTryAcquire(int acquires) {// 獲取當(dāng)前線程及同步隊(duì)列狀態(tài)statefinal Thread current = Thread.currentThread();int c = getState();// 若狀態(tài)為0表示鎖已釋放: 重新嘗試獲取鎖if (c == 0) {// CAS嘗試修改state的值if (compareAndSetState(0, acquires)) {// 若成功則設(shè)置獨(dú)占線程為當(dāng)前線程setExclusiveOwnerThread(current);return true;}}// 若獨(dú)占線程即當(dāng)前線程,則屬于重入鎖else if (current == getExclusiveOwnerThread()) {// 修改state為重入值int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}//省略代碼...
}

由上述代碼可知,該方法首先再次判斷鎖是否已釋放,這是為了避免之前持鎖的線程在這段時(shí)間內(nèi)又重新釋放了鎖,若state==0則會(huì)嘗試再次CAS修改同步狀態(tài)以獲取鎖資源;否則的話,則判斷當(dāng)前線程是否是鎖重入的情況,若兩個(gè)判斷都不滿足則返回false;到目前為止,我們回過(guò)頭來(lái)想一下非公平鎖的非公平性是在哪體現(xiàn)的?

很明顯,在上述代碼分析中,當(dāng)有任何線程嘗試獲取鎖時(shí)(調(diào)用lock方法),不論當(dāng)前同步隊(duì)列中是否已有線程排隊(duì)等待,NonfairSynclock()方法以及SyncnonfairTryAcquire()方法都沒(méi)有對(duì)同步隊(duì)列中的等待情況進(jìn)行判斷,而是直接通過(guò)CAS嘗試修改state的值來(lái)為當(dāng)前線程直接占有鎖;這就是非公平性的體現(xiàn),搶占線程可以直接與等待線程競(jìng)爭(zhēng)鎖資源,而不用按照順序加入隊(duì)列。

分析完這部分之后,我們?cè)倩氐?code>acquire(1)方法,若tryAcquire(arg)方法返回false即獲取不到鎖時(shí)會(huì)繼續(xù)向下執(zhí)行到addWaiter(Node.EXCLUSIVE)方法,該方法用于封裝線程入隊(duì),其源碼如下:

    private Node addWaiter(Node mode) {// 將請(qǐng)求占鎖失敗的線程封裝為Node節(jié)點(diǎn)Node node = new Node(Thread.currentThread(), mode);Node pred = tail;// 若同步隊(duì)列不為空,則嘗試CAS在尾部插入當(dāng)前節(jié)點(diǎn)(FIFO)if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 若隊(duì)列為空或CAS插入節(jié)點(diǎn)失敗則執(zhí)行enq()方法處理入隊(duì)enq(node);return node;}

接著,我們繼續(xù)分析enq(node)方法的實(shí)現(xiàn):

    private Node enq(final Node node) {// 開(kāi)啟自旋(循環(huán))for (;;) {Node t = tail;// 若隊(duì)列為空if (t == null) { // Must initialize// 則嘗試CAS創(chuàng)建頭節(jié)點(diǎn)(不存儲(chǔ)數(shù)據(jù))// 原因: 隊(duì)列為空可能是因?yàn)槠渌€程非公平占有了鎖(當(dāng)前線程試過(guò)沒(méi)搶到),因此這里需要先斬后奏,即再次創(chuàng)建頭節(jié)點(diǎn)表示已占鎖線程的占位,來(lái)維護(hù)同步隊(duì)列if (compareAndSetHead(new Node()))tail = head;} else {// 否則嘗試CAS添加尾節(jié)點(diǎn)node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}// 若CAS失敗(可能有多線程并發(fā)操作),則不斷自旋重試直到插入成功}

可以看到頭節(jié)點(diǎn)head其實(shí)是不存儲(chǔ)數(shù)據(jù)的,它只表示一個(gè)線程占位(占位了鎖資源),因?yàn)槲挥陬^節(jié)點(diǎn)的線程肯定已經(jīng)獲取到了鎖,頭節(jié)點(diǎn)只存儲(chǔ)后繼節(jié)點(diǎn)指向,用于當(dāng)前線程釋放鎖資源時(shí)喚醒后繼節(jié)點(diǎn);那么到此這個(gè)方法也就分析完成了,在節(jié)點(diǎn)入隊(duì)成功之后會(huì)返回當(dāng)前節(jié)點(diǎn)node,然后會(huì)繼續(xù)執(zhí)行到acquireQueued(addWaiter(Node.EXCLUSIVE),arg)方法,其源碼如下:

    final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false; //阻塞掛起標(biāo)志// 開(kāi)啟自旋(循環(huán))for (;;) {// 獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)pfinal Node p = node.predecessor();// 若p是頭節(jié)點(diǎn)則當(dāng)前節(jié)點(diǎn)嘗試獲取鎖資源if (p == head && tryAcquire(arg)) {// 占鎖成功則設(shè)置當(dāng)前節(jié)點(diǎn)node為頭節(jié)點(diǎn): 頭節(jié)點(diǎn)狀態(tài)保持SIGNAL狀態(tài)setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 若p不是頭節(jié)點(diǎn)或獲取鎖資源失敗,則判斷是否阻塞掛起線程來(lái)等待if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {// 若最終無(wú)法獲取鎖,則取消該線程的請(qǐng)求if (failed)cancelAcquire(node);}}// 將傳遞的節(jié)點(diǎn)設(shè)置為同步隊(duì)列的頭節(jié)點(diǎn)private void setHead(Node node) {head = node;// 清空當(dāng)前節(jié)點(diǎn)存儲(chǔ)的線程信息node.thread = null;node.prev = null;}

acquireQueued方法中,線程會(huì)開(kāi)啟自旋,若發(fā)現(xiàn)(或被喚醒后發(fā)現(xiàn))當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)變?yōu)轭^節(jié)點(diǎn),則說(shuō)明當(dāng)前節(jié)點(diǎn)能夠嘗試獲取鎖資源,并嘗試通過(guò)tryAcquire方法獲取同步狀態(tài);需要注意的是,head頭節(jié)點(diǎn)表示當(dāng)前占有鎖的線程節(jié)點(diǎn),只有當(dāng)head節(jié)點(diǎn)對(duì)應(yīng)的線程釋放鎖資源并喚醒后繼節(jié)點(diǎn)時(shí),后繼節(jié)點(diǎn)線程才會(huì)自旋去嘗試占有鎖資源,因此:在同步隊(duì)列中,只有前驅(qū)節(jié)點(diǎn)變?yōu)轭^節(jié)點(diǎn)時(shí),當(dāng)前節(jié)點(diǎn)才有資格嘗試獲取鎖資源,其他時(shí)候都將被掛起等待,避免空轉(zhuǎn)CPU

除此之外,若在自旋過(guò)程中,當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn)或者節(jié)點(diǎn)嘗試tryAcquire獲取鎖資源失敗,則會(huì)執(zhí)行shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt()邏輯;需要注意的是在前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)但嘗試獲取鎖資源失敗這種特殊情況發(fā)生時(shí)(比如非公平鎖模式下被新到來(lái)的請(qǐng)求線程搶占),頭節(jié)點(diǎn)head此時(shí)可能有兩種狀態(tài):

  • waitStatus==0: 處于該狀態(tài)一種情況是初始同步隊(duì)列為空時(shí),默認(rèn)頭節(jié)點(diǎn)狀態(tài)初始化為0;另一種情況是鎖釋放時(shí)(見(jiàn)后文)被unparkSuccessor重置頭節(jié)點(diǎn)狀態(tài);

  • waitStatus==SIGNAL: 處于該狀態(tài)一種情況是waitStatus==0時(shí)更新?tīng)顟B(tài)后又自旋回來(lái)但仍未獲取到鎖(可能釋放鎖后被非公平搶占);另一種情況是釋放鎖時(shí)unparkSuccessor重置失敗;

其源碼如下:

	// 判斷節(jié)點(diǎn)線程是否掛起等待private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 獲取前驅(qū)節(jié)點(diǎn)的等待狀態(tài)int ws = pred.waitStatus;// 若是SIGNAL狀態(tài),則說(shuō)明前驅(qū)節(jié)點(diǎn)就緒,當(dāng)前節(jié)點(diǎn)正常需要繼續(xù)等待即返回trueif (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;// 若等待狀態(tài)>0則說(shuō)明前驅(qū)節(jié)點(diǎn)是結(jié)束狀態(tài),需要遍歷前驅(qū)節(jié)點(diǎn)直到找到非結(jié)束狀態(tài)的有效節(jié)點(diǎn)if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {// 若等待狀態(tài)<=0且非SIGNAL,則嘗試將前驅(qū)節(jié)點(diǎn)設(shè)置為SIGNAL/** waitStatus must be 0 or PROPAGATE.  Indicate that we* need a signal, but don't park yet.  Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}// false 則返回去繼續(xù)自旋return false;}// 執(zhí)行掛起阻塞操作 LockSupport.parkprivate final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}

shouldParkAfterFailedAcquire方法的執(zhí)行邏輯是判斷前驅(qū)節(jié)點(diǎn)的等待狀態(tài)waitStatus,用于掛起當(dāng)前節(jié)點(diǎn)線程等待,結(jié)合上述分析,包括以下幾種情況:

  • 前驅(qū)節(jié)點(diǎn)狀態(tài)waitStatus==SIGNAL: 若前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn),則說(shuō)明占鎖線程還未執(zhí)行結(jié)束,當(dāng)前節(jié)點(diǎn)線程仍需掛起等待;若前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn),則說(shuō)明前驅(qū)節(jié)點(diǎn)就緒/擁有更高的優(yōu)先級(jí),下次執(zhí)行還輪不到當(dāng)前節(jié)點(diǎn),所以也可以安全掛起,直接返回true
  • 前驅(qū)節(jié)點(diǎn)狀態(tài)waitStatus>0: 說(shuō)明前驅(qū)節(jié)點(diǎn)已處于結(jié)束/取消狀態(tài),應(yīng)該從同步隊(duì)列中移除,并遍歷所有前驅(qū)節(jié)點(diǎn)直到找到非結(jié)束狀態(tài)的有效節(jié)點(diǎn)作為前驅(qū);
  • 前驅(qū)節(jié)點(diǎn)狀態(tài)waitStatus<0且非SIGNAL: 前驅(qū)節(jié)點(diǎn)剛從Condition的條件等待隊(duì)列被喚醒,從而轉(zhuǎn)移到同步隊(duì)列,需要轉(zhuǎn)換為SIGNAL狀態(tài)等待;
  • 前驅(qū)節(jié)點(diǎn)狀態(tài)waitStatus==0:若前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn),則說(shuō)明同步隊(duì)列剛初始化(0)或鎖剛被釋放重置,鎖資源可能未被其他線程持有,需判斷能否占有鎖(不管當(dāng)前線程能否占有,該鎖一定會(huì)被占有,都需要轉(zhuǎn)換狀態(tài)為SIGNAL);若前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn),則說(shuō)明該線程節(jié)點(diǎn)剛初始化并被插入隊(duì)列,需要轉(zhuǎn)換為SIGNAL狀態(tài);

綜上,當(dāng)shouldParkAfterFailedAcquire()方法返回true時(shí)會(huì)調(diào)用parkAndCheckInterrupt方法掛起線程等待被喚醒,返回false時(shí)則會(huì)繼續(xù)自旋判斷;至此,ReetrantLock內(nèi)部間接依靠AQSFIFO同步隊(duì)列,就完成了lock()加鎖操作。

2.2 公平鎖lock加鎖原理

公平鎖FairSync的源碼如下:

    static final class FairSync extends Sync {// 加鎖操作final void lock() {acquire(1);}// AQS 獲鎖的鉤子方法實(shí)現(xiàn)protected final boolean tryAcquire(int acquires) {// 獲取當(dāng)前線程final Thread current = Thread.currentThread();// 獲取同步狀態(tài)int c = getState();// 若當(dāng)前沒(méi)有線程持有鎖資源if (c == 0) {// 首先判斷同步隊(duì)列是否存在等待節(jié)點(diǎn)if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 鎖重入else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}

與非公平鎖唯一不同的是:公平鎖的tryAcquire實(shí)現(xiàn)中,在嘗試修改state之前,會(huì)先調(diào)用hasQueuedPredecessors()判斷AQS內(nèi)部同步隊(duì)列中是否已存在等待節(jié)點(diǎn)。如果存在,則說(shuō)明在此之前,已經(jīng)有線程提交了獲取鎖的請(qǐng)求,那么當(dāng)前線程就會(huì)直接被封裝成Node節(jié)點(diǎn),追加到隊(duì)尾等待。

2.3 釋放鎖原理

ReetrantLock顯式鎖需要手動(dòng)釋放鎖資源,其unlock()方法直接調(diào)用了Sync中的release(1)方法,而該方法又是在其父類AQS中直接實(shí)現(xiàn)的,其源碼如下:

    public final boolean release(int arg) {// 嘗試釋放鎖if (tryRelease(arg)) {// 進(jìn)入該代碼塊則說(shuō)明鎖已完全釋放(state=0)// 獲取頭節(jié)點(diǎn)Node h = head;if (h != null && h.waitStatus != 0)// 喚醒head頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)線程unparkSuccessor(h);return true;}return false;}// 喚醒node的后繼節(jié)點(diǎn)線程private void unparkSuccessor(Node node) {// 獲取節(jié)點(diǎn)狀態(tài)int ws = node.waitStatus;if (ws < 0) // 重置節(jié)點(diǎn)狀態(tài),允許失敗compareAndSetWaitStatus(node, ws, 0);// 獲取后繼節(jié)點(diǎn)Node s = node.next;// 若后繼節(jié)點(diǎn)為空或已結(jié)束if (s == null || s.waitStatus > 0) {s = null;// 尋找后繼可被喚醒的有效等待節(jié)點(diǎn)for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 執(zhí)行線程喚醒(繼續(xù)去自旋) LockSupport.unparkif (s != null)LockSupport.unpark(s.thread);}

release方法能否釋放鎖并喚醒后繼節(jié)點(diǎn)線程依賴于tryRelease鉤子方法,而該方法又下放到了Sync中實(shí)現(xiàn),其源碼如下:

       // ReentrantLock -> Sync -> tryRelease(1)protected final boolean tryRelease(int releases) {// 計(jì)算釋放鎖后的同步更新?tīng)顟B(tài)int c = getState() - releases;// 如果當(dāng)前釋放鎖的線程不為持有鎖的線程則拋出異常if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 判斷更新?tīng)顟B(tài)是否為0,如果是則說(shuō)明已完全釋放鎖if (c == 0) {free = true;// 完全釋放鎖才清空當(dāng)前占有線程setExclusiveOwnerThread(null);}// 更新state值setState(c);// 完全釋放鎖才返回truereturn free;}

注意:tryRelease方法執(zhí)行完成返回true之后,就說(shuō)明當(dāng)前線程持有的鎖已被釋放(非公平鎖中就已經(jīng)可以被搶占了),后續(xù)unparkSuccessor方法只是進(jìn)行一些善后工作,其中重置頭節(jié)點(diǎn)狀態(tài)的目的是表示邏輯上從持鎖到無(wú)鎖的轉(zhuǎn)換,鎖資源目前可能并沒(méi)有線程持有,因此在后續(xù)線程喚醒后執(zhí)行acquireQueued自旋時(shí)waitStatus==0狀態(tài)會(huì)再一次判斷并嘗試獲取鎖,而修改為SIGNAL就表示占鎖線程正在執(zhí)行,其他線程需要掛起等待。至此,整個(gè)流程可以結(jié)合起來(lái)理解:s節(jié)點(diǎn)的線程被喚醒后,會(huì)繼續(xù)執(zhí)行acquireQueued()方法中的自旋,判斷if (p == head && tryAcquire(arg))代碼是否成立,從而執(zhí)行判斷操作。

三. 其他同步工具類

1. Semaphore

1.1 基本概述

Semaphorejava.util.concurrent包下的一種計(jì)數(shù)信號(hào)量,它同樣也是基于AQS實(shí)現(xiàn)的同步工具類。相比ReentrantLock來(lái)說(shuō),它應(yīng)該屬于共享鎖,即允許多個(gè)線程同時(shí)訪問(wèn)某個(gè)共享資源,但會(huì)限制同時(shí)訪問(wèn)特定資源的線程數(shù)量;Semaphore同樣也支持公平模式和非公平模式兩種方式,其構(gòu)造方法如下:

public Semaphore(int permits) {sync = new NonfairSync(permits);
}public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Semaphore默認(rèn)為非公平模式(搶占式),在構(gòu)造信號(hào)量對(duì)象時(shí)都必須提供permits參數(shù);permits可以理解為許可證數(shù)量,只有拿到許可證的線程才能執(zhí)行,該參數(shù)限制了能同時(shí)獲取或訪問(wèn)到共享資源的線程數(shù)量,其他超出線程都將阻塞等待。基于此,Semaphore通常用于實(shí)現(xiàn)資源有明確訪問(wèn)數(shù)量限制的場(chǎng)景,比如限流、池化等。Semaphore的常用方法介紹如下:

TypeMethodDescription
voidacquire() throws InterruptedException當(dāng)前線程請(qǐng)求獲取該信號(hào)量的一個(gè)許可證。若有可用許可證,則獲得許可證并返回執(zhí)行同步代碼,同時(shí)可用許可證數(shù)量將減少一個(gè);若沒(méi)有可用許可證,則阻塞等待直到有許可被釋放,或線程中斷。注意: 若當(dāng)前線程在等待過(guò)程中被中斷,則會(huì)拋出InterruptedException,并清除當(dāng)前線程的中斷狀態(tài)。
voidacquire(int permits) throws InterruptedException當(dāng)前線程請(qǐng)求獲取該信號(hào)量的permits個(gè)許可證。若有可用數(shù)量的許可證,則獲得許可證并返回執(zhí)行同步代碼,同時(shí)可用許可證數(shù)量將減少permits個(gè);若沒(méi)有可用數(shù)量的許可證,則阻塞等待直到可用許可達(dá)到指定數(shù)量,或線程中斷。**注意: **若當(dāng)前線程在等待過(guò)程中被中斷,則會(huì)拋出InterruptedException,并清除當(dāng)前線程的中斷狀態(tài)。
voidrelease()釋放該信號(hào)量的一個(gè)許可證,并使可用許可證數(shù)量增加一個(gè)。注意: 沒(méi)有通過(guò)acquire()獲取許可的線程甚至也可以直接調(diào)用release()來(lái)為信號(hào)量增加許可證數(shù)量,并且可用許可有可能會(huì)超出構(gòu)造時(shí)限制的permits值,因此信號(hào)量的正確使用必須是通過(guò)應(yīng)用程序中的編程約束來(lái)建立。
voidrelease(int permits)釋放該信號(hào)量的permits個(gè)許可證,并使可用許可證數(shù)量增加permits個(gè)。注意:release()方法,信號(hào)量的正確使用必須是通過(guò)應(yīng)用程序中的編程約束來(lái)建立。
booleantryAcquire()嘗試獲取該信號(hào)量的一個(gè)許可證,但該方法會(huì)立即返回。若有可用許可證,則獲得許可證并返回true,同時(shí)可用許可證數(shù)量將減少一個(gè);若沒(méi)有可用許可證,則返回false。注意: 該方法會(huì)破壞公平策略,對(duì)該方法的調(diào)用會(huì)進(jìn)行搶占式獲取(不管是否有線程在等待)。
booleantryAcquire(long timeout, TimeUnit unit) throws InterruptedException嘗試在指定時(shí)間timeout內(nèi)獲取該信號(hào)量的一個(gè)許可證(遵循公平策略)。若有可用許可證,則獲得許可證并返回true,同時(shí)可用許可證數(shù)量將減少一個(gè);若沒(méi)有可用許可證,則阻塞等待直到timeout過(guò)期,等待時(shí)間過(guò)期則返回false。注意: 若當(dāng)前線程在等待過(guò)程中被中斷,則會(huì)拋出InterruptedException,并清除當(dāng)前線程的中斷狀態(tài)。
intavailablePermits()獲取當(dāng)前信號(hào)量中的可用許可證數(shù)量。

可以看出,許可證是Semaphore的核心概念,Semaphore信號(hào)量對(duì)許可證的獲取是強(qiáng)限制,但對(duì)許可證的釋放是弱限制的,即請(qǐng)求線程在執(zhí)行時(shí)必須獲取acquire到指定數(shù)量的許可證,但在釋放release時(shí)并不會(huì)對(duì)先前是否獲取進(jìn)行檢查,因此可用許可有時(shí)可能會(huì)超出構(gòu)造時(shí)限制的permits值。換句話說(shuō),構(gòu)造時(shí)傳入的permits參數(shù)只表示信號(hào)量的初始許可數(shù)量,并且許可證只決定了線程執(zhí)行的門(mén)檻,但并不會(huì)對(duì)線程作全程限制;當(dāng)前線程一旦獲取到指定數(shù)量的許可便開(kāi)始執(zhí)行,即使中途釋放許可也不會(huì)影響后續(xù)執(zhí)行過(guò)程,這也就是為什么說(shuō)信號(hào)量的正確使用必須是通過(guò)應(yīng)用程序中的編程約束來(lái)建立。舉例如下:

public class test {public static void main(String[] args) {// 初始化許可數(shù)量 = 5Semaphore semaphore = new Semaphore(5);new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + ": 等待許可...");semaphore.acquire(3); // acquire permits = 3System.out.println(Thread.currentThread().getName() + ": 拿到許可,剩余許可 = " + semaphore.availablePermits());Thread.sleep(3000);semaphore.release(); // release = 1System.out.println(Thread.currentThread().getName() + ": 釋放許可...");} catch (InterruptedException e) {e.printStackTrace();}},"thread-1").start();try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + ": 等待許可...");semaphore.acquire(3);// acquire permits = 3System.out.println(Thread.currentThread().getName() + ": 拿到許可,剩余許可 = " + semaphore.availablePermits());Thread.sleep(3000);semaphore.release();// release = 1System.out.println(Thread.currentThread().getName() + ": 釋放許可,剩余許可 = " + semaphore.availablePermits());} catch (InterruptedException e) {e.printStackTrace();}},"thread-2").start();}
}
thread-1: 等待許可...
thread-1: 拿到許可,剩余許可 = 2
thread-2: 等待許可...
thread-1: 釋放許可...
thread-2: 拿到許可,剩余許可 = 0
thread-2: 釋放許可,剩余許可 = 1

前文說(shuō)過(guò),Semaphore通常用于實(shí)現(xiàn)資源有明確訪問(wèn)數(shù)量限制的場(chǎng)景,比如限流、池化等;此處通過(guò)Semaphore模擬一個(gè)請(qǐng)求限流的場(chǎng)景,其中限制最大并發(fā)數(shù)為3,實(shí)現(xiàn)代碼如下:

public class test {public static void main(String[] args) {// 自定義線程池ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2*2, 8,60, TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(1024),new ThreadPoolExecutor.AbortPolicy());// 流量控制: 限制最大并發(fā)數(shù)為3final Semaphore semaphore = new Semaphore(3);// 模擬10個(gè)客戶端任務(wù)請(qǐng)求for(int index = 0;index < 10;index++){final int serial = index;threadPool.execute(() -> {try {// 請(qǐng)求獲取許可semaphore.acquire();System.out.println(Thread.currentThread().getName() + ": 請(qǐng)求成功!訪問(wèn)編號(hào) = " + serial);// 模擬IO操作Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {// 釋放許可semaphore.release();}});}// 等待線程池執(zhí)行結(jié)束關(guān)閉: 不再接受新任務(wù)提交,對(duì)已經(jīng)提交了的任務(wù)不會(huì)產(chǎn)生影響threadPool.shutdown();}
}

1.2 原理分析

Semaphore的大部分方法都是基于內(nèi)部類Sync實(shí)現(xiàn)的,而該類又繼承了 AbstractQueuedSynchronizerAQS,并且Sync對(duì)應(yīng)的還有兩個(gè)子類 NonfairSync(非公平模式實(shí)現(xiàn)) 和 FairSync(公平模式實(shí)現(xiàn))。在Semaphore中,AQSstate 被定義為 permits(許可證數(shù)量),對(duì)象創(chuàng)建時(shí)傳入的參數(shù)permits實(shí)際是在對(duì)AQS內(nèi)部的state進(jìn)行初始化,初始化完成后state代表著當(dāng)前信號(hào)量對(duì)象的可用許可數(shù)(state>0)。

以非公平模式為例,當(dāng)線程調(diào)用Semaphore.acquire(arg)請(qǐng)求獲取許可時(shí),會(huì)首先判斷remaining = getState() - arg是否大于0,如果是則代表還有滿足可用的許可數(shù),并嘗試對(duì)state進(jìn)行CAS操作使state=remaining,若CAS成功則代表獲取許可成功;否則線程需要封裝成Node節(jié)點(diǎn)并加入同步隊(duì)列阻塞等待,直到許可釋放被喚醒。

// Semaphore類 -> acquire()方法
public void acquire() throws InterruptedException {// Sync類繼承AQS,此處直接調(diào)用AQS內(nèi)部的acquireSharedInterruptibly()方法sync.acquireSharedInterruptibly(1);}// AbstractQueuedSynchronizer類 -> acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {// 判斷是否出現(xiàn)線程中斷信號(hào)(標(biāo)志)if (Thread.interrupted())throw new InterruptedException();// 如果tryAcquireShared(arg)執(zhí)行結(jié)果不小于0,則線程獲取同步狀態(tài)成功if (tryAcquireShared(arg) < 0)// 未獲取成功加入同步隊(duì)列阻塞等待doAcquireSharedInterruptibly(arg);
}
// Semaphore類 -> NofairSync內(nèi)部類 -> tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {// 調(diào)用了父類Sync中的實(shí)現(xiàn)方法return nonfairTryAcquireShared(acquires);
}// Syn類 -> nonfairTryAcquireShared()方法
abstract static class Sync extends AbstractQueuedSynchronizer {final int nonfairTryAcquireShared(int acquires) {// 開(kāi)啟自旋死循環(huán)for (;;) {int available = getState();int remaining = available - acquires;// 判斷信號(hào)量中可用許可數(shù)是否已<0或者CAS執(zhí)行是否成功if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}
}

釋放邏輯對(duì)比獲取許可的邏輯相對(duì)來(lái)說(shuō)要簡(jiǎn)單許多,只需要更新state值增加后調(diào)用doReleaseShared()方法喚醒后繼節(jié)點(diǎn)線程即可;需要注意的是,而在共享模式中可能會(huì)存在多條線程同時(shí)釋放許可/鎖資源,所以在此處使用了CAS+自旋的方式保證線程安全問(wèn)題。

2. CountDownLatch

2.1 基本概述

CountDownLatch同樣是java.util.concurrent包下的基于AQS實(shí)現(xiàn)的同步工具類。類似于SemaphoreCountDownLatch在初始化時(shí)也會(huì)傳入一個(gè)參數(shù)count來(lái)間接賦值給AQSstate,用于表示一個(gè)線程計(jì)數(shù)值;不過(guò)CountDownLatch并沒(méi)有構(gòu)建公平模式和非公平模式(內(nèi)部Sync沒(méi)有子類實(shí)現(xiàn)),其構(gòu)造方法如下:

public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);
}

CountDownLatch的主要作用是等待計(jì)數(shù)值count歸零后,喚醒所有的等待線程?;谠撎匦?#xff0c;CountDownLatch常被用于控制多線程之間的等待與協(xié)作(多線程條件喚醒);相比join來(lái)說(shuō),CountDownLatch更加靈活且粒度更細(xì),join是以線程執(zhí)行結(jié)束為條件,而CountDownLatch是以方法的主動(dòng)調(diào)用為條件。其常用方法如下:

TypeMethodDescription
voidawait() throws InterruptedException使當(dāng)前線程阻塞等待,直到計(jì)數(shù)器count歸零或線程被中斷。若當(dāng)前計(jì)數(shù)已為零,則此方法立即返回。注意: 若在等待過(guò)程中被中斷,則會(huì)拋出InterruptedException,并清除當(dāng)前線程的中斷狀態(tài)。
voidcountDown()使當(dāng)前計(jì)數(shù)器count遞減。如果新計(jì)數(shù)歸零,則喚醒所有await()等待線程;注意: 若當(dāng)前計(jì)數(shù)已為零,則無(wú)事發(fā)生。
longgetCount()獲取當(dāng)前計(jì)數(shù)器count的值。

需要注意的是CountDownLatch一次性的,即計(jì)數(shù)器的值count只能在構(gòu)造方法中初始化,此外再?zèng)]有任何設(shè)置值的方法,當(dāng) CountDownLatch 使用完畢后(計(jì)數(shù)歸零)將不能重復(fù)被使用;若需要重置計(jì)數(shù)的版本,可以考慮使用CyclicBarrier。CountDownLatch 的常用方法有兩種:

  • 多等一:初始化count=1多條線程await()阻塞等待一條線程調(diào)用countDown()喚醒所有線程。比如模擬并發(fā)安全、死鎖等;
  • 一等多:初始化count=N一條線程await()阻塞等待N條線程調(diào)用countDown()歸零后喚醒。比如多接口調(diào)用的數(shù)據(jù)合并、多操作完成后的數(shù)據(jù)檢查、主服務(wù)啟動(dòng)后等待多個(gè)組件加載完畢等(注意線程間的通信與數(shù)據(jù)傳遞需結(jié)合Future實(shí)現(xiàn));
public class test {public static void main(String[] args) {// 模擬10人拼團(tuán)活動(dòng)final CountDownLatch countDownLatch = new CountDownLatch(10);// 固定數(shù)量線程池ExecutorService threadPool = Executors.newFixedThreadPool(50);// 拼團(tuán)人員ID集合List<String> ids = new ArrayList<>();// 模擬30人開(kāi)始搶單拼團(tuán)for (int i = 0; i < 30; i++) {threadPool.execute(() -> {boolean orderSucess = false;System.out.println(Thread.currentThread().getName() + ": 請(qǐng)求拼團(tuán)...");if (countDownLatch.getCount() > 0) {synchronized (ids) {if (countDownLatch.getCount() > 0) {ids.add(Thread.currentThread().getName());System.out.println(Thread.currentThread().getName() + ": 拼團(tuán)成功!");countDownLatch.countDown();orderSucess = true;}}}if (!orderSucess) {System.out.println(Thread.currentThread().getName() + ": 拼團(tuán)失敗!已無(wú)名額...");}});}// 訂單生成線程new Thread(() -> {try {countDownLatch.await();System.out.println(Thread.currentThread().getName() + ": 拼團(tuán)結(jié)束, 訂單已生成...");System.out.println(Thread.currentThread().getName() + ": 拼團(tuán)人員id = " + ids);} catch (InterruptedException e) {e.printStackTrace();}}, "拼團(tuán)").start();// 釋放線程池threadPool.shutdown();}
}

2.2 原理分析

CountDownLatch的底層實(shí)現(xiàn)原理也非常簡(jiǎn)單,當(dāng)線程調(diào)用 await() 的時(shí)候,如果 state 不為 0 則證明任務(wù)還沒(méi)有執(zhí)行結(jié)束,await() 就會(huì)進(jìn)入阻塞等待,其源碼如下:

// CountDownLatch -> await()
public void await() throws InterruptedException {// 調(diào)用內(nèi)部類sync的acquireSharedInterruptibly方法sync.acquireSharedInterruptibly(1);
}
// CountDownLatch -> Sync -> acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {// 被中斷拋出異常if (Thread.interrupted())throw new InterruptedException();// tryAcquireShared -> 判斷是否阻塞等待if (tryAcquireShared(arg) < 0)// 自旋+阻塞(AQS實(shí)現(xiàn))doAcquireSharedInterruptibly(arg);
}
// CountDownLatch -> Sync -> tryAcquireShared
protected int tryAcquireShared(int acquires) {// 判斷當(dāng)前state是否歸零return (getState() == 0) ? 1 : -1;
}

當(dāng)線程調(diào)用 countDown() 時(shí),其實(shí)最終是調(diào)用了Sync中重寫(xiě)的tryReleaseShared方法,該方法以 CAS 的操作來(lái)減少 state;若更新后state 歸零,則表示所有的計(jì)數(shù)任務(wù)線程都執(zhí)行完畢,那么在 CountDownLatch 上等待的線程就會(huì)被AQSdoReleaseShared方法喚醒并繼續(xù)向下執(zhí)行。

// CountDownLatch -> countDown()
public void countDown() {sync.releaseShared(1);
}
// CountDownLatch -> Sync -> AQS -> releaseShared
public final boolean releaseShared(int arg) {// 判斷遞減后計(jì)數(shù)器是否歸零if (tryReleaseShared(arg)) {// 喚醒所有等待線程(AQS實(shí)現(xiàn))doReleaseShared();return true;}return false;
}
// CountDownLatch -> Sync -> tryReleaseShared
protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {// 獲取當(dāng)前stateint c = getState();// 若計(jì)數(shù)器歸零則返回false,其他什么也不做if (c == 0)return false;// CAS更新state遞減int nextc = c-1;if (compareAndSetState(c, nextc))// 若更新成功則判斷新計(jì)數(shù)值是否歸零return nextc == 0;}
}

3. CyclicBarrier

//

http://aloenet.com.cn/news/28808.html

相關(guān)文章:

  • 用bootstrap做的手機(jī)推銷(xiāo)網(wǎng)站模板如何在百度上發(fā)表文章
  • 做淘客網(wǎng)站要備案網(wǎng)絡(luò)廣告人社區(qū)
  • 佛山專業(yè)的網(wǎng)站制作怎么做公司網(wǎng)站推廣
  • 腐女做喜歡的網(wǎng)站做銷(xiāo)售記住這十句口訣
  • 哪些網(wǎng)站可以做簽約設(shè)計(jì)師在線crm網(wǎng)站建站
  • wix建設(shè)網(wǎng)站鏈接交換平臺(tái)
  • 淘寶客做網(wǎng)站教程網(wǎng)站注冊(cè)信息查詢
  • 建設(shè)網(wǎng)站的方案抖音競(jìng)價(jià)推廣怎么做
  • 兼職做ps網(wǎng)站百度網(wǎng)盤(pán)搜索引擎網(wǎng)站
  • wordpress 所有鉤子商丘seo公司
  • 大型的網(wǎng)站開(kāi)發(fā)crm系統(tǒng)網(wǎng)站
  • 免費(fèi)網(wǎng)站制作效果百度一下你就知道123
  • 邯鄲做移動(dòng)網(wǎng)站費(fèi)用seo專業(yè)論壇
  • 網(wǎng)站網(wǎng)速慢刷百度關(guān)鍵詞排名
  • 瑞安做網(wǎng)站建設(shè)湖南網(wǎng)站建設(shè)seo
  • 桐鄉(xiāng)做網(wǎng)站正規(guī)seo大概多少錢(qián)
  • 網(wǎng)站不兼容怎么辦鄭州seo博客
  • 網(wǎng)站內(nèi)容全屏截屏怎么做網(wǎng)站做seo教程
  • 自己站網(wǎng)站如何進(jìn)行搜索引擎優(yōu)化
  • 查法人信息的網(wǎng)站培訓(xùn)計(jì)劃和培訓(xùn)內(nèi)容
  • 建網(wǎng)站要花錢(qián)嗎seo如何建立優(yōu)化網(wǎng)站
  • 上海網(wǎng)站建設(shè)定制公司谷歌推廣代理公司
  • 用wordpress怎么做網(wǎng)站免費(fèi)s站推廣網(wǎng)站
  • 濰坊網(wǎng)站建設(shè)客服代寫(xiě)文章質(zhì)量高的平臺(tái)
  • 電子商務(wù)網(wǎng)站開(kāi)發(fā)是什么官方正版清理優(yōu)化工具
  • cnzz網(wǎng)站建設(shè)廣州seo好找工作嗎
  • 廣州家居網(wǎng)站設(shè)計(jì)nba最新交易匯總
  • 大學(xué)生想做網(wǎng)站西安seo優(yōu)化推廣
  • 青島網(wǎng)站建設(shè)小公司seo綜合查詢工具
  • 網(wǎng)站是否必須做可信網(wǎng)站認(rèn)證seo自動(dòng)優(yōu)化軟件下載