AbstractQueuedSynchronizer詳解(二)——CountDownLatch源碼分析_第1頁
AbstractQueuedSynchronizer詳解(二)——CountDownLatch源碼分析_第2頁
AbstractQueuedSynchronizer詳解(二)——CountDownLatch源碼分析_第3頁
AbstractQueuedSynchronizer詳解(二)——CountDownLatch源碼分析_第4頁
AbstractQueuedSynchronizer詳解(二)——CountDownLatch源碼分析_第5頁
已閱讀5頁,還剩4頁未讀 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

1、AbstractQueuedSynchronizer詳解(二)CountDownLatch源碼分析CountDownLatch的使用CountDownLatch是一個工具類,用于使一個或多個線程等待另一系列線程完成操作,也就是說一些線程在另外一些完成之后才能繼續(xù)執(zhí)行,類似線性。 初始CountDownLatch時需要提供一個count參數(shù),await方法將會使線程阻塞,直到這個值變?yōu)?,countDown()方法可以使count值減1,一旦到達0后,調(diào)用await阻塞的線程將會得到釋放。 CountDown一般有兩個用法。一個線程等待多個線程一個線程等待多個線程完成后再執(zhí)行,比如一個不限時的考

2、試,10個學生考試,交卷時間不同,當10個學生都交卷后,老師才認為考試結(jié)束。下面是該例子:public class ExamDemo public static class Student implements Runnable private int num;/學號 private CountDownLatch countDownLatch; public Student(int num, CountDownLatch countDownLatch) this.num = um; this.countDownLatch = countDownLatch; Override public vo

3、id run() Random random = new Random(); try TimeUnit.SECONDS.sleep(random.nextInt(10) + 1); catch (InterruptedException e) e.printStackTrace(); System.out.println("學生" + num + "交卷了"); /完成工作,將count-1 countDownLatch.countDown(); public static class Teacher implements Runnable privat

4、e CountDownLatch countDownLatch; public Teacher(CountDownLatch countDownLatch) this.countDownLatch = countDownLatch; Override public void run() System.out.println("考試開始,不限時間!"); try /等待count變?yōu)? countDownLatch.await(); catch (InterruptedException e) e.printStackTrace(); System.out.println(&

5、quot;考試結(jié)束"); public static void main(String args) Executor executor = Executors.newCachedThreadPool(); CountDownLatch countDownLatch = new CountDownLatch(10); executor.execute(new Teacher(countDownLatch); for (int i = 1; i <= 10; i+) executor.execute(new Student(i, countDownLatch); 多個線程等待一個線

6、程再舉個例子,比如幾個司機在等紅燈,而一旦變成了綠燈,幾個司機就可以都通過了。public class TrafficDemo public static class Driver implements Runnable private int num; private CountDownLatch countDownLatch; public Driver(int num, CountDownLatch countDownLatch) this.num = num; this.countDownLatch = countDownLatch; Override public void run(

7、) System.out.println("司機" + num + "于紅燈前等待"); try countDownLatch.await(); catch (InterruptedException e) e.printStackTrace(); System.out.println("司機" + num + "于綠燈通過"); public static class TrafficLight implements Runnable private CountDownLatch countDownLatch; p

8、ublic TrafficLight(CountDownLatch countDownLatch) this.countDownLatch = countDownLatch; Override public void run() try TimeUnit.SECONDS.sleep(5); catch (InterruptedException e) e.printStackTrace(); System.out.println("綠燈行"); countDownLatch.countDown(); public static void main(String args)

9、Executor executor = Executors.newCachedThreadPool(); CountDownLatch countDownLatch = new CountDownLatch(1); for (nt i = 1; i < 10; i+) executor.execute(new Driver(i, countDownLatch); executor.execute(new TrafficLight(countDownLatch); CountDownLatch源碼解析CountDownLatch初始化CountDownLatch只有一個構(gòu)造方法,如下: p

10、ublic CountDownLatch(int count) if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); 我們很容易知道Sync是一個繼承自AQS的內(nèi)部類,它負責CountDownLatch的同步事件。下面是Sync的定義:/* * Synchronization control For CountDownLatch. * Uses AQS state to represent count. */ private sta

11、tic final class Sync extends AbstractQueuedSynchronizer private static final long serialVersionUID = 4982264981922014374L; /AQS的資源即設(shè)定的count Sync(int count) setState(count); int getCount() return getState(); protected int tryAcquireShared(int acquires) return (getState() = 0) ? 1 : -1; protected bool

12、ean tryReleaseShared(int releases) / Decrement count; signal when transition to zero for (;) int c = getState(); if (c = 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc) return nextc = 0; await方法await方法用于將線程阻塞,實現(xiàn)如下:public void await() throws InterruptedException sync.acquireSharedI

13、nterruptibly(1); 可以看到調(diào)用了acquireSharedInterruptibly方法,該方法在AQS中,實現(xiàn)如下:public final void acquireSharedInterruptibly(int arg) throws InterruptedException /線程被interrupt了,拋出異常 if (Terrupted() throw new InterruptedException(); /嘗試獲取arg個資源,如果小于0表示獲取失敗 if (tryAcquireShared(arg) < 0) doAcquireShare

14、dInterruptibly(arg); 從上面的方法可以看到,首先嘗試獲取資源,如果有資源,那么調(diào)用doAcquireSharedInterruptibly方法,tryAcquireShared方法是在Sync中的。從上面可以看到,如果getState為0則該線程不需要加入到等待隊列中,而如果不等于0,則需要將該線程加入到等待隊里中。 如果該線程需要加入到等待隊列中,那么就調(diào)用doAcquireSharedInterruptibly方法,如下:private void doAcquireSharedInterruptibly(int arg) throws IuptedException /

15、將該線程加入到等待隊列中 final Node node = addWaiter(Node.SHARED); boolean failed = true; try for (;) /得到前驅(qū)節(jié)點 final Node p = node.predecessor(); /如果前驅(qū)節(jié)點是頭節(jié)點 if (p = head) /嘗試獲取資源 int r = tryAcquireShared(arg); /getState為0,說明count為0了,等待的線程可以執(zhí)行了 if (r >= 0) setHeadAndPropagate(node, r); p.next = null; / help G

16、C failed = false; return; /檢查是否需要將當前線程掛起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() throw new InterruptedException(); finally if (failed) cancelAcquire(node); 隊列的結(jié)構(gòu)如下:其中Head是個空節(jié)點,所以一旦前驅(qū)節(jié)點是頭節(jié)點,并且要是count變?yōu)榱?,那么就調(diào)用setHeadAndPropagate方法,然后釋放頭節(jié)點。setHeadAndPropagate方法主要完成

17、兩件事,第一更改頭節(jié)點,第二由于此時等待線程可以執(zhí)行了,將該事件傳播給后面的節(jié)點,實現(xiàn)如下:private void setHeadAndPropagate(Node node, int propagate) Node h = head; / Record old head for check below /更改頭節(jié)點 setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either bef

18、ore * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause *

19、unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ /如果需要傳播 if (propagate > 0 | h = null | h.waitStatus < 0 | (h = head) = null | h.waitStatus < 0) Node s = de.next; if (s = null | s.isShared() doReleaseShared(); 可

20、以看到如果需要傳播的話,將會調(diào)用doReleasShared方法,方法如下:private void doReleaseShared() for (;) Node h = head; if (h != null && h != tail) int ws = h.waitStatus; if (ws = Node.SIGNAL) if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0) /重置狀態(tài),成功后,將后繼節(jié)點喚醒 continue; / loop to recheck cases unparkSuccessor(h); else if (ws = 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE) continue; / loop on failed CAS if (h = head) / loop if head changed break; 至此,await方法就使線程掛起了,下面再分析countDown方法。countDown方法countDown方法用于將count值減1,如果count變?yōu)?則釋放所有等待線程,方法的實現(xiàn)如下:public void countDown() sync.relea

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論