MapReduce框架結(jié)構(gòu)說(shuō)課講解_第1頁(yè)
MapReduce框架結(jié)構(gòu)說(shuō)課講解_第2頁(yè)
MapReduce框架結(jié)構(gòu)說(shuō)課講解_第3頁(yè)
MapReduce框架結(jié)構(gòu)說(shuō)課講解_第4頁(yè)
MapReduce框架結(jié)構(gòu)說(shuō)課講解_第5頁(yè)
已閱讀5頁(yè),還剩14頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

1、Good is good, but better carries it.精益求精,善益求善。MapReduce框架結(jié)構(gòu)-2MapReduce框架結(jié)構(gòu)Map/Reduce是一個(gè)用于大規(guī)模數(shù)據(jù)處理的分布式計(jì)算模型,它最初是由Google工程師設(shè)計(jì)并實(shí)現(xiàn)的,Google已經(jīng)將它完整的HYPERLINK/papers/mapreduce.htmlMapReduce論文公開(kāi)發(fā)布了。其中對(duì)它的定義是,Map/Reduce是一個(gè)編程模型(programmingmodel),是一個(gè)用于處理和生成大規(guī)模數(shù)據(jù)集(processingandgeneratinglargedatasets)的相關(guān)的實(shí)現(xiàn)。用戶定義一個(gè)m

2、ap函數(shù)來(lái)處理一個(gè)key/value對(duì)以生成一批中間的key/value對(duì),再定義一個(gè)reduce函數(shù)將所有這些中間的有著相同key的values合并起來(lái)。很多現(xiàn)實(shí)世界中的任務(wù)都可用這個(gè)模型來(lái)表達(dá)。Hadoop的Map/Reduce框架也是基于這個(gè)原理實(shí)現(xiàn)的,下面簡(jiǎn)要介紹一下Map/Reduce框架主要組成及相互的關(guān)系。2.1總體結(jié)構(gòu)2.1.1Mapper和Reducer運(yùn)行于Hadoop的MapReduce應(yīng)用程序最基本的組成部分包括一個(gè)Mapper和一個(gè)Reducer類,以及一個(gè)創(chuàng)建JobConf的執(zhí)行程序,在一些應(yīng)用中還可以包括一個(gè)Combiner類,它實(shí)際也是Reducer的實(shí)現(xiàn)。2.

3、1.2JobTracker和TaskTracker它們都是由一個(gè)master服務(wù)JobTracker和多個(gè)運(yùn)行于多個(gè)節(jié)點(diǎn)的slaver服務(wù)TaskTracker兩個(gè)類提供的服務(wù)調(diào)度的。master負(fù)責(zé)調(diào)度job的每一個(gè)子任務(wù)task運(yùn)行于slave上,并監(jiān)控它們,如果發(fā)現(xiàn)有失敗的task就重新運(yùn)行它,slave則負(fù)責(zé)直接執(zhí)行每一個(gè)task。TaskTracker都需要運(yùn)行在HDFS的DataNode上,而JobTracker則不需要,一般情況應(yīng)該把JobTracker部署在單獨(dú)的機(jī)器上。2.1.3JobClient每一個(gè)job都會(huì)在用戶端通過(guò)JobClient類將應(yīng)用程序以及配置參數(shù)Confi

4、guration打包成jar文件存儲(chǔ)在HDFS,并把路徑提交到JobTracker的master服務(wù),然后由master創(chuàng)建每一個(gè)Task(即MapTask和ReduceTask)將它們分發(fā)到各個(gè)TaskTracker服務(wù)中去執(zhí)行。2.1.4JobInProgressJobClient提交job后,JobTracker會(huì)創(chuàng)建一個(gè)JobInProgress來(lái)跟蹤和調(diào)度這個(gè)job,并把它添加到j(luò)ob隊(duì)列里。JobInProgress會(huì)根據(jù)提交的jobjar中定義的輸入數(shù)據(jù)集(已分解成FileSplit)創(chuàng)建對(duì)應(yīng)的一批TaskInProgress用于監(jiān)控和調(diào)度MapTask,同時(shí)在創(chuàng)建指定數(shù)目的Ta

5、skInProgress用于監(jiān)控和調(diào)度ReduceTask,缺省為1個(gè)ReduceTask。2.1.5TaskInProgressJobTracker啟動(dòng)任務(wù)時(shí)通過(guò)每一個(gè)TaskInProgress來(lái)launchTask,這時(shí)會(huì)把Task對(duì)象(即MapTask和ReduceTask)序列化寫(xiě)入相應(yīng)的TaskTracker服務(wù)中,TaskTracker收到后會(huì)創(chuàng)建對(duì)應(yīng)的TaskInProgress(此TaskInProgress實(shí)現(xiàn)非JobTracker中使用的TaskInProgress,作用類似)用于監(jiān)控和調(diào)度該Task。啟動(dòng)具體的Task進(jìn)程是通過(guò)TaskInProgress管理的Task

6、Runner對(duì)象來(lái)運(yùn)行的。TaskRunner會(huì)自動(dòng)裝載jobjar,并設(shè)置好環(huán)境變量后啟動(dòng)一個(gè)獨(dú)立的javachild進(jìn)程來(lái)執(zhí)行Task,即MapTask或者ReduceTask,但它們不一定運(yùn)行在同一個(gè)TaskTracker中。2.1.6MapTask和ReduceTask一個(gè)完整的job會(huì)自動(dòng)依次執(zhí)行Mapper、Combiner(在JobConf指定了Combiner時(shí)執(zhí)行)和Reducer,其中Mapper和Combiner是由MapTask調(diào)用執(zhí)行,Reducer則由ReduceTask調(diào)用,Combiner實(shí)際也是Reducer接口類的實(shí)現(xiàn)。Mapper會(huì)根據(jù)jobjar中定義的

7、輸入數(shù)據(jù)集按對(duì)讀入,處理完成生成臨時(shí)的對(duì),如果定義了Combiner,MapTask會(huì)在Mapper完成調(diào)用該Combiner將相同key的值做合并處理,以減少輸出結(jié)果集。MapTask的任務(wù)全完成即交給ReduceTask進(jìn)程調(diào)用Reducer處理,生成最終結(jié)果對(duì)。這個(gè)過(guò)程在下一部分再詳細(xì)介紹。下圖描述了Map/Reduce框架中主要組成和它們之間的關(guān)系:2.2Job創(chuàng)建過(guò)程2.2.1JobClient.runJob()開(kāi)始運(yùn)行job并分解輸入數(shù)據(jù)集一個(gè)MapReduce的Job會(huì)通過(guò)JobClient類根據(jù)用戶在JobConf類中定義的InputFormat實(shí)現(xiàn)類來(lái)將輸入的數(shù)據(jù)集分解成一批

8、小的數(shù)據(jù)集,每一個(gè)小數(shù)據(jù)集會(huì)對(duì)應(yīng)創(chuàng)建一個(gè)MapTask來(lái)處理。JobClient會(huì)使用缺省的FileInputFormat類調(diào)用FileInputFormat.getSplits()方法生成小數(shù)據(jù)集,如果判斷數(shù)據(jù)文件是isSplitable()的話,會(huì)將大的文件分解成小的FileSplit,當(dāng)然只是記錄文件在HDFS里的路徑及偏移量和Split大小。這些信息會(huì)統(tǒng)一打包到j(luò)obFile的jar中并存儲(chǔ)在HDFS中,再將jobFile路徑提交給JobTracker去調(diào)度和執(zhí)行。2.2.2JobClient.submitJob()提交job到JobTrackerjobFile的提交過(guò)程是通過(guò)RPC模

9、塊(有單獨(dú)一章來(lái)詳細(xì)介紹)來(lái)實(shí)現(xiàn)的。大致過(guò)程是,JobClient類中通過(guò)RPC實(shí)現(xiàn)的Proxy接口調(diào)用JobTracker的submitJob()方法,而JobTracker必須實(shí)現(xiàn)JobSubmissionProtocol接口。JobTracker則根據(jù)獲得的jobFile路徑創(chuàng)建與job有關(guān)的一系列對(duì)象(即JobInProgress和TaskInProgress等)來(lái)調(diào)度并執(zhí)行job。JobTracker創(chuàng)建job成功后會(huì)給JobClient傳回一個(gè)JobStatus對(duì)象用于記錄job的狀態(tài)信息,如執(zhí)行時(shí)間、Map和Reduce任務(wù)完成的比例等。JobClient會(huì)根據(jù)這個(gè)JobStat

10、us對(duì)象創(chuàng)建一個(gè)NetworkedJob的RunningJob對(duì)象,用于定時(shí)從JobTracker獲得執(zhí)行過(guò)程的統(tǒng)計(jì)數(shù)據(jù)來(lái)監(jiān)控并打印到用戶的控制臺(tái)。與創(chuàng)建Job過(guò)程相關(guān)的類和方法如下圖所示2.3Job執(zhí)行過(guò)程上面已經(jīng)提到,job是統(tǒng)一由JobTracker來(lái)調(diào)度的,具體的Task分發(fā)給各個(gè)TaskTracker節(jié)點(diǎn)來(lái)執(zhí)行。下面通過(guò)源碼來(lái)詳細(xì)解析執(zhí)行過(guò)程,首先先從JobTracker收到JobClient的提交請(qǐng)求開(kāi)始。2.3.1JobTracker初始化Job和Task隊(duì)列過(guò)程JobTracker.submitJob()收到請(qǐng)求當(dāng)JobTracker接收到新的job請(qǐng)求(即submitJob(

11、)函數(shù)被調(diào)用)后,會(huì)創(chuàng)建一個(gè)JobInProgress對(duì)象并通過(guò)它來(lái)管理和調(diào)度任務(wù)。JobInProgress在創(chuàng)建的時(shí)候會(huì)初始化一系列與任務(wù)有關(guān)的參數(shù),如jobjar的位置(會(huì)把它從HDFS復(fù)制本地的文件系統(tǒng)中的臨時(shí)目錄里),Map和Reduce的數(shù)據(jù),job的優(yōu)先級(jí)別,以及記錄統(tǒng)計(jì)報(bào)告的對(duì)象等。JobTracker.resortPriority()加入隊(duì)列并按優(yōu)先級(jí)排序JobInProgress創(chuàng)建后,首先將它加入到j(luò)obs隊(duì)列里,分別用一個(gè)map成員變量jobs用來(lái)管理所有jobs對(duì)象,一個(gè)list成員變量jobsByPriority用來(lái)維護(hù)jobs的執(zhí)行優(yōu)先級(jí)別。之后JobTracke

12、r會(huì)調(diào)用resortPriority()函數(shù),將jobs先按優(yōu)先級(jí)別排序,再按提交時(shí)間排序,這樣保證最高優(yōu)先并且先提交的job會(huì)先執(zhí)行。JobTracker.JobInitThread通知初始化線程然后JobTracker會(huì)把此job加入到一個(gè)管理需要初始化的隊(duì)列里,即一個(gè)list成員變量jobInitQueue里。通過(guò)此成員變量調(diào)用notifyAll()函數(shù),會(huì)喚起一個(gè)用于初始化job的線程JobInitThread來(lái)處理(JobTracker會(huì)有幾個(gè)內(nèi)部的線程來(lái)維護(hù)jobs隊(duì)列,它們的實(shí)現(xiàn)都在JobTracker代碼里,稍候再詳細(xì)介紹)。JobInitThread收到信號(hào)后即取出最靠前的j

13、ob,即優(yōu)先級(jí)別最高的job,調(diào)用JobInProgress的initTasks()函數(shù)執(zhí)行真正的初始化工作。JobInProgress.initTasks()初始化TaskInProgressTask的初始化過(guò)程稍復(fù)雜些,首先步驟JobInProgress會(huì)創(chuàng)建Map的監(jiān)控對(duì)象。在initTasks()函數(shù)里通過(guò)調(diào)用JobClient的readSplitFile()獲得已分解的輸入數(shù)據(jù)的RawSplit列表,然后根據(jù)這個(gè)列表創(chuàng)建對(duì)應(yīng)數(shù)目的Map執(zhí)行管理對(duì)象TaskInProgress。在這個(gè)過(guò)程中,還會(huì)記錄該RawSplit塊對(duì)應(yīng)的所有在HDFS里的blocks所在的DataNode節(jié)點(diǎn)的h

14、ost,這個(gè)會(huì)在RawSplit創(chuàng)建時(shí)通過(guò)FileSplit的getLocations()函數(shù)獲取,該函數(shù)會(huì)調(diào)用DistributedFileSystem的getFileCacheHints()獲得(這個(gè)細(xì)節(jié)會(huì)在HDFS模塊中講解)。當(dāng)然如果是存儲(chǔ)在本地文件系統(tǒng)中,即使用LocalFileSystem時(shí)當(dāng)然只有一個(gè)location即“l(fā)ocalhost”了。其次JobInProgress會(huì)創(chuàng)建Reduce的監(jiān)控對(duì)象,這個(gè)比較簡(jiǎn)單,根據(jù)JobConf里指定的Reduce數(shù)目創(chuàng)建,缺省只創(chuàng)建1個(gè)Reduce任務(wù)。監(jiān)控和調(diào)度Reduce任務(wù)的也是TaskInProgress類,不過(guò)構(gòu)造方法有所不同,

15、TaskInProgress會(huì)根據(jù)不同參數(shù)分別創(chuàng)建具體的MapTask或者ReduceTask。JobInProgress創(chuàng)建完TaskInProgress后,最后構(gòu)造JobStatus并記錄job正在執(zhí)行中,然后再調(diào)用JobHistory.JobInfo.logStarted()記錄job的執(zhí)行日志。到這里JobTracker里初始化job的過(guò)程全部結(jié)束,執(zhí)行則是通過(guò)另一異步的方式處理的,下面接著介紹它。與初始化Job過(guò)程相關(guān)的類和方法如下圖所示2.3.2TaskTracker執(zhí)行Task的過(guò)程Task的執(zhí)行實(shí)際是由TaskTracker發(fā)起的,TaskTracker會(huì)定期(缺省為10秒鐘,

16、參見(jiàn)MRConstants類中定義的HEARTBEAT_INTERVAL變量)與JobTracker進(jìn)行一次通信,報(bào)告自己Task的執(zhí)行狀態(tài),接收J(rèn)obTracker的指令等。如果發(fā)現(xiàn)有自己需要執(zhí)行的新任務(wù)也會(huì)在這時(shí)啟動(dòng),即是在TaskTracker調(diào)用JobTracker的heartbeat()方法時(shí)進(jìn)行,此調(diào)用底層是通過(guò)IPC層調(diào)用Proxy接口(在IPC章節(jié)詳細(xì)介紹)實(shí)現(xiàn)。這個(gè)過(guò)程實(shí)際比較復(fù)雜,下面一一簡(jiǎn)單介紹下每個(gè)步驟。TaskTracker.run()連接JobTrackerTaskTracker的啟動(dòng)過(guò)程會(huì)初始化一系列參數(shù)和服務(wù)(另有單獨(dú)的一節(jié)介紹),然后嘗試連接JobTracke

17、r服務(wù)(即必須實(shí)現(xiàn)InterTrackerProtocol接口),如果連接斷開(kāi),則會(huì)循環(huán)嘗試連接JobTracker,并重新初始化所有成員和參數(shù),此過(guò)程參見(jiàn)run()方法。TaskTracker.offerService()主循環(huán)如果連接JobTracker服務(wù)成功,TaskTracker就會(huì)調(diào)用offerService()函數(shù)進(jìn)入主執(zhí)行循環(huán)中。這個(gè)循環(huán)會(huì)每隔10秒與JobTracker通訊一次,調(diào)用transmitHeartBeat()獲得HeartbeatResponse信息。然后調(diào)用HeartbeatResponse的getActions()函數(shù)獲得JobTracker傳過(guò)來(lái)的所有指令即

18、一個(gè)TaskTrackerAction數(shù)組。再遍歷這個(gè)數(shù)組,如果是一個(gè)新任務(wù)指令即LaunchTaskAction則調(diào)用startNewTask()函數(shù)執(zhí)行新任務(wù),否則加入到tasksToCleanup隊(duì)列,交給一個(gè)taskCleanupThread線程來(lái)處理,如執(zhí)行KillJobAction或者KillTaskAction等。TaskTracker.transmitHeartBeat()獲取JobTracker指令在transmitHeartBeat()函數(shù)處理中,TaskTracker會(huì)創(chuàng)建一個(gè)新的TaskTrackerStatus對(duì)象記錄目前任務(wù)的執(zhí)行狀況,然后通過(guò)IPC接口調(diào)用JobT

19、racker的heartbeat()方法發(fā)送過(guò)去,并接受新的指令,即返回值TaskTrackerAction數(shù)組。在這個(gè)調(diào)用之前,TaskTracker會(huì)先檢查目前執(zhí)行的Task數(shù)目以及本地磁盤(pán)的空間使用情況等,如果可以接收新的Task則設(shè)置heartbeat()的askForNewTask參數(shù)為true。操作成功后再更新相關(guān)的統(tǒng)計(jì)信息等。TaskTracker.startNewTask()啟動(dòng)新任務(wù)此函數(shù)的主要任務(wù)就是創(chuàng)建TaskTracker$TaskInProgress對(duì)象來(lái)調(diào)度和監(jiān)控任務(wù),并把它加入到runningTasks隊(duì)列中。完成后則調(diào)用localizeJob()真正初始化Tas

20、k并開(kāi)始執(zhí)行。TaskTracker.localizeJob()初始化job目錄等此函數(shù)主要任務(wù)是初始化工作目錄workDir,再將jobjar包從HDFS復(fù)制到本地文件系統(tǒng)中,調(diào)用RunJar.unJar()將包解壓到工作目錄。然后創(chuàng)建一個(gè)RunningJob并調(diào)用addTaskToJob()函數(shù)將它添加到runningJobs監(jiān)控隊(duì)列中。完成后即調(diào)用launchTaskForJob()開(kāi)始執(zhí)行Task。TaskTracker.launchTaskForJob()執(zhí)行任務(wù)啟動(dòng)Task的工作實(shí)際是調(diào)用TaskTracker$TaskInProgress的launchTask()函數(shù)來(lái)執(zhí)行的。T

21、askTracker$TaskInProgress.launchTask()執(zhí)行任務(wù)執(zhí)行任務(wù)前先調(diào)用localizeTask()更新一下jobConf文件并寫(xiě)入到本地目錄中。然后通過(guò)調(diào)用Task的createRunner()方法創(chuàng)建TaskRunner對(duì)象并調(diào)用其start()方法最后啟動(dòng)Task獨(dú)立的java執(zhí)行子進(jìn)程。Task.createRunner()創(chuàng)建啟動(dòng)Runner對(duì)象Task有兩個(gè)實(shí)現(xiàn)版本,即MapTask和ReduceTask,它們分別用于創(chuàng)建Map和Reduce任務(wù)。MapTask會(huì)創(chuàng)建MapTaskRunner來(lái)啟動(dòng)Task子進(jìn)程,而ReduceTask則創(chuàng)建Reduce

22、TaskRunner來(lái)啟動(dòng)。TaskRunner.start()啟動(dòng)子進(jìn)程真正執(zhí)行Task這里是真正啟動(dòng)子進(jìn)程并執(zhí)行Task的地方。它會(huì)調(diào)用run()函數(shù)來(lái)處理。執(zhí)行的過(guò)程比較復(fù)雜,主要的工作就是初始化啟動(dòng)java子進(jìn)程的一系列環(huán)境變量,包括設(shè)定工作目錄workDir,設(shè)置CLASSPATH環(huán)境變量等(需要將TaskTracker的環(huán)境變量以及jobjar的路徑合并起來(lái))。然后裝載jobjar包,調(diào)用runChild()方法啟動(dòng)子進(jìn)程,即通過(guò)ProcessBuilder來(lái)創(chuàng)建,同時(shí)子進(jìn)程的stdout/stdin/syslog的輸出定向到該Task指定的輸出日志目錄中,具體的輸出通過(guò)TaskL

23、og類來(lái)實(shí)現(xiàn)。這里有個(gè)小問(wèn)題,Task子進(jìn)程只能輸出INFO級(jí)別日志,而且該級(jí)別是在run()函數(shù)中直接指定,不過(guò)改進(jìn)也不復(fù)雜。與Job執(zhí)行過(guò)程相關(guān)的類和方法如下圖所示2.4JobTracker和TaskTracker如上面所述,JobTracker和TaskTracker是MapReduce框架最基本的兩個(gè)服務(wù),其他所有處理均由它們調(diào)度執(zhí)行,下面簡(jiǎn)單介紹它們內(nèi)部提供的服務(wù)及創(chuàng)建的線程,詳細(xì)過(guò)程下回分解J2.4.1JobTracker的服務(wù)和線程JobTracker是MapReduce框架中最主要的類之一,所有job的執(zhí)行都由它來(lái)調(diào)度,而且Hadoop系統(tǒng)中只配置一個(gè)JobTracker應(yīng)用。

24、啟動(dòng)JobTracker后它會(huì)初始化若干個(gè)服務(wù)以及若干個(gè)內(nèi)部線程用來(lái)維護(hù)job的執(zhí)行過(guò)程和結(jié)果。下面簡(jiǎn)單介紹一下它們。首先,JobTracker會(huì)啟動(dòng)一個(gè)interTrackerServer,端口配置在Configuration中的mapred.job.tracker參數(shù),缺省是綁定8012端口。它有兩個(gè)用途,一是用于接收和處理TaskTracker的heartbeat等請(qǐng)求,即必須實(shí)現(xiàn)InterTrackerProtocol接口及協(xié)議。二是用于接收和處理JobClient的請(qǐng)求,如submitJob,killJob等,即必須實(shí)現(xiàn)JobSubmissionProtocol接口及協(xié)議。其次,它會(huì)啟動(dòng)一個(gè)infoSe

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

評(píng)論

0/150

提交評(píng)論