Hadoop之TaskTraker分析_第1頁
Hadoop之TaskTraker分析_第2頁
Hadoop之TaskTraker分析_第3頁
Hadoop之TaskTraker分析_第4頁
Hadoop之TaskTraker分析_第5頁
已閱讀5頁,還剩3頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認(rèn)領(lǐng)

文檔簡介

1、  Hadoop之TaskTraker分析TaskTracker的工作職責(zé)之前已經(jīng)和大家提過,主要負(fù)責(zé)維護,申請和監(jiān)控Task,通過heartbeat和JobTracker進行通信。     TaskTracker的init過程:     1.讀取配置文件,解析參數(shù)     2.將TaskTraker上原有的用戶local files刪除并新建新的dir和file     3. Map<TaskAttemptI

2、D, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>(); 清除map     4.    this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();記錄task的鏈表            this.runningJo

3、bs = new TreeMap<JobID, RunningJob>();記錄job的id信息     5.初始化JVMManager:java view plaincopyprint?1. mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(),   2.       true, tracker); 

4、0;3.   reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(),  4.       false, tracker);       6.初始化RPC,獲取JobTracker client用于heartbeat通信;     7.new一個 后臺線程用于監(jiān)聽

5、map完成的事件java view plaincopyprint?1. this.mapEventsFetcher = new MapEventsFetcherThread();  2. mapEventsFetcher.setDaemon(true);  3. mapEventsFetcher.setName(  4.               

6、0;          "Map-events fetcher for all reduce tasks " + "on " +   5.                 

7、;         taskTrackerName);  6. mapEventsFetcher.start();      后臺線程的run方法如下:java view plaincopyprint?1. while (running)   2.        try   3. 

8、0;        List <FetchStatus> fList = null;  4.          synchronized (runningJobs)   5.            while

9、 (fList = reducesInShuffle().size() = 0)   6.              try   7.                runningJobs.wait();

10、60; 8.               catch (InterruptedException e)   9.                LOG.info("Shutting down: " +

11、 this.getName();  10.                return;  11.                12.          &#

12、160;   13.            14.          / now fetch all the map task events for all the reduce tasks  15.   &#

13、160;      / possibly belonging to different jobs  16.          boolean fetchAgain = false; /flag signifying whether we want to fetch &#

14、160;17.                                      /immediately again.  18.      

15、    for (FetchStatus f : fList)   19.            long currentTime = System.currentTimeMillis();  20.           

16、0;try   21.              /the method below will return true when we have not   22.              

17、;/fetched all available events yet  23.              if (f.fetchMapCompletionEvents(currentTime)   24.              

18、0; fetchAgain = true;  25.                26.             catch (Exception e)   27.      &

19、#160;       LOG.warn(  28.                       "Ignoring exception that fetch for map completion" 

20、+  29.                       " events threw for " + f.jobId + " threw: " +  30.   

21、60;                   StringUtils.stringifyException(e);   31.              32.        

22、60;   if (!running)   33.              break;  34.              35.          

23、60; 36.          synchronized (waitingOn)   37.            try   38.              if (!fet

24、chAgain)   39.                waitingOn.wait(heartbeatInterval);  40.                41.       

25、      catch (InterruptedException ie)   42.              LOG.info("Shutting down: " + this.getName();  43.      

26、60;       return;  44.              45.            46.         catch (Exception e)

27、60;  47.          LOG.info("Ignoring exception "  + e.getMessage();  48.          49.        50.     

28、;  8.initializeMemoryManagement,初始化每個TrackTask的內(nèi)存設(shè)置9.new一個Map和Reducer的Launcher后臺線程java view plaincopyprint?1. mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);  2.  reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxRe

29、duceSlots);  3.  mapLauncher.start();  4.  reduceLauncher.start();  用于后面創(chuàng)建子JVM來執(zhí)行map、reduce task看一下java view plaincopyprint?1. TaskLauncher的run方法:  2.  /before preparing the job localize   3.   &

30、#160;   /all the archives  4.       TaskAttemptID taskid = t.getTaskID();  5.       final LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.

31、local.dir");  6.       /simply get the location of the workDir and pass it to the child. The  7.       /child will do the actual&#

32、160;dir creation  8.       final File workDir =  9.       new File(new Path(localdirsrand.nextInt(localdirs.length),   10.         

33、0; TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(),   11.           taskid.toString(),  12.           t.isTaskCleanupTask().toString(); 

34、60;13.         14.       String user = tip.getUGI().getUserName();  15.         16.       / Set up the child t

35、ask's configuration. After this call, no localization  17.       / of files should happen in the TaskTracker's process space. Any changes to  18.  

36、     / the conf object after this will NOT be reflected to the child.  19.       / setupChildTaskConfiguration(lDirAlloc);  20.   21.   

37、0;   if (!prepare()   22.         return;  23.         24.         25.       / Accumulates clas

38、s paths for child.  26.       List<String> classPaths = getClassPaths(conf, workDir,  27.                     

39、;                          taskDistributedCacheManager);  28.   29.       long logSize = TaskLog.getTaskL

40、ogLength(conf);  30.         31.       /  Build exec child JVM args.  32.       Vector<String> vargs = getVMArgs(taskid, wor

41、kDir, classPaths, logSize);  33.         34.       tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);  35.   36.       / set me

42、mory limit using ulimit if feasible and necessary .  37.       String setup = getVMSetupCmd();  38.       / Set up the redirection of t

43、he task's stdout and stderr streams  39.       File logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask();  40.       File stdout = logFiles0;  

44、;41.       File stderr = logFiles1;  42.       tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,  43.             

45、60;    stderr);  44.         45.       Map<String, String> env = new HashMap<String, String>();  46.       errorInfo&#

46、160;= getVMEnvironment(errorInfo, user, workDir, conf, env, taskid,  47.                                &

47、#160;   logSize);  48.         49.       / flatten the env as a set of export commands  50.       List <String&g

48、t; setupCmds = new ArrayList<String>();  51.       for(Entry<String, String> entry : env.entrySet()   52.         StringBuffer sb = new

49、60;StringBuffer();  53.         sb.append("export ");  54.         sb.append(entry.getKey();  55.         sb.append("=""

50、);  56.         sb.append(entry.getValue();  57.         sb.append(""");  58.         setupCmds.add(sb.toString();  59.  

51、;       60.       setupCmds.add(setup);  61.         62.       launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir

52、);  63.       tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID();  64.       if (exitCodeSet)   65.         if (!killed &&

53、60;exitCode != 0)   66.           if (exitCode = 65)   67.             tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTask

54、ID();  68.             69.           throw new IOException("Task process exit with nonzero status of " +  70.

55、              exitCode + ".");  71.           72.         73.       run方法為當(dāng)前task

56、 new一個child JVM,為其設(shè)置文件路徑,上下文環(huán)境,JVM啟動參數(shù)和啟動命令等信息,然后調(diào)用TaskControll方法啟動新的JVM執(zhí)行對應(yīng)的Task工作。各個類關(guān)系圖如下所示:最后以TaskController的launchTask截至10.然后開始  startHealthMonitor(this.fConf);再來看看TaskLauncher的run方法,就是不停的循環(huán)去獲取TaskTracker中新的task,然后調(diào)用startNewTask方法java view plaincopyprint?1. if (this.taskStatus.ge

57、tRunState() = TaskStatus.State.UNASSIGNED |  2.          this.taskStatus.getRunState() = TaskStatus.State.FAILED_UNCLEAN |  3.          this.taskStatus.getRunSt

58、ate() = TaskStatus.State.KILLED_UNCLEAN)   4.        localizeTask(task);  5.        if (this.taskStatus.getRunState() = TaskStatus.State.UNASSIGNED)   6.   

59、;       this.taskStatus.setRunState(TaskStatus.State.RUNNING);  7.          8.        setTaskRunner(task.createRunner(TaskTracker.this, this, rjob);  9. &#

60、160;      this.runner.start();  10.        long now = System.currentTimeMillis();  11.        this.taskStatus.setStartTime(now);  12.    

61、60;   this.lastProgressReport = now;  TaskTracker的run方法:通過維護心跳和JobTracker通信,以獲取、殺掉新的Task,重點看一下heartBeat通信過程:java view plaincopyprint?1. synchronized (this)   2.      askForNewTask =   3.  &#

62、160;     (status.countOccupiedMapSlots() < maxMapSlots |   4.          status.countOccupiedReduceSlots() < maxReduceSlots) &&   5.     &

63、#160;   acceptNewTasks);   6.      localMinSpaceStart = minSpaceStart;  7.      8.    if (askForNewTask)   9.      askForNewTask = 

64、;enoughFreeSpace(localMinSpaceStart);  10.      long freeDiskSpace = getFreeSpace();  11.      long totVmem = getTotalVirtualMemoryOnTT();  12.      long totPme

65、m = getTotalPhysicalMemoryOnTT();  13.      long availableVmem = getAvailableVirtualMemoryOnTT();  14.      long availablePmem = getAvailablePhysicalMemoryOnTT();  15.   

66、   long cumuCpuTime = getCumulativeCpuTimeOnTT();  16.      long cpuFreq = getCpuFrequencyOnTT();  17.      int numCpu = getNumProcessorsOnTT();  18.  

67、0;   float cpuUsage = getCpuUsageOnTT();  19.   20.      status.getResourceStatus().setAvailableSpace(freeDiskSpace);  21.      status.getResourceStatus().setTotalVirtualMemory(totVmem);

68、0; 22.      status.getResourceStatus().setTotalPhysicalMemory(totPmem);  23.      status.getResourceStatus().setMapSlotMemorySizeOnTT(  24.          mapSlotMemorySizeOnTT); &#

69、160;25.      status.getResourceStatus().setReduceSlotMemorySizeOnTT(  26.          reduceSlotSizeMemoryOnTT);  27.      status.getResourceStatus().setAvailableVirtualMemory(availableVme

70、m);   28.      status.getResourceStatus().setAvailablePhysicalMemory(availablePmem);  29.      status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime);  30.      status.getResourceStatu

71、s().setCpuFrequency(cpuFreq);  31.      status.getResourceStatus().setNumProcessors(numCpu);  32.      status.getResourceStatus().setCpuUsage(cpuUsage);  33.      34.    /add 

72、node health information  35.      36.    TaskTrackerHealthStatus healthStatus = status.getHealthStatus();  37.    synchronized (this)   38.      if (healthChecker != null)   39.        heal

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

最新文檔

評論

0/150

提交評論