




版權(quán)說(shuō)明:本文檔由用戶(hù)提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
第4章分布式計(jì)算框架MapReduce《Hadoop大數(shù)據(jù)原理與應(yīng)用》西安電子科技大學(xué)出版社【知識(shí)與能力要求】第4章分布式計(jì)算框架MapReduce4.1MapReduce簡(jiǎn)介4.2第一個(gè)MapReduce案例:WordCount4.3MapReduce作業(yè)執(zhí)行流程4.4MapReduce數(shù)據(jù)類(lèi)型與格式4.5Shuffle機(jī)制4.6在MapReduce中自定義組件4.7實(shí)戰(zhàn)MapReduce4.8MapReduce調(diào)優(yōu)4.9其他主流計(jì)算框架4.1MapReduce簡(jiǎn)介移動(dòng)計(jì)算比移動(dòng)數(shù)據(jù)更劃算Google論文分而治之4.1MapReduce簡(jiǎn)介MapReduce是Hadoop生態(tài)中的一款分布式運(yùn)算框架,它提供了非常完善的分布式架構(gòu),可以讓不熟悉分布式計(jì)算的人員也能編寫(xiě)出優(yōu)秀的分布式系統(tǒng),因此可以讓開(kāi)發(fā)人員將精力專(zhuān)注到業(yè)務(wù)邏輯本身。MapReduce采用“分而治之”的核心思想,可以先將一個(gè)大型任務(wù)拆分成若干個(gè)簡(jiǎn)單的子任務(wù),然后將每個(gè)子任務(wù)交給一個(gè)獨(dú)立的節(jié)點(diǎn)去處理。當(dāng)所有節(jié)點(diǎn)的子任務(wù)都處理完畢后,再匯總所有子任務(wù)的處理結(jié)果,從而形成最終的結(jié)果。4.1MapReduce簡(jiǎn)介MapReduce在發(fā)展史上經(jīng)過(guò)一次重大改變,舊版MapReduce(MapReduce1.0)采用的是典型的Master/Slave結(jié)構(gòu),Master表現(xiàn)為JobTracker進(jìn)程,而Slave表現(xiàn)為T(mén)askTracker。但是這種結(jié)果過(guò)于簡(jiǎn)單,例如Master的任務(wù)過(guò)于集中,并且存在單點(diǎn)故障等問(wèn)題。因此,MapReduce進(jìn)行了一次重要的升級(jí),舍棄JobTracker和TaskTracker,而改用了ResourceManager進(jìn)程負(fù)責(zé)處理資源,并且使用ApplicationMaster進(jìn)程管理各個(gè)具體的應(yīng)用,用NodeManager進(jìn)程對(duì)各個(gè)節(jié)點(diǎn)的工作情況進(jìn)行監(jiān)聽(tīng)。升級(jí)后的MapReduce稱(chēng)為MapReduce2.0,但也許由于“MapReduce”這個(gè)詞已使用太久,有些參考資料中經(jīng)常使用“MapReduce”來(lái)代指YARN。YARN的具體組成結(jié)構(gòu)和各部分的作用,會(huì)在第5章中進(jìn)行詳細(xì)介紹。4.1MapReduce簡(jiǎn)介如果要統(tǒng)計(jì)一個(gè)擁有海量單詞的詞庫(kù),就可以先將整個(gè)詞庫(kù)拆分成若干個(gè)小詞庫(kù),然后將各個(gè)小詞庫(kù)發(fā)送給不同的節(jié)點(diǎn)去計(jì)算,當(dāng)所有節(jié)點(diǎn)將分配給自己的小詞庫(kù)中的單詞統(tǒng)計(jì)完畢后,再將各個(gè)節(jié)點(diǎn)的統(tǒng)計(jì)結(jié)果進(jìn)行匯總,形成最終的統(tǒng)計(jì)結(jié)果。節(jié)點(diǎn)3海量詞庫(kù)小詞庫(kù)小詞庫(kù)小詞庫(kù)統(tǒng)計(jì)部分單詞統(tǒng)計(jì)全部單詞Map階段Reduce階段節(jié)點(diǎn)1節(jié)點(diǎn)2統(tǒng)計(jì)部分單詞節(jié)點(diǎn)4節(jié)點(diǎn)5MapReduce計(jì)算模型MapReduce將復(fù)雜的、運(yùn)行于大規(guī)模集群上的并行計(jì)算過(guò)程高度地抽象到了兩個(gè)函數(shù):Map和Reduce。MapReduce框架會(huì)為每個(gè)Map任務(wù)輸入一個(gè)數(shù)據(jù)子集(split),Map任務(wù)生成的結(jié)果會(huì)繼續(xù)作為Reduce任務(wù)的輸入,最終由Reduce任務(wù)輸出最后結(jié)果,并寫(xiě)入分布式文件系統(tǒng)。編程容易,不需要掌握分布式并行編程細(xì)節(jié),也可以很容易把自己的程序運(yùn)行在分布式系統(tǒng)上,完成海量數(shù)據(jù)的計(jì)算。Map和Reduce函數(shù)鍵值對(duì):<key,value>Step1將數(shù)據(jù)抽象為鍵值對(duì)形式,接著map函數(shù)會(huì)以鍵值對(duì)作為輸入,經(jīng)過(guò)map函數(shù)的處理,產(chǎn)生一系列新的鍵值對(duì)作為中間結(jié)果輸出到本地。Step2MapReduce框架自動(dòng)將這些中間結(jié)果數(shù)據(jù)按照鍵做聚合處理,并將鍵相同的數(shù)據(jù)分發(fā)給reduce函數(shù)處理。Step3reduce函數(shù)以鍵和對(duì)應(yīng)的值的集合作為輸入,經(jīng)過(guò)reduce函數(shù)處理后,產(chǎn)生另外一系列鍵值對(duì)作為最終輸出。{key1,value1}→{key2,List<value2>}→{key3,value3}Map和Reduce函數(shù)函數(shù)輸入輸出說(shuō)明Map<k1,v1>如:<行號(hào),”abc”>List(<k2,v2>)如:<“a”,1><“b”,1><“c”,1>1.將小數(shù)據(jù)集進(jìn)一步解析成一批<key,value>對(duì),輸入Map函數(shù)中進(jìn)行處理2.每一個(gè)輸入的<k1,v1>會(huì)輸出一批<k2,v2>。<k2,v2>是計(jì)算的中間結(jié)果Reduce<k2,List(v2)>如:<“a”,<1,1,1>><k3,v3><“a”,3>輸入的中間結(jié)果<k2,List(v2)>中的List(v2)表示是一批屬于同一個(gè)k2的value4.2第一個(gè)MapReduce案例:WordCountHadoop提供了一個(gè)MapReduce入門(mén)案例“WordCount”,用于統(tǒng)計(jì)輸入文件中每個(gè)單詞出現(xiàn)的次數(shù)。該案例源碼保存在$HADOOP_HOME/share/hadoop/mapreduce/sources/hadoop-mapreduce-examples-2.9.2.jar的WordCount.java中,其源碼共分為T(mén)okenizerMapper類(lèi)、IntSumReducer類(lèi)和main()函數(shù)三個(gè)部分。4.2.1TokenizerMapper類(lèi)從類(lèi)名可知,該類(lèi)是Map階段的實(shí)現(xiàn)。并且能夠發(fā)現(xiàn),在MapReduce中Map階段的業(yè)務(wù)代碼需要繼承自org.apache.hadoop.mapreduce.Mapper類(lèi)。Mapper類(lèi)的四個(gè)泛型分別表示輸入數(shù)據(jù)的key類(lèi)型、輸入數(shù)據(jù)的value類(lèi)型、輸出數(shù)據(jù)的key類(lèi)型、輸出數(shù)據(jù)的value類(lèi)型。以本次“WordCount”為例,每次Map階段需要處理的數(shù)據(jù)是文件中的一行數(shù)據(jù),而默認(rèn)情況下這一行數(shù)據(jù)的偏移量(該行起始位置距離文件初始位置的位移)就是輸入數(shù)據(jù)的key類(lèi)型(一般而言,偏移量是一個(gè)長(zhǎng)整型,也可以寫(xiě)成本例中使用的Object類(lèi)型);輸入數(shù)據(jù)的value類(lèi)型就是這行數(shù)據(jù)本身,因此是Text類(lèi)型(即hadoop中定義的字符串類(lèi)型);輸出數(shù)據(jù)的key類(lèi)型是每個(gè)單詞本身,因此也是Text類(lèi)型;而輸出數(shù)據(jù)的value類(lèi)型,就表示該單詞出現(xiàn)了一次,因此就是數(shù)字1,可以表示為IntWritable類(lèi)型(即Hadoop中定義的整數(shù)類(lèi)型)。TokenizerMapper類(lèi)源碼importorg.apache.hadoop.mapreduce.Mapper;publicstaticclassTokenizerMapperextendsMapper<Object,Text,Text,IntWritable>{
privatefinalstaticIntWritableone=newIntWritable(1);privateTextword=newText();
publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedException{StringTokenizeritr=newStringTokenizer(value.toString());while(itr.hasMoreTokens()){word.set(itr.nextToken());context.write(word,one);}}}Mapper類(lèi)提供了map()方法,用于編寫(xiě)Map階段具體的業(yè)務(wù)邏輯。map()方法的前兩個(gè)參數(shù)表示輸入數(shù)據(jù)的key類(lèi)型和value類(lèi)型(即與Mapper類(lèi)前兩個(gè)參數(shù)的含義一致),而map()的第三個(gè)參數(shù)Context對(duì)象表示Map階段的上下文對(duì)象,可以用于將Map階段的產(chǎn)物輸出到下一個(gè)階段中。4.2.1TokenizerMapper類(lèi)現(xiàn)在具體分析TokenizerMapper類(lèi)的源碼:當(dāng)輸入數(shù)據(jù)被提交到MapReduce流程后,MapReduce會(huì)先將輸入數(shù)據(jù)進(jìn)行拆分(Split),之后再將拆分后的文件塊提交給Map階段的map()方法。map()方法拿到文件塊后,默認(rèn)以“行”為單位進(jìn)行讀取,每次讀取一行數(shù)據(jù)后,再通過(guò)StringTokenizer構(gòu)造方法將該行數(shù)據(jù)以空白字符(空格、制表符等)為分隔符進(jìn)行拆分,然后再遍歷拆分后的單詞,并將每個(gè)單詞的輸出value設(shè)置為1。也就是說(shuō),Map階段會(huì)將讀入到的每一行數(shù)據(jù)拆分成各個(gè)單詞,然后標(biāo)記該單詞出現(xiàn)了1次,之后再通過(guò)context.write()輸出到MapReduce中的下一個(gè)階段。Map階段HelloWorldHelloHadoop...Hello,1World,1Hello,1Hadoop,1...4.2.2IntSumReducer類(lèi)數(shù)據(jù)在經(jīng)過(guò)了Shuffle階段后,就會(huì)進(jìn)入Reduce階段。入門(mén)案例“WordCount”中的Reduce源碼如下所示。importorg.apache.hadoop.mapreduce.Reducer;publicstaticclassIntSumReducerextendsReducer<Text,IntWritable,Text,IntWritable>{privateIntWritableresult=newIntWritable();publicvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedException{intsum=0;for(IntWritableval:values){sum+=val.get();}result.set(sum);context.write(key,result);}}4.2.2IntSumReducer類(lèi)Reduce階段的業(yè)務(wù)代碼需要繼承自org.apache.hadoop.mapreduce.Reducer類(lèi)。Reducer類(lèi)的四個(gè)泛型分別表示Reduce階段輸入數(shù)據(jù)的key類(lèi)型、value類(lèi)型,以及Reduce階段輸出數(shù)據(jù)的key類(lèi)型、value類(lèi)型。MapReduce的流程依次Map階段、Shuffle階段和Reduce階段,因此Shuffle階段的產(chǎn)物就是Reduce階段的輸入數(shù)據(jù)。也就是說(shuō),本例中的IntSumReducer就是對(duì)Shuffle的產(chǎn)物進(jìn)行了統(tǒng)計(jì),即計(jì)算出了hello、world和hadoop各個(gè)單詞出現(xiàn)的次數(shù)分別是2、1和1,Reduce階段的輸出結(jié)果如圖4-5所示。IntSumReducer源碼中最后的context.write()方法表示將Reduce階段的產(chǎn)物輸出到最終的HDFS中進(jìn)行存儲(chǔ),而存儲(chǔ)在HDFS中的具體位置是在main()方法中進(jìn)行設(shè)置的。Reduce階段Hello,2World,1Hadoop,1...HelloWorldHelloHadoop...Hello,1World,1Hello,1Hadoop,1...4.2.3入口方法
入門(mén)案例“WordCount”中main()方法的源碼如下所示。publicstaticvoidmain(String[]args)throwsException{Configurationconf=newConfiguration();String[]otherArgs=newGenericOptionsParser(conf,args).getRemainingArgs();if(otherArgs.length<2){System.err.println("Usage:wordcount<in>[<in>...]<out>");System.exit(2);}Jobjob=Job.getInstance(conf,"wordcount");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);for(inti=0;i<otherArgs.length-1;++i){FileInputFormat.addInputPath(job,newPath(otherArgs[i]));}FileOutputFormat.setOutputPath(job,newPath(otherArgs[otherArgs.length-1]));System.exit(job.waitForCompletion(true)?0:1);}4.2.3入口方法
從上述源碼可知,main()的輸入?yún)?shù)決定了輸入數(shù)據(jù)的文件位置,以及輸出數(shù)據(jù)存儲(chǔ)到HDFS中的位置。并且main()方法設(shè)置了Map階段和Reduce階段的類(lèi)文件,以及通過(guò)setOutputKeyClass()和setOutputValueClass()指定了最終輸出數(shù)據(jù)的類(lèi)型。4.2.4向Hadoop集群提交并運(yùn)行WordCount使用如下命令向Hadoop集群提交并運(yùn)行WordCount。hadoopjar/usr/local/hadoop-2.9.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.9.2.jarwordcount/InputDataTest/OutputDataTest3上述命令中,/InputDataTest表示輸入目錄,/OutputDataTest3表示輸出目錄。執(zhí)行該命令前,假設(shè)HDFS的目錄/InputDataTest下已存在待分析詞頻的3個(gè)文件,而輸出目錄/OutputDataTest3不存在,在執(zhí)行過(guò)程中會(huì)自動(dòng)創(chuàng)建。4.3MapReduce作業(yè)執(zhí)行流程InputFormatMapShuffleReduceOutputFormat最終結(jié)果<key,value>中間結(jié)果<key,List(value)>中間結(jié)果<key,value>輸入<key,value>加載文件最終結(jié)果<key,value>中間結(jié)果<key,List(value)>中間結(jié)果<key,value>輸入<key,value>寫(xiě)入文件分布式文件系統(tǒng)(如HDFS)InputFormatSplitSplitSplitRRRRRRMapMapMapShuffleReduceOutputFormat節(jié)點(diǎn)1加載文件寫(xiě)入文件InputFormatSplitSplitSplitRRRRRRMapMapMapShuffleReduceOutputFormat節(jié)點(diǎn)2分布式文件系統(tǒng)(如HDFS)一個(gè)WordCount執(zhí)行過(guò)程的實(shí)例HDFS文件file.txtHelloWorldHelloHadoopHelloMapReduceInput<1,”HelloWorld”><2,”HelloHadoop”><3,”HelloMapReduce”>split<”Hello”,1><“World”,1><“Hello”,1><“Hadoop”,1><“Hello”,1><”MapReduce”,1>Map<“Hadoop”,1><”Hello”,<1,1,1>><”MapReduce”,1><“World”,1>Shuffle<“Hadoop”,1><”Hello”,3><”MapReduce”,1><“World”,1>ReduceHDFS文件(part-r-00000)Output4.3.1作業(yè)執(zhí)行流程(1)InputFormatInputFormat模塊首先對(duì)輸入數(shù)據(jù)做預(yù)處理,比如驗(yàn)證輸入格式是否符合輸入定義;然后將輸入文件切分為邏輯上的多個(gè)InputSplit,InputSplit是MapReduce對(duì)文件進(jìn)行處理和運(yùn)算的輸入單位,并沒(méi)有對(duì)文件進(jìn)行實(shí)際切割;由于InputSplit是邏輯切分而非物理切分,所以還需要通過(guò)RecordReader(圖4-4中的RR)根據(jù)InputSplit中的信息來(lái)處理InputSplit中的具體記錄,加載數(shù)據(jù)并轉(zhuǎn)換為適合Map任務(wù)讀取的鍵值對(duì)<key,valule>,輸入給Map任務(wù)。(2)MapMap模塊會(huì)根據(jù)用戶(hù)自定義的映射規(guī)則,輸出一系列的<key,value>作為中間結(jié)果。(3)Shuffle為了讓Reduce可以并行處理Map的結(jié)果,需要對(duì)Map的輸出進(jìn)行一定的排序、分區(qū)、合并、歸并等操作,得到<key,List(value)>形式的中間結(jié)果,再交給對(duì)應(yīng)的Reduce進(jìn)行處理,這個(gè)過(guò)程叫做Shuffle。(4)ReduceReduce以一系列的<key,List(value)>中間結(jié)果作為輸入,執(zhí)行用戶(hù)定義的邏輯,輸出<key,valule>形式的結(jié)果給OutputFormat。(5)OutputFormatOutputFormat模塊會(huì)驗(yàn)證輸出目錄是否已經(jīng)存在以及輸出結(jié)果類(lèi)型是否符合配置文件中的配置類(lèi)型,如果都滿(mǎn)足,就輸出Reduce的結(jié)果到分布式文件系統(tǒng)。4.3.2作業(yè)執(zhí)行流程的源碼解析上節(jié)main()的最后一行代碼,將整個(gè)任務(wù)通過(guò)job.waitForCompletion(true)提交到了MapReduce作業(yè)中。而waitForCompletion()在底層調(diào)用了org.apache.hadoop.mapreduce.Job類(lèi)的submit()方法,源碼如下所示。publicvoidsubmit()throwsIOException,InterruptedException,ClassNotFoundException{...connect();finalJobSubmittersubmitter=getJobSubmitter(cluster.getFileSystem(),cluster.getClient());status=ugi.doAs(newPrivilegedExceptionAction<JobStatus>(){publicJobStatusrun()throwsIOException,InterruptedException,ClassNotFoundException{returnsubmitter.submitJobInternal(Job.this,cluster);}});state=JobState.RUNNING;...}connect()其中,connect()依次調(diào)用了cluster()構(gòu)造方法和initialize()方法,并在initialize()中通過(guò)clientProtocol變量判斷當(dāng)前MapReduce的運(yùn)行環(huán)境是本地或YARN(判斷時(shí)借助了Configuration對(duì)象,此對(duì)象可以獲取hadoop的配置文件信息。而MapReduce的運(yùn)行環(huán)境就是在配置文件mapred-site.xml中的屬性里配置的)。submitter.submitJobInternal()connect()方法結(jié)束后,就進(jìn)入了submitter.submitJobInternal()方法,用于真正的將任務(wù)提交到Y(jié)ARN中,其源碼如下所示。JobStatussubmitJobInternal(Jobjob,Clustercluster)throwsClassNotFoundException,InterruptedException,IOException{checkSpecs(job);Configurationconf=job.getConfiguration();addMRFrameworkToDistributedCache(conf);PathjobStagingArea=JobSubmissionFiles.getStagingDir(cluster,conf);…JobIDjobId=submitClient.getNewJobID();job.setJobID(jobId);PathsubmitJobDir=newPath(jobStagingArea,jobId.toString());JobStatusstatus=null;try{…copyAndConfigureFiles(job,submitJobDir);
PathsubmitJobFile=JobSubmissionFiles.getJobConfPath(submitJobDir);…
//WritejobfiletosubmitdirwriteConf(conf,submitJobFile);
//Now,actuallysubmitthejob(usingthesubmitname)printTokens(jobId,job.getCredentials());status=submitClient.submitJob(jobId,submitJobDir.toString(),job.getCredentials());}finally{…}}submitter.submitJobInternal()submitJobInternal()在底層先后調(diào)用了checkSpecs()和checkOutputSpecs()方法,用于對(duì)MapReduce的輸出參數(shù)進(jìn)行檢查。而checkOutputSpecs()是在頂級(jí)抽象類(lèi)OutputFormat中定義的,其作者在對(duì)checkOutputSpecs()的描述時(shí)明確說(shuō)到“MapReduce運(yùn)行之前,必須先保證輸出路徑不存在,否則會(huì)拋出一個(gè)IOException”。因此,我們?cè)趍ain()中通過(guò)FileOutputFormat.setOutputPath()給MapRedcue設(shè)置輸出路徑時(shí),要?jiǎng)?wù)必注意這一點(diǎn)。在輸出參數(shù)檢查完畢后,MapReduce會(huì)將準(zhǔn)備提交到集群的文件先暫時(shí)存放到一個(gè)暫存區(qū)中,該暫存區(qū)的位置可以在submitJobInternal()中通過(guò)JobSubmissionFiles.getStagingDir()獲取。之后MapReduce再給本次提交的任務(wù)分配一個(gè)JobId,然后再將暫存區(qū)和JobId封裝到submitJobDir對(duì)象中。實(shí)際上,submitJobDir就是一個(gè)由暫存區(qū)和JobId拼接起來(lái)的路徑。之后,copyAndConfigureFiles()方法會(huì)將本地要處理的任務(wù)資源上傳到HDFS中的submitJobDir路徑中等待處理。隨后writeSplits()會(huì)將切片信息寫(xiě)入到submitJobDir中(例如,切片信息描述了是如何將一個(gè)較大的輸入文件切分成了多個(gè)小文件處理)。后面的writeConf()方法用于將當(dāng)前待處理的任務(wù)信息封裝成job.xml并寫(xiě)入到submitJobDir中。程序最后的submitClient.submitJob()將submitJobDir的最終文件提交到Y(jié)ARN中,并在提交后清空暫存區(qū)。ResourceManagerNameNodeNodeManagerApplicationMasterDataNodeNodeManagerApplicationMasterDataNodeNodeManagerContainerDataNodeContainerNodeManagerContainerDataNodeNodeManagerContainerDataNodeNodeManagerContainerDataNodeClientClient4.3.3作業(yè)執(zhí)行時(shí)架構(gòu)MapReduce1.0體系架構(gòu)客戶(hù)端JobTrackerTaskTracker1TaskTracker2TaskTracker3TaskTrackerN提交任務(wù)分配并監(jiān)管任務(wù)MapTaskReduceTaskMapTaskMapTaskReduceTaskMapTaskMapTaskReduceTaskMapTaskMapTaskReduceTaskMapTask4.3.3作業(yè)執(zhí)行時(shí)架構(gòu)ResourceManager稱(chēng)為資源管理器,負(fù)責(zé)集中管理MapReduce執(zhí)行作業(yè)時(shí)所有參與運(yùn)算的全部計(jì)算機(jī)的資源(例如,統(tǒng)一管理所有計(jì)算機(jī)的CPU、內(nèi)存、硬盤(pán)等),并將這些資源按需分配給各個(gè)節(jié)點(diǎn)。ApplicationMaster稱(chēng)為應(yīng)用管理器,負(fù)責(zé)管理某一個(gè)具體應(yīng)用的調(diào)度工作。例如,某一次MapReduce作業(yè)可能被劃分為了若干個(gè)小應(yīng)用,而每一個(gè)應(yīng)用就對(duì)應(yīng)著由一個(gè)ApplicationMaster全權(quán)管理。Container稱(chēng)為容器,ResourceManager分配給各個(gè)節(jié)點(diǎn)的資源會(huì)被存入獨(dú)立存入Container中運(yùn)行。也就是說(shuō),Container就是YARN提供的一套隔離資源的容器。每個(gè)容器內(nèi)都擁有一套可獨(dú)立運(yùn)行的資源環(huán)境(CPU、內(nèi)存以及當(dāng)前需要執(zhí)行的任務(wù)文件等)。NodeManager稱(chēng)為節(jié)點(diǎn)監(jiān)視器,它會(huì)向ApplicationMaster匯報(bào)當(dāng)前節(jié)點(diǎn)的資源使用情況,以及所管理的所有Container的運(yùn)行狀態(tài)。此外,它還會(huì)接收并處理ApplicationMaster發(fā)來(lái)的啟動(dòng)任務(wù)、停止任務(wù)等請(qǐng)求。4.4MapReduce數(shù)據(jù)類(lèi)型與格式MapReduce使用了Text定義字符串,使用了IntWritable定義整型變量,而沒(méi)有使用Java內(nèi)置的String和int類(lèi)型。這樣做主要是有兩個(gè)方面的原因:MapReduce是集群運(yùn)算,因此必然會(huì)在執(zhí)行期間進(jìn)行網(wǎng)絡(luò)傳輸,然而在網(wǎng)絡(luò)中傳輸?shù)臄?shù)據(jù)必須是可序列化的類(lèi)型。為了良好地匹配MapReduce內(nèi)部的運(yùn)行機(jī)制,MapReduce就專(zhuān)門(mén)設(shè)計(jì)了一套數(shù)據(jù)類(lèi)型。MapReduce中常見(jiàn)數(shù)據(jù)類(lèi)型數(shù)據(jù)類(lèi)型說(shuō)明IntWritable整型類(lèi)型LongWritable長(zhǎng)整型類(lèi)型FloatWritable單精度浮點(diǎn)數(shù)類(lèi)型DoubleWritable雙精度浮點(diǎn)數(shù)類(lèi)型ByteWritable字節(jié)類(lèi)型BooleanWritable布爾類(lèi)型TextUTF-8格式存儲(chǔ)的文本類(lèi)型NullWritable空對(duì)象4.4MapReduce數(shù)據(jù)類(lèi)型與格式需要注意的是,這些數(shù)據(jù)類(lèi)型的定義類(lèi)都實(shí)現(xiàn)了WritableComparable接口,其源碼如下所示。publicabstractinterfaceWritableComparableextendsWritable,Comparable{...}可以發(fā)現(xiàn),WritableComparable繼承自Writable和Comparable接口。其中Writable就是MapReduce提供的序列化接口(類(lèi)似于Java中的Serializable接口),源碼如下所示。publicabstractinterfaceorg.apache.hadoop.io.Writable{publicabstractvoidwrite(DataOutputoutput)throwsIOException;publicabstractvoidreadFields(DataInputinput)throwsjava.io.IOException;...}其中,write()用于將數(shù)據(jù)進(jìn)行序列化操作;readFields()用于將數(shù)據(jù)進(jìn)行反序列化操作。Comparable接口就是Java中的比較器,用于對(duì)數(shù)據(jù)集進(jìn)行排序操作。因此,如果我們要在MapReduce自定義一個(gè)數(shù)據(jù)類(lèi)型,就需要實(shí)現(xiàn)Writable接口;如果還需要對(duì)自定義的數(shù)據(jù)類(lèi)型進(jìn)行排序操作,就需要實(shí)現(xiàn)WritableComparable接口(或者分別實(shí)現(xiàn)Writable和Comparable接口)。4.5Shuffle機(jī)制Map階段的產(chǎn)物會(huì)經(jīng)過(guò)一個(gè)名為Shuffle的階段。Hello,1World,1Hello,1Hadoop,1?Hello,[1,1]World,[1]Hadoop,[1]Map的輸出Shuffle階段Reduce的輸入4.5Shuffle機(jī)制在Shuffle階段中進(jìn)行排序和分區(qū)操作,Shuffle階段的產(chǎn)物是以“key=單詞,value=出現(xiàn)次數(shù)的數(shù)組”形式輸出。4.5Shuffle機(jī)制在Shuffle階段,會(huì)對(duì)數(shù)據(jù)進(jìn)行以下操作。首先,Shuffle會(huì)持續(xù)接收Map階段發(fā)來(lái)的數(shù)據(jù),并將數(shù)據(jù)寫(xiě)入到一個(gè)“環(huán)形緩沖區(qū)”中,當(dāng)緩沖區(qū)被填滿(mǎn)時(shí)就會(huì)將覆蓋掉的部分?jǐn)?shù)據(jù)溢出存放到“溢出文件”中。
Map發(fā)來(lái)的數(shù)據(jù)溢出文件環(huán)形緩沖區(qū)4.5Shuffle機(jī)制其次,Shuffle會(huì)對(duì)溢出文件中的數(shù)據(jù)進(jìn)行排序(Sort),然后再將排序后的數(shù)據(jù)進(jìn)行分區(qū)(Partition),例如,將字母A-字母K開(kāi)頭的放在0個(gè)分區(qū),將字母L-字母Q開(kāi)頭的放在第1個(gè)分區(qū)……第0區(qū)第1區(qū)第2區(qū)...4.5Shuffle機(jī)制同樣Shuffle會(huì)生成很多個(gè)排序且分區(qū)后的溢出文件,最后,會(huì)將所有溢出文件中相同分區(qū)號(hào)的內(nèi)容進(jìn)行合并(Combine),形成本Map階段最終的第0區(qū)內(nèi)容、第1區(qū)內(nèi)容……第0區(qū)第1區(qū)第2區(qū)...第0區(qū)第1區(qū)第2區(qū)...第0區(qū)第1區(qū)第2區(qū)......第0區(qū)第1區(qū)第2區(qū)...4.5Shuffle機(jī)制與此同時(shí),其他Map階段也會(huì)生成當(dāng)前Map最終的第0區(qū)內(nèi)容、第1區(qū)內(nèi)容……最后Shuffle會(huì)對(duì)所有Map階段相同分區(qū)號(hào)的內(nèi)容再次進(jìn)行合并,從而形成最終的第0區(qū)內(nèi)容、第1區(qū)內(nèi)容……最后不同區(qū)號(hào)中的內(nèi)容就會(huì)發(fā)送到不同的Reduce中進(jìn)行處理。第0區(qū)第1區(qū)第2區(qū)...第0區(qū)第1區(qū)第2區(qū)...第0區(qū)第1區(qū)第2區(qū)...第0區(qū)第1區(qū)第2區(qū)...第0區(qū)第1區(qū)第2區(qū)...ReduceTask-0ReduceTask-1ReduceTask-2第0區(qū)第1區(qū)第2區(qū)...第0區(qū)第1區(qū)第2區(qū)...第0區(qū)第1區(qū)第2區(qū)...第0區(qū)第1區(qū)第2區(qū)...第0區(qū)第1區(qū)第2區(qū)...第0區(qū)第1區(qū)第2區(qū)...第0區(qū)第1區(qū)第2區(qū)...第0區(qū)第1區(qū)第2區(qū)...4.6在MapReduce中自定義組件“組件”實(shí)際就是一些繼承了MapReduce內(nèi)置類(lèi)的子類(lèi),或者實(shí)現(xiàn)了一些接口的實(shí)現(xiàn)類(lèi)。MapReduce在執(zhí)行時(shí)會(huì)遵循一系列的默認(rèn)規(guī)則,例如默認(rèn)讀取文本文件,并且默認(rèn)以“行”為單位進(jìn)行讀取;默認(rèn)以字典順序?qū)?shù)據(jù)進(jìn)行排序;根據(jù)默認(rèn)規(guī)則進(jìn)行分區(qū)等。我們也可以對(duì)這些默認(rèn)規(guī)則進(jìn)行自定義設(shè)置,從而以自定義組件的形式運(yùn)行MapReduce程序。4.6.1自定義輸入組件MapReduce接收輸入數(shù)據(jù)的頂級(jí)類(lèi)是org.apache.hadoop.mapreduce.InputFormat<K,V>,其源碼如下所示。@InterfaceAudience.Public@InterfaceStability.StablepublicabstractclassInputFormat<K,V>{publicabstractList<InputSplit>getSplits(JobContextcontext)throwsIOException,InterruptedException;publicabstractRecordReader<K,V>createRecordReader(InputSplitsplit,TaskAttemptContextcontext)throwsIOException,InterruptedException;}4.6.1自定義輸入組件MapReduce接收輸入數(shù)據(jù)的頂級(jí)類(lèi)是org.apache.hadoop.mapreduce.InputFormat<K,V>,其源碼如下所示。@InterfaceAudience.Public@InterfaceStability.StablepublicabstractclassInputFormat<K,V>{publicabstractList<InputSplit>getSplits(JobContextcontext)throwsIOException,InterruptedException;publicabstractRecordReader<K,V>createRecordReader(InputSplitsplit,TaskAttemptContextcontext)throwsIOException,InterruptedException;}其中g(shù)etSplits()用戶(hù)獲取輸入文件的切片,而createRecordReader()就用于處理每次讀取的數(shù)據(jù)形式。而在WordCount程序中默認(rèn)使用的輸入組件是TextInputFormat,該組件就通過(guò)重寫(xiě)createRecordReader()實(shí)現(xiàn)了以“行”讀取的默認(rèn)行為。4.6.1自定義輸入組件將默認(rèn)的拆分符號(hào)修改成英文逗號(hào)“,”。publicclassMyInputFormatextendsTextInputFormat{privatestaticfinalStringMY_DELIMITER=",";@OverridepublicRecordReader<LongWritable,Text>createRecordReader(InputSplitsplit,TaskAttemptContexttac){byte[]recordDelimiterBytes=null;recordDelimiterBytes=MY_DELIMITER.getBytes();returnnewLineRecordReader(recordDelimiterBytes);}...}4.6.1自定義輸入組件以后只需要在main()中將輸入組件設(shè)置為MyInputFormat,就可以實(shí)現(xiàn)按逗號(hào)“,”分割的效果,如下所示。publicstaticvoidmain(String[]args)throwsException{Configurationconf=newConfiguration();…Jobjob=Job.getInstance(conf);//將輸入組件設(shè)置為MyInputFormatjob.setInputFormatClass(MyInputFormat.class);…}4.6.2自定義排序組件如果自定義類(lèi)實(shí)現(xiàn)了Comparable或WritableComparable,就可以在自定義類(lèi)中通過(guò)重寫(xiě)compareTo()方法實(shí)現(xiàn)自定義排序功能。例如,可以通過(guò)以下代碼,讓MapReduce在處理Person類(lèi)數(shù)據(jù)時(shí),按照age屬性進(jìn)行升序排列。publicclassPersonimplementsWritableComparable<Person>{privateintage;…//根據(jù)age字段升序
publicintcompareTo(Personperson){returnthis.getAge()>person.getAge()?1:-1;}}4.6.2自定義排序組件MapReduce會(huì)在Shuffle階段中,對(duì)Map階段輸出的key進(jìn)行排序,因此還需要將Person對(duì)象設(shè)置為Map階段中context.write()方法的第一個(gè)參數(shù),代碼如下所示。classPersonMapperextendsMapper<LongWritable,Text,Person,Text>{@Overrideprotectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{Personperson=.../*Person重寫(xiě)了compareTo()方法,因此Person對(duì)象會(huì)根據(jù)自定義的規(guī)則(根據(jù)age升序)進(jìn)行排序*/context.write(person,...);}}4.6.3自定義分區(qū)組件在Shuffle階段,MapReduce會(huì)對(duì)溢出文件中排好序的數(shù)據(jù)進(jìn)行分區(qū)。如果要指定分區(qū),就需要使用MapReduce提供的抽象類(lèi)org.apache.hadoop.mapreduce.Partitioner<KEY,VALUE>,并重寫(xiě)其中的getPartition()方法。該方法的返回值,就是各個(gè)區(qū)的區(qū)號(hào)。例如,以下代碼就將數(shù)據(jù)隨機(jī)分配到第0區(qū)或第1區(qū)中。publicclassMyPartitionerextendsPartitioner<Text,Person>{@OverridepublicintgetPartition(Textkey,Personperson,intnumPartitions){//返回值:分區(qū)號(hào)
return(int)(Math.random()*2);}}4.6.3自定義分區(qū)組件最后,再在main()中將自定義分區(qū)類(lèi)設(shè)置到j(luò)ob中即可,代碼如下所示。publicstaticvoidmain(String[]args)throwsException{...//設(shè)置自定義分區(qū)類(lèi)job.setPartitionerClass(MyPartitioner.class);job.setNumReduceTasks(2);...}一般情況下,ReduceTask的個(gè)數(shù)需要和分區(qū)的數(shù)量保持一致。4.6.4自定義輸出組件MapReduce接收輸出數(shù)據(jù)的頂級(jí)類(lèi)是org.apache.hadoop.mapreduce.OutputFormat<K,V>,其源碼如下所示。packageorg.apache.hadoop.mapreduce;
importjava.io.IOException;importorg.apache.hadoop.classification.InterfaceAudience.Public;importorg.apache.hadoop.classification.InterfaceStability.Stable;
@Public@StablepublicabstractclassOutputFormat<K,V>{publicOutputFormat(){}
publicabstractRecordWriter<K,V>getRecordWriter(TaskAttemptContextvar1)throwsIOException,InterruptedException;
publicabstractvoidcheckOutputSpecs(JobContextvar1)throwsIOException,InterruptedException;
publicabstractOutputCommittergetOutputCommitter(TaskAttemptContextvar1)throwsIOException,InterruptedException;}4.6.4自定義輸出組件其中,通過(guò)RecordWriter來(lái)實(shí)現(xiàn)核心的輸出功能,也就是說(shuō),我們可以通過(guò)重寫(xiě)getRecordWriter(TaskAttemptContextvar1)方法實(shí)現(xiàn)自定義的輸出功能。RecordWriter是一個(gè)抽象類(lèi),它的定義如下所示。packageorg.apache.hadoop.mapreduce;importjava.io.IOException;importorg.apache.hadoop.classification.InterfaceAudience.Public;importorg.apache.hadoop.classification.InterfaceStability.Stable;
@Public@StablepublicabstractclassRecordWriter<K,V>{publicRecordWriter(){}
publicabstractvoidwrite(Kvar1,Vvar2)throwsIOException,InterruptedException;
publicabstractvoidclose(TaskAttemptContextvar1)throwsIOException,InterruptedException;}4.6.4自定義輸出組件不難發(fā)現(xiàn),輸出功能最終還是要通過(guò)write(Kvar1,Vvar2)方法進(jìn)行實(shí)現(xiàn)。該方法的兩個(gè)參數(shù)就是用于指定輸出的具體參數(shù)。close()方法用于輸出完畢后的一些資源釋放操作,例如,如果我們是通過(guò)IO流讀取的本地文件進(jìn)行MapReduce處理,那么就可以將這些流操作通過(guò)close()方法進(jìn)行釋放,防止對(duì)內(nèi)存資源的泄漏。4.7實(shí)戰(zhàn)MapReduceMapReduceWebUIMapReduceShellMapReduceJavaAPI編程4.7.1MapReduceWebUIMapReduceWeb接口面向管理員??梢栽陧?yè)面上看到已經(jīng)完成的所有MR-App執(zhí)行過(guò)程中的統(tǒng)計(jì)信息,該頁(yè)面只支持讀,不支持寫(xiě)。在MapReduce程序運(yùn)行時(shí),我們除了觀(guān)察控制臺(tái)打印的日志以外,還可以通過(guò)Web界面查看具體的運(yùn)行情況。MapReduceWebUI的默認(rèn)地址為http://JobHistoryServerIP:19888,可以查看MapReduce的歷史運(yùn)行情況。MapReduce歷史情況具體作業(yè)詳情4.7.2MapReduceShellMapReduceShell接口面向MapReduce程序員。程序員通過(guò)Shell接口能夠向YARN集群提交MR-App,查看正在運(yùn)行的MR-App,甚至可以終止正在運(yùn)行的MR-App。MapReduceShell命令統(tǒng)一入口為:mapred,語(yǔ)法格式如下:mapred[--configconfdir][--loglevelloglevel]COMMAND讀者需要注意的是,若$HADOOP_HOME/bin未加入到系統(tǒng)環(huán)境變量PATH中,則需要切換到Hadoop安裝目錄下,輸入“bin/mapred”。MapReduceShell命令分為用戶(hù)命令和管理員命令。本章僅介紹部分命令,關(guān)于MapReduceShell命令的完整說(shuō)明,讀者請(qǐng)參考官方網(wǎng)站/docs/r2.9.2/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapredCommands.html。命令“mapred”用法4.7.2MapReduceShell1)用戶(hù)命令命令選項(xiàng)功能描述archive創(chuàng)建一個(gè)Hadoop檔案文件archive-logs將聚合日志合并到Hadoop檔案文件中classpath打印運(yùn)行MapReduce子命令所需的包路徑distcp遞歸拷貝文件或目錄job管理MapReduce作業(yè)pipes運(yùn)行Pipes任務(wù),此功能允許用戶(hù)使用C++語(yǔ)言編寫(xiě)MapReduce程序queue查看JobQueue信息【實(shí)例4-1】【實(shí)例4-1】查看集群中當(dāng)前所有MapReduce作業(yè)信息。首先,可以使用命令“mapredjob-help”來(lái)查看“mapredjob”的幫助信息?!緦?shí)例4-1】其次,通過(guò)使用命令“mapredjob-list”來(lái)顯示出集群當(dāng)前所有節(jié)點(diǎn)信息?!緦?shí)例4-2】【實(shí)例4-2】查看集群中正在運(yùn)行的MapReduce作業(yè)狀態(tài)。假設(shè)集群中當(dāng)前正在運(yùn)行1個(gè)MapReduce作業(yè),“JobID”為job_1568702465801_0001,通過(guò)使用命令“mapredjob-statusjob_1568702465801_0001”來(lái)查看該作業(yè)狀態(tài)。4.7.2MapReduceShell2)管理員命令其中,命令“mapredhistoryserver”與啟動(dòng)MapReduce的命令“mr-jobhistory-daemon.shstarthistoryserver”效果相同。讀者請(qǐng)注意,一般不建議使用命令start-all.sh啟動(dòng)HDFS和YARN,而是建議使用start-dfs.sh和start-yarn.sh命令來(lái)分別啟動(dòng)。另外,對(duì)于一般計(jì)算機(jī)而言,在執(zhí)行start-dfs.sh和start-yarn.sh命令之后最好等待一會(huì)再操作各種MapReduce命令,防止因?yàn)榫€(xiàn)程未加載完畢而導(dǎo)致的各種初始化問(wèn)題。命令選項(xiàng)功能描述historyserver啟動(dòng)JobHistoryServer服務(wù)hsadminJobHistoryServer管理命令接口4.7.2MapReduceShell2)管理員命令在MapReduce程序運(yùn)行一段時(shí)間后,可能由于各種故障造成HDFS的數(shù)據(jù)在各個(gè)DataNode中的分布不均勻的情況,此時(shí)也只需要通過(guò)以下shell命令即可重新分布HDFS集群上的各個(gè)DataNode。$HADOOP_HOME/bin/start-balancer.sh此外,在啟動(dòng)時(shí)可以通過(guò)日志看到“Namenodeinsafemode”提示,這表示系統(tǒng)正在處于安全模式,此時(shí)只需要等待一會(huì)即可(通常是十幾秒)。如果硬件資源較差,也可以通過(guò)執(zhí)行以下命令直接退出安全模式。$HADOOP_HOME/bin/hadoopdfsadmin-safemodeleave4.7.3MapReduceJavaAPI編程MapReduceJavaAPI接口面向Java開(kāi)發(fā)工程師。程序員可以通過(guò)該接口編寫(xiě)MR-App用戶(hù)層代碼MRApplicationBusinessLogic?;赮ARN編寫(xiě)的MR-App和基于MapReduce1.0編寫(xiě)的MR-App編程步驟相同。MR-App稱(chēng)為MapReduce應(yīng)用程序,標(biāo)準(zhǔn)YARN-App包含3部分:MRv2框架中的MRAppMaster、MRClient,加上用戶(hù)編寫(xiě)的MRApplicationBusinessLogic(Mapper類(lèi)和Reduce類(lèi)),合稱(chēng)為MR-App。MR-App編寫(xiě)步驟如下所示:(1)編寫(xiě)MRApplicationBusinessLogic。自行編寫(xiě)。(2)編寫(xiě)MRApplicationMaster。無(wú)需編寫(xiě),Hadoop開(kāi)發(fā)人員已編寫(xiě)好MRAppMaster.java。(3)編寫(xiě)MRApplicationClient。無(wú)需編寫(xiě),Hadoop開(kāi)發(fā)人員已編寫(xiě)好YARNRunner.java。4.7.3MapReduceJavaAPI編程其中,MRApplBusinessLogic編寫(xiě)步驟如下:(1)確定<key,value>對(duì)。(2)定制輸入格式。(3)Mapper階段。(4)Reducer階段。(5)定制輸出格式。編寫(xiě)類(lèi)后,main方法里,按下述過(guò)程依次指向各類(lèi)即可:(1)實(shí)例化配置文件類(lèi)。(2)實(shí)例化Job類(lèi)。(3)指向InputFormat類(lèi)。(4)指向Mapper類(lèi)。(5)指向Partitioner類(lèi)。(6)指向Reducer類(lèi)。(7)指向OutputFormat類(lèi)。(8)提交任務(wù)。4.7.3MapReduceJavaAPI編程1.MapReduceJavaAPI解析前面已經(jīng)使用過(guò)MapReduceJavaAPI提供的Mapper類(lèi)、Reducer類(lèi)和Partitioner類(lèi)等。本節(jié)繼續(xù)學(xué)習(xí)MapReduce提供的類(lèi)org.apache.hadoop.conf.Configuration和org.apache.hadoop.mapreduce.Job。在執(zhí)行MapReduce程序時(shí),可以通過(guò)Configuration設(shè)置MapReduce的運(yùn)行參數(shù),然后將Configuration對(duì)象封裝到Job中進(jìn)行任務(wù)提交,例如“Jobjob=Job.getInstance(conf,…)”。4.7.3MapReduceJavaAPI編程1.MapReduceJavaAPI解析Configuration和Job在初始化時(shí),會(huì)先從外部文件中讀取并加載參數(shù),或使用set方法進(jìn)行設(shè)置,具體如下所示。(1)自動(dòng)加載CLASSPATH下的core-default.xml和core-site.xml,其在Configuration類(lèi)中的相關(guān)源碼如下所示。static{//AdddefaultresourcesaddDefaultResource("core-default.xml");addDefaultResource("core-site.xml");…}4.7.3MapReduceJavaAPI編程1.MapReduceJavaAPI解析(2)自動(dòng)加載CLASSPATH下的mapred-default.xml、mapred-site.xml、yarn-default.xml和yarn-site.xml,其在Job類(lèi)中的相關(guān)源碼如下所示。static{ConfigUtil.loadResources();}涉及到的loadResources()方法的源碼如下所示。publicstaticvoidloadResources(){addDeprecatedKeys();Configuration.addDefaultResource("mapred-default.xml");Configuration.addDefaultResource("mapred-site.xml");Configuration.addDefaultResource("yarn-default.xml");Configuration.addDefaultResource("yarn-site.xml");}MapReduce加載的這些文件,既可以是安裝Hadoop時(shí)$HADOOP_HOME/etc/hadoop中存在的配置文件,也可以是我們導(dǎo)入到項(xiàng)目構(gòu)建路徑中的相關(guān)文件。4.7.3MapReduceJavaAPI編程1.MapReduceJavaAPI解析(3)通過(guò)Configuration對(duì)象硬編碼。MapReduce的運(yùn)行參數(shù)還可以通過(guò)Configuration對(duì)象進(jìn)行設(shè)置,例如“Configuration對(duì)象.set("","yarn")”。除了運(yùn)行參數(shù)以外,還可以通過(guò)Job對(duì)象設(shè)置任務(wù)執(zhí)行時(shí)的Map處理類(lèi)、Reduce處理類(lèi)、輸入類(lèi)型、輸出類(lèi)型,以及設(shè)置任務(wù)的分區(qū)處理類(lèi)等,具體代碼如下所示。job.setMapperClass(…);job.setReducerClass(…);job.setOutputKeyClass(…);job.setOutputValueClass(…);job.setInputFormatClass(…);job.setOutputFormatClass(…);4.7.3MapReduceJavaAPI編程2.MapReduce1.0與MapReduce2.0(1)在使用MapReduceAPI時(shí)還要注意,MapReduce2.0使用的是較新的API,而MapReduce1.0使用的是舊版API,二者在使用上有著一定的差異。(2)MapReduce1.0提供的API是定義在org.apache.hadoop.mapred包中,而MapReduce2.0提供的API是定義在org.apache.hadoop.mapreduce包中。(3)MapReduce1.0框架中使用的是JobTracker和TaskTracker結(jié)構(gòu),而MapReduce2.0使用的是ResourceManager、ApplicationMaster和NodeManager結(jié)構(gòu)。packageorg.apache.hadoop.mapred;import...publicinterfaceMapper<K1,V1,K2,V2>extendsJobConfigurable,Closeable{voidmap(K1var1,V1var2,OutputCollector<K2,V2>var3,Reportervar4)throwsIOException;}packageorg.apache.hadoop.mapreduce;import...publicclassMapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>{...protectedvoidmap(KEYINkey,VALUEINvalue,Mapper<KEYIN,VALUEIN,KEYOUT,Contextcontext)throwsIOException,InterruptedException{context.write(key,value);}...}MRv1MRv2packageorg.apache.hadoop.mapred;import...publicinterfaceReducer<K2,V2,K3,V3>extendsJobConfigurable,Closeable{voidreduce(K2var1,Iterator<V2>var2,OutputCollector<K3,V3>var3,Reportervar4)throwsIOException;}packageorg.apache.hadoop.mapreduce;import...publicclassReducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>{...protectedvoidreduce(KEYINkey,Iterable<VALUEIN>values,Contextcontext)throwsIOException,InterruptedException{Iteratori$=values.iterator();while(i$.hasNext()){VALUEINvalue=i$.next();context.write(key,value);}}...}MRv1MRv24.8MapReduce調(diào)優(yōu)在MapReduce執(zhí)行期間,可能會(huì)出現(xiàn)運(yùn)行速度太慢等性能較低的情況。以下是造成性能較低的一些常見(jiàn)原因以及相應(yīng)解決方案。(1)輸入數(shù)據(jù)中存在大量的小文件MapReduce默認(rèn)使用的輸入類(lèi)TextInputformat會(huì)將每個(gè)小文件作為一個(gè)獨(dú)立的文件切片,并且會(huì)將每個(gè)文件切片交由一個(gè)maptask處理。因此,大量的小文件就會(huì)導(dǎo)致MapReduce產(chǎn)生大量的maptask,從而導(dǎo)致MapReduce整體的效率低下。為了減少小文件的數(shù)量,可以在maptask處理之前將小文件進(jìn)行合并,然后將合并后的文件進(jìn)行處理。如何合并文件呢?既可以通過(guò)程序語(yǔ)言、軟件工具進(jìn)行合并,也可以使用MapReduce提供的CombineFileInputFormat或自定義MapReduce執(zhí)行方式。(2)減少M(fèi)apReduce各階段數(shù)據(jù)傳輸?shù)拇螖?shù)默認(rèn)情況下,數(shù)據(jù)會(huì)從map節(jié)點(diǎn)通過(guò)網(wǎng)絡(luò)傳輸?shù)絩educe節(jié)點(diǎn)。而如果map節(jié)點(diǎn)存在大量的數(shù)據(jù),就會(huì)造成大量數(shù)據(jù)需要經(jīng)由網(wǎng)絡(luò)傳輸?shù)膱?chǎng)景。而我們可以先將各個(gè)map節(jié)點(diǎn)的數(shù)據(jù)在本地處理,然后再將各個(gè)map節(jié)點(diǎn)本地處理的結(jié)果經(jīng)網(wǎng)絡(luò)傳輸?shù)絩educe進(jìn)行匯總即可。以WordCount為例,如果某個(gè)map節(jié)點(diǎn)有100個(gè)“hello”單詞,默認(rèn)情況下會(huì)通過(guò)網(wǎng)絡(luò)傳輸100次“hello”,然后再在reduce端進(jìn)行100的累加。而如果先將100個(gè)“hello”在map端累加完畢,然后將累加的結(jié)果經(jīng)由網(wǎng)絡(luò)傳輸?shù)絩educe端,那么就僅僅會(huì)在網(wǎng)絡(luò)中傳輸1次即可,很明顯可以大大減少網(wǎng)絡(luò)流量。但要注意,并不是所有業(yè)務(wù)邏輯都適合先在map階段處理。讀者可以思考,“求平均數(shù)問(wèn)題”是否適合先在map階段處理,然后再經(jīng)由網(wǎng)絡(luò)傳輸至reduce匯總?將數(shù)據(jù)在map階段處理的過(guò)程稱(chēng)為Combine,自定義Combine類(lèi)需要繼承自Reducer類(lèi)。當(dāng)環(huán)形緩沖區(qū)中的數(shù)據(jù)傳遞到溢出文件時(shí),會(huì)進(jìn)行數(shù)據(jù)的傳輸操作。因此,可以根據(jù)機(jī)器的硬件性能,通過(guò)io.sort.mb參數(shù)適當(dāng)調(diào)大環(huán)形緩沖區(qū)的大小,或者通過(guò)io.sort.spill.percent參數(shù)適當(dāng)調(diào)大環(huán)形緩沖區(qū)中存放數(shù)據(jù)的空間大小。當(dāng)多個(gè)溢出文件進(jìn)行合并時(shí),會(huì)執(zhí)行數(shù)據(jù)的排序操作。顯然,每次合并的文件數(shù)量越多,合并的次數(shù)就越少,排序的次數(shù)也就越少。對(duì)于這點(diǎn),可以通過(guò)io.sort.factor適當(dāng)調(diào)大每次參與合并的文件個(gè)數(shù)。4.8MapReduce調(diào)優(yōu)(
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶(hù)所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶(hù)上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶(hù)上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶(hù)因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- DB3707T 135-2025 大蔥三系雜交制種技術(shù)規(guī)程
- 楚雄州統(tǒng)測(cè)數(shù)學(xué)試卷
- 海南優(yōu)騰愛(ài)科醫(yī)療科技有限公司醫(yī)療器械研發(fā)生產(chǎn)環(huán)評(píng)報(bào)告表
- 運(yùn)動(dòng)解剖學(xué)試題冊(cè)答案全套
- 協(xié)同推進(jìn)降碳減污擴(kuò)綠增長(zhǎng)的背景與意義
- 完善基層衛(wèi)生服務(wù)網(wǎng)絡(luò)建設(shè)的策略及實(shí)施路徑
- 國(guó)內(nèi)外醫(yī)療機(jī)構(gòu)水污染物排放現(xiàn)狀
- 低空經(jīng)濟(jì)發(fā)展趨勢(shì)與前景
- 促進(jìn)醫(yī)療服務(wù)的公平性的策略及實(shí)施路徑
- 四級(jí)人力資源管理師-上半人力(四級(jí))《基礎(chǔ)知識(shí)》黑鉆押題4
- 安全生產(chǎn)承包的合同
- 8.3 摩擦力(共28張) 2024-2025學(xué)年人教版物理八年級(jí)下冊(cè)
- 2025年陜西延長(zhǎng)石油物流集團(tuán)有限公司招聘筆試參考題庫(kù)含答案解析
- 2025年部編版語(yǔ)文三年級(jí)下冊(cè)全冊(cè)單元測(cè)試題附答案(共8個(gè)單元)
- 兒童腺樣體肥大治療方案-深度研究
- 2025年合肥經(jīng)濟(jì)技術(shù)職業(yè)學(xué)院?jiǎn)握新殬I(yè)適應(yīng)性測(cè)試題庫(kù)帶答案
- 2025年懷化職業(yè)技術(shù)學(xué)院?jiǎn)握新殬I(yè)技能測(cè)試題庫(kù)必考題
- 2025年第六屆(中小學(xué)組)國(guó)家版圖知識(shí)競(jìng)賽測(cè)試題庫(kù)及答案
- 2025年中國(guó)床墊機(jī)械行業(yè)市場(chǎng)發(fā)展監(jiān)測(cè)及投資戰(zhàn)略咨詢(xún)報(bào)告
- C小學(xué)一起諾如病毒胃腸炎疫情的調(diào)查與處置課件
- 2025年鎵礦采選項(xiàng)目投資可行性研究分析報(bào)告
評(píng)論
0/150
提交評(píng)論