第二部分并發(fā)工具類14講26丨fork join單機(jī)版的mapreducel_第1頁
第二部分并發(fā)工具類14講26丨fork join單機(jī)版的mapreducel_第2頁
第二部分并發(fā)工具類14講26丨fork join單機(jī)版的mapreducel_第3頁
第二部分并發(fā)工具類14講26丨fork join單機(jī)版的mapreducel_第4頁
第二部分并發(fā)工具類14講26丨fork join單機(jī)版的mapreducel_第5頁
已閱讀5頁,還剩16頁未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡介

上面提到的簡單并行、聚合、批量并行這三種任務(wù)模型,基本上能夠覆蓋日常工作中的并發(fā)場景了,但還是不夠全面,因?yàn)檫€有一種“分治”的任務(wù)模型沒有覆蓋到。分治,顧名思義,即分而治之,是一種解決復(fù)雜問題的思維方法和模式;具體來講,指的是把一個(gè)復(fù)雜的問題分解成多個(gè)相似的子問題,然后再把子問題分解成更小的子問題,直到子問題簡單到可以直接求解。理論上來講,解決每一個(gè)問題都對應(yīng)著一個(gè)任務(wù),所以對于問題的分治,實(shí)際上就是對務(wù)的分治。分治思想在很多領(lǐng)域都有廣泛的應(yīng)用,例如算法領(lǐng)域有分治算法(歸并排序、快速排序都屬于分治算法,二分法查找也是一種分治算法);大數(shù)據(jù)領(lǐng)域知名的計(jì)算框架MaRedu背后的思想也是分治。既然分治這種任務(wù)模型如此普遍,那Java顯然也需要支持,Java并發(fā)包里提供了一種叫做ForkJoin的并行計(jì)算框架,就是用來支持分治這種任務(wù)模型的。這里你需要先深入了解一下分治任務(wù)模型,分治任務(wù)模型可分為兩個(gè)階段:一個(gè)階段是分解,也就是將任務(wù)迭代地分解為子任務(wù),直至子任務(wù)可以直接計(jì)算出結(jié)果;另一個(gè)階段是結(jié)果合并,即逐層合并子任務(wù)的執(zhí)行結(jié)果,直至獲得最終結(jié)果。下圖是一個(gè)簡化的分治任務(wù)模型圖,你可以對照著理解。在這個(gè)分治任務(wù)模型里,任務(wù)和分解后的子任務(wù)具有相似性,這種相似性往往體現(xiàn)在任務(wù)和子任務(wù)的算法是相同的,但是計(jì)算的數(shù)據(jù)規(guī)模是不同的。具備這種相似性的問題,我們往往都采用遞歸算法。Fork/Join是一個(gè)并行計(jì)算的框架,主要就是用來支持分治任務(wù)模型的,這個(gè)計(jì)算框架里的Fork對應(yīng)的是分治任務(wù)模型里的任務(wù)分解,Join對應(yīng)的是結(jié)果合并。Fork/Join計(jì)算框架主要包含兩部分,一部分是分治任務(wù)的線程池ForkJoinPool,另一部分是分治任務(wù)ForkJoinTaskThreadPoolExecutorRunnable以理解為提交任務(wù)到線程池,只不過分治任務(wù)有自己獨(dú)特類型ForkJoinTask。ForkJoinTask是一個(gè)抽象類,它的方法有很多,最的是fork()方法和join()方法,其中fork()方異步地執(zhí)行一個(gè)子任務(wù),而join()方法則會(huì)阻塞當(dāng)前線程來等待子任務(wù)的執(zhí)行結(jié)果。ForkJoinTask有兩個(gè)子類——RecursiveAction和RecursiveTask,通過名字你compute(),不過區(qū)別是RecursiveAction定義的compute()沒有返回值,而RecursiveTask定義的compute接下來我們就來實(shí)現(xiàn)一下,看看如何用Fork/Join(下面的代碼源自Java示例)。首先我們需要?jiǎng)?chuàng)建一個(gè)分治任務(wù)線程池以及計(jì)算斐波那契數(shù)列的分治任務(wù),之后通過調(diào)用分治任務(wù)線程池的invoke()方法來啟動(dòng)分治任務(wù)。由于計(jì)算斐波那契數(shù)列需要有返回值,所以Fibonacci繼承自RecursiveTask。分治任務(wù)Fibonacci需要實(shí)現(xiàn)compute似,區(qū)別之處在于計(jì)算Fibonacci(n-1使用了異步子任務(wù),這是通過f1.fork()這staticvoidmain(String[]//ForkJoinPoolfjpnew//Fibonaccifibnew//Integerresult//13//staticclassFibonaccifinalintFibonacci(intn){this.n=protectedInteger if(n<=returnFibonaccif1newFibonacci(n-//Fibonaccif2newFibonacci(n- 31

// pute()+Fork/Join并行計(jì)算的組件是ForkJoinPool,所以下面我們就來簡單介紹一下ForkJoinPool的工作原理。通過專欄前面文章的學(xué)習(xí),你應(yīng)該已經(jīng)知道ThreadPoolExecutor本質(zhì)上是一個(gè)生產(chǎn)者-ThreadPoolExecutor可以有多個(gè)工作線程,但是這些工作線程都共個(gè)任務(wù)隊(duì)列。ForkJoinPool本質(zhì)上也是一個(gè)生產(chǎn)者-消費(fèi)者的實(shí)現(xiàn),但是更加智能,你可以參考下面的ForkJoinPool工作原理圖來理解其原理。ThreadPoolExecutor內(nèi)部只有一個(gè)任務(wù)隊(duì)列,而ForkJoinPool內(nèi)部有多個(gè)任務(wù)隊(duì)列,當(dāng)我們通過ForkJoinPool的invoke()或者submit()方法提交任務(wù)時(shí),F(xiàn)orkJoinPool根據(jù)一定的路由規(guī)則把任務(wù)提交到一個(gè)任務(wù)隊(duì)列如果工作線程對應(yīng)的任務(wù)隊(duì)列空了,是不是就沒活兒干了呢?不是的,F(xiàn)orkJoinPool支持一種叫做“任務(wù)竊取”的機(jī)制,如果工作線程空閑了,那它可以“竊取”其他工作任務(wù)隊(duì)列里的任務(wù),例如下圖中,線程T2對應(yīng)的任務(wù)隊(duì)列已經(jīng)空了,它可以“竊取”線程T1對應(yīng)ForkJoinPool中的任務(wù)隊(duì)列采用的是雙端隊(duì)列,工作線程正常獲取任務(wù)和“竊取任務(wù)”分簡化后的原理,F(xiàn)orkJoinPool的實(shí)現(xiàn)遠(yuǎn)比我們這里介紹的復(fù)雜,如果你感,建議去看ForkJoinPoolMapReduce學(xué)習(xí)MapReduce有一個(gè)程序,統(tǒng)計(jì)一個(gè)文件里面每個(gè)單詞的數(shù)量,下面我們來看看如何用Fork/Join并行計(jì)算框架來實(shí)現(xiàn)。我們可以先用二分法遞歸地將一個(gè)文件拆分成更小的文件,直到文件里只有一行數(shù)據(jù),然后統(tǒng)計(jì)這一行數(shù)據(jù)里單詞的數(shù)量,最后再逐級匯總結(jié)果,你可以對照前面的簡版分治任務(wù)模型圖來理解這個(gè)過程。思路有了,我們馬上來實(shí)現(xiàn)。下面的示例程序用一個(gè)字符串?dāng)?shù)組String[]fc來模擬文件內(nèi)容,fc里面的元素與文件里面的行數(shù)據(jù)一一對應(yīng)。關(guān)鍵的代碼在compute()這個(gè)方法里面,這是一個(gè)遞歸方法,前半部分?jǐn)?shù)據(jù)fork一個(gè)遞歸任務(wù)去處理(關(guān)鍵代碼 staticvoidmain(String[]String[]fc={"o"o"o"o678919

"forkjoinin//創(chuàng)建ForkJoinForkJoinPoolfjpnew//MRmr=newfc,0,//Map<String,Long>result=//result.forEach((k,v)->//MRstaticclassMRRecursiveTask<Map<String,Long>>privateString[]privateintstart,//MR(String[]fc,intfr,intthis.fc=this.start=this.end= @OverrideMap<String,Long>if(end-start==1)return}elseintmid=MRmr1=newfc,start,MRmr2=newfc,mid,//return //privateMap<String,Long>Map<String,Long>Map<String,Long>r2)Map<String,Long>resultnew//r2.forEach((k,v)->Longc=if(c!=result.put(k,result.put(k, return //privateMap<String,calc(Stringline)Map<String,Long>resultnew//String[]words//for(Stringw:words)Longv=if(v!=result.put(w,result.put(w, return 82Fork/Join并行計(jì)算框架主要解決的是分治任務(wù)。分治的思想是“分而治之”:將一個(gè)個(gè)過程非常類似于大數(shù)據(jù)處理中的MapReduce,所以你可以把Fork/Join看作單機(jī)版的Fork/Join并行計(jì)算框架的組件是ForkJoinPool。ForkJoinPool支持任務(wù)竊取機(jī)制,性能很好。Java1.8提供的StreamAPI里面并行流也是以ForkJoinPool為基礎(chǔ)的。不過需要你注意的是,默認(rèn)情況下所有的并行流計(jì)算都共個(gè)ForkJoinPool,這個(gè)共享的ForkJoinPool默認(rèn)的線程數(shù)是CPU的核數(shù);如果所有的并行流計(jì)算都是CPU密集型計(jì)算的話,完全沒有問題,但是如果存在I/O密集型的并行流計(jì)算,那么很可能會(huì)因?yàn)橐粋€(gè)很慢的I/O計(jì)算而拖慢整個(gè)系統(tǒng)的性能。所以建議用不同的ForkJoinPool執(zhí)行不同類型的如果你對ForkJoinPool詳細(xì)的實(shí)現(xiàn)細(xì)節(jié)感,也可以參考DougLea CPUCPUFork/Join覺得這篇文章對你有幫助的話,也歡迎把它給的朋友。 不得售賣。頁面已增加防盜追蹤,將依法其上一 25|CompletionService:如何批量執(zhí)行異步任務(wù)下一 27|并發(fā)工具類模塊熱點(diǎn)問題答寫言寫言 11 fork()在join的時(shí)候,需要用這樣的順序:a.forkb.forkb.joina.join();這 P同一時(shí)間只能處理一個(gè)線程,所以理論上,純u程的話,線程上下文切換帶來的線程現(xiàn)場保存和恢復(fù)也會(huì)帶來額外開銷。但實(shí)際上可能要經(jīng)過測試才知道。55 *Ryzen17008核163.0??????舉一反三了 2 性加載內(nèi)存溢出,也為了防止文件內(nèi)容過小,所以每次都確保讀出的行數(shù)10萬行左右),課后習(xí)題:單核的話,使用單線程會(huì)比多線程快,線程的切換,恢復(fù)等都會(huì)耗時(shí),并且 ??說明

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論