




版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認(rèn)領(lǐng)
文檔簡介
1、 Hadoop之TaskTraker分析TaskTracker的工作職責(zé)之前已經(jīng)和大家提過,主要負(fù)責(zé)維護,申請和監(jiān)控Task,通過heartbeat和JobTracker進行通信。 TaskTracker的init過程: 1.讀取配置文件,解析參數(shù) 2.將TaskTraker上原有的用戶local files刪除并新建新的dir和file 3. Map<TaskAttemptI
2、D, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>(); 清除map 4. this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();記錄task的鏈表 this.runningJo
3、bs = new TreeMap<JobID, RunningJob>();記錄job的id信息 5.初始化JVMManager:java view plaincopyprint?1. mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(), 2. true, tracker);
4、0;3. reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(), 4. false, tracker); 6.初始化RPC,獲取JobTracker client用于heartbeat通信; 7.new一個 后臺線程用于監(jiān)聽
5、map完成的事件java view plaincopyprint?1. this.mapEventsFetcher = new MapEventsFetcherThread(); 2. mapEventsFetcher.setDaemon(true); 3. mapEventsFetcher.setName( 4.
6、0; "Map-events fetcher for all reduce tasks " + "on " + 5.
7、; taskTrackerName); 6. mapEventsFetcher.start(); 后臺線程的run方法如下:java view plaincopyprint?1. while (running) 2. try 3.
8、0; List <FetchStatus> fList = null; 4. synchronized (runningJobs) 5. while
9、 (fList = reducesInShuffle().size() = 0) 6. try 7. runningJobs.wait();
10、60; 8. catch (InterruptedException e) 9. LOG.info("Shutting down: " +
11、 this.getName(); 10. return; 11. 12.
12、160; 13. 14. / now fetch all the map task events for all the reduce tasks 15.
13、160; / possibly belonging to different jobs 16. boolean fetchAgain = false; /flag signifying whether we want to fetch
14、160;17. /immediately again. 18.
15、 for (FetchStatus f : fList) 19. long currentTime = System.currentTimeMillis(); 20.
16、0;try 21. /the method below will return true when we have not 22.
17、;/fetched all available events yet 23. if (f.fetchMapCompletionEvents(currentTime) 24.
18、0; fetchAgain = true; 25. 26. catch (Exception e) 27. &
19、#160; LOG.warn( 28. "Ignoring exception that fetch for map completion"
20、+ 29. " events threw for " + f.jobId + " threw: " + 30.
21、60; StringUtils.stringifyException(e); 31. 32.
22、60; if (!running) 33. break; 34. 35.
23、60; 36. synchronized (waitingOn) 37. try 38. if (!fet
24、chAgain) 39. waitingOn.wait(heartbeatInterval); 40. 41.
25、 catch (InterruptedException ie) 42. LOG.info("Shutting down: " + this.getName(); 43.
26、60; return; 44. 45. 46. catch (Exception e)
27、60; 47. LOG.info("Ignoring exception " + e.getMessage(); 48. 49. 50.
28、; 8.initializeMemoryManagement,初始化每個TrackTask的內(nèi)存設(shè)置9.new一個Map和Reducer的Launcher后臺線程java view plaincopyprint?1. mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots); 2. reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxRe
29、duceSlots); 3. mapLauncher.start(); 4. reduceLauncher.start(); 用于后面創(chuàng)建子JVM來執(zhí)行map、reduce task看一下java view plaincopyprint?1. TaskLauncher的run方法: 2. /before preparing the job localize 3. &
30、#160; /all the archives 4. TaskAttemptID taskid = t.getTaskID(); 5. final LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.
31、local.dir"); 6. /simply get the location of the workDir and pass it to the child. The 7. /child will do the actual
32、160;dir creation 8. final File workDir = 9. new File(new Path(localdirsrand.nextInt(localdirs.length), 10.
33、0; TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(), 11. taskid.toString(), 12. t.isTaskCleanupTask().toString();
34、60;13. 14. String user = tip.getUGI().getUserName(); 15. 16. / Set up the child t
35、ask's configuration. After this call, no localization 17. / of files should happen in the TaskTracker's process space. Any changes to 18.
36、 / the conf object after this will NOT be reflected to the child. 19. / setupChildTaskConfiguration(lDirAlloc); 20. 21.
37、0; if (!prepare() 22. return; 23. 24. 25. / Accumulates clas
38、s paths for child. 26. List<String> classPaths = getClassPaths(conf, workDir, 27.
39、; taskDistributedCacheManager); 28. 29. long logSize = TaskLog.getTaskL
40、ogLength(conf); 30. 31. / Build exec child JVM args. 32. Vector<String> vargs = getVMArgs(taskid, wor
41、kDir, classPaths, logSize); 33. 34. tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf); 35. 36. / set me
42、mory limit using ulimit if feasible and necessary . 37. String setup = getVMSetupCmd(); 38. / Set up the redirection of t
43、he task's stdout and stderr streams 39. File logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask(); 40. File stdout = logFiles0;
44、;41. File stderr = logFiles1; 42. tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout, 43.
45、60; stderr); 44. 45. Map<String, String> env = new HashMap<String, String>(); 46. errorInfo
46、160;= getVMEnvironment(errorInfo, user, workDir, conf, env, taskid, 47. &
47、#160; logSize); 48. 49. / flatten the env as a set of export commands 50. List <String&g
48、t; setupCmds = new ArrayList<String>(); 51. for(Entry<String, String> entry : env.entrySet() 52. StringBuffer sb = new
49、60;StringBuffer(); 53. sb.append("export "); 54. sb.append(entry.getKey(); 55. sb.append("=""
50、); 56. sb.append(entry.getValue(); 57. sb.append("""); 58. setupCmds.add(sb.toString(); 59.
51、; 60. setupCmds.add(setup); 61. 62. launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir
52、); 63. tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID(); 64. if (exitCodeSet) 65. if (!killed &&
53、60;exitCode != 0) 66. if (exitCode = 65) 67. tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTask
54、ID(); 68. 69. throw new IOException("Task process exit with nonzero status of " + 70.
55、 exitCode + "."); 71. 72. 73. run方法為當(dāng)前task
56、 new一個child JVM,為其設(shè)置文件路徑,上下文環(huán)境,JVM啟動參數(shù)和啟動命令等信息,然后調(diào)用TaskControll方法啟動新的JVM執(zhí)行對應(yīng)的Task工作。各個類關(guān)系圖如下所示:最后以TaskController的launchTask截至10.然后開始 startHealthMonitor(this.fConf);再來看看TaskLauncher的run方法,就是不停的循環(huán)去獲取TaskTracker中新的task,然后調(diào)用startNewTask方法java view plaincopyprint?1. if (this.taskStatus.ge
57、tRunState() = TaskStatus.State.UNASSIGNED | 2. this.taskStatus.getRunState() = TaskStatus.State.FAILED_UNCLEAN | 3. this.taskStatus.getRunSt
58、ate() = TaskStatus.State.KILLED_UNCLEAN) 4. localizeTask(task); 5. if (this.taskStatus.getRunState() = TaskStatus.State.UNASSIGNED) 6.
59、; this.taskStatus.setRunState(TaskStatus.State.RUNNING); 7. 8. setTaskRunner(task.createRunner(TaskTracker.this, this, rjob); 9.
60、160; this.runner.start(); 10. long now = System.currentTimeMillis(); 11. this.taskStatus.setStartTime(now); 12.
61、60; this.lastProgressReport = now; TaskTracker的run方法:通過維護心跳和JobTracker通信,以獲取、殺掉新的Task,重點看一下heartBeat通信過程:java view plaincopyprint?1. synchronized (this) 2. askForNewTask = 3.
62、160; (status.countOccupiedMapSlots() < maxMapSlots | 4. status.countOccupiedReduceSlots() < maxReduceSlots) && 5. &
63、#160; acceptNewTasks); 6. localMinSpaceStart = minSpaceStart; 7. 8. if (askForNewTask) 9. askForNewTask =
64、;enoughFreeSpace(localMinSpaceStart); 10. long freeDiskSpace = getFreeSpace(); 11. long totVmem = getTotalVirtualMemoryOnTT(); 12. long totPme
65、m = getTotalPhysicalMemoryOnTT(); 13. long availableVmem = getAvailableVirtualMemoryOnTT(); 14. long availablePmem = getAvailablePhysicalMemoryOnTT(); 15.
66、 long cumuCpuTime = getCumulativeCpuTimeOnTT(); 16. long cpuFreq = getCpuFrequencyOnTT(); 17. int numCpu = getNumProcessorsOnTT(); 18.
67、0; float cpuUsage = getCpuUsageOnTT(); 19. 20. status.getResourceStatus().setAvailableSpace(freeDiskSpace); 21. status.getResourceStatus().setTotalVirtualMemory(totVmem);
68、0; 22. status.getResourceStatus().setTotalPhysicalMemory(totPmem); 23. status.getResourceStatus().setMapSlotMemorySizeOnTT( 24. mapSlotMemorySizeOnTT);
69、160;25. status.getResourceStatus().setReduceSlotMemorySizeOnTT( 26. reduceSlotSizeMemoryOnTT); 27. status.getResourceStatus().setAvailableVirtualMemory(availableVme
70、m); 28. status.getResourceStatus().setAvailablePhysicalMemory(availablePmem); 29. status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime); 30. status.getResourceStatu
71、s().setCpuFrequency(cpuFreq); 31. status.getResourceStatus().setNumProcessors(numCpu); 32. status.getResourceStatus().setCpuUsage(cpuUsage); 33. 34. /add
72、node health information 35. 36. TaskTrackerHealthStatus healthStatus = status.getHealthStatus(); 37. synchronized (this) 38. if (healthChecker != null) 39. heal
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 四川商務(wù)職業(yè)學(xué)院《環(huán)境學(xué)基礎(chǔ)》2023-2024學(xué)年第二學(xué)期期末試卷
- 阜陽職業(yè)技術(shù)學(xué)院《概率論與數(shù)理統(tǒng)計AW》2023-2024學(xué)年第一學(xué)期期末試卷
- 河南女子職業(yè)學(xué)院《舞蹈鑒賞與批評》2023-2024學(xué)年第二學(xué)期期末試卷
- 湖南冶金職業(yè)技術(shù)學(xué)院《土木水利專業(yè)導(dǎo)論》2023-2024學(xué)年第二學(xué)期期末試卷
- 浙江工業(yè)職業(yè)技術(shù)學(xué)院《建筑裝飾材料與施工工藝》2023-2024學(xué)年第一學(xué)期期末試卷
- 福建信息職業(yè)技術(shù)學(xué)院《模擬商務(wù)談判》2023-2024學(xué)年第一學(xué)期期末試卷
- 四川省眉山一中辦學(xué)共同體2024-2025學(xué)年高三下期末考試物理試題(B卷)含解析
- 廣西藍天航空職業(yè)學(xué)院《自動化系統(tǒng)概論》2023-2024學(xué)年第二學(xué)期期末試卷
- 吉林省吉化第一高級中學(xué)2025屆高三考前沖刺模擬語文試題試卷含解析
- 福建師范大學(xué)《汽車服務(wù)工程專業(yè)導(dǎo)論》2023-2024學(xué)年第二學(xué)期期末試卷
- 無人機操控技術(shù)(項目式 · 含工作頁) PPT 4-4 DJI地面站操控
- 市政工程計量計價 課件 項目4 管網(wǎng)工程計量與計價
- 基于深度學(xué)習(xí)的多模態(tài)數(shù)據(jù)融合方法研究
- 醫(yī)療器械倉庫防靜電措施規(guī)范
- GB/T 43493.2-2023半導(dǎo)體器件功率器件用碳化硅同質(zhì)外延片缺陷的無損檢測識別判據(jù)第2部分:缺陷的光學(xué)檢測方法
- 2024年DIP管理專項考核試題
- 無創(chuàng)神經(jīng)調(diào)控技術(shù)輔助阿爾茨海默病治療的中國專家共識(2023)要點
- 六宮數(shù)獨題目
- 韓愈簡介完整
- 《學(xué)前兒童科學(xué)教育》第二章 幼兒科學(xué)教育的目標(biāo)與內(nèi)容課件
- 馬克思主義與社會科學(xué)方法論習(xí)題與答案
評論
0/150
提交評論