Hadoop數(shù)據(jù)分析與應(yīng)用MapReduce_第1頁
Hadoop數(shù)據(jù)分析與應(yīng)用MapReduce_第2頁
Hadoop數(shù)據(jù)分析與應(yīng)用MapReduce_第3頁
Hadoop數(shù)據(jù)分析與應(yīng)用MapReduce_第4頁
Hadoop數(shù)據(jù)分析與應(yīng)用MapReduce_第5頁
已閱讀5頁,還剩47頁未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)

文檔簡介

第3章MapReduce(一)Hadoop數(shù)據(jù)分析與應(yīng)用回顧Hadoop分布式模式配置與啟動步驟:配置Hadoop配置文件,包括core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml和slaves遠(yuǎn)程拷貝資源,將Master上的jdk文件夾、hadoop文件夾、hadoop-record文件夾以及環(huán)境變量hadoop-eco.sh遠(yuǎn)程拷貝到其他機(jī)器上關(guān)閉服務(wù)器防火墻格式化NameNode系統(tǒng)啟動HDFS啟動YARN通過網(wǎng)頁管理端訪問回顧HDFS的設(shè)計特點(diǎn):能夠存儲超大文件流式數(shù)據(jù)訪問商用硬件不能處理低時間延遲的數(shù)據(jù)訪問不能存放大量小文件無法高效實(shí)現(xiàn)多用戶寫入或者任意修改文件封裝MapReduce執(zhí)行流程MapReduce編寫與運(yùn)行Hadoop序列化與反序列化Hadoop分區(qū)使用Hadoop內(nèi)置jar文件進(jìn)行單詞計數(shù)統(tǒng)計編寫MapReduce實(shí)現(xiàn)氣象數(shù)據(jù)最高溫與最低溫統(tǒng)計使用Hadoop序列化與反序列化進(jìn)行用戶流量統(tǒng)計使用Hadoop分區(qū)根據(jù)用戶類型進(jìn)行分區(qū)統(tǒng)計定義屬性MapReduce的概念MapReduce的功能MapReduce鍵值對MapReduce執(zhí)行流程MapReduce程序運(yùn)行使用Hadoop內(nèi)置jar文件進(jìn)行單詞計數(shù)統(tǒng)計25203.1.1MapReduce的概念MapReduce是一種編程模型,用于大規(guī)模數(shù)據(jù)集(大于1TB)的并行運(yùn)算。概念Map(映射)和Reduce(歸約)是它們的主要思想,都是從函數(shù)式和矢量編程語言里借來的特性,它極大地方便了編程人員在不會分布式并行編程的情況下,將自己的程序運(yùn)行在分布式系統(tǒng)上。HadoopMapReduce是一個易于編寫應(yīng)用程序的軟件框架,該應(yīng)用程序以可靠、容錯的方式并行處理大量硬件(數(shù)千個節(jié)點(diǎn))上的大量數(shù)據(jù)(多兆字節(jié)數(shù)據(jù)集)。3.1.2MapReduce的功能MapReduce提供了以下主要功能:數(shù)據(jù)拆分和計算任務(wù)調(diào)度數(shù)據(jù)與代碼互定位系統(tǒng)優(yōu)化出錯檢測和恢復(fù)3.1.3MapReduce鍵值對MapReduce的<key,value>存儲模型能夠存儲任意格式的數(shù)據(jù),Map()方法和reduce()方法可以進(jìn)行各種復(fù)雜的數(shù)據(jù)處理,這也使得程序員的負(fù)擔(dān)加重,在對上層業(yè)務(wù)的開發(fā)效率上不如SQL簡單。MapReduce在速度上會占有一定優(yōu)勢,MapReduce是為非結(jié)構(gòu)化大數(shù)據(jù)的復(fù)雜處理而設(shè)計的,這些業(yè)務(wù)具有一次性處理的特點(diǎn);此外由于采取了全數(shù)據(jù)掃描的模式以及對中間結(jié)果逐步匯總的策略,使其在擁有良好擴(kuò)展能力和容錯能力的同時,也導(dǎo)致了較高的磁盤和網(wǎng)絡(luò)I/O的負(fù)載以及較高的數(shù)據(jù)解析代價。3.1.4MapReduce執(zhí)行流程MapReduce執(zhí)行流程是先執(zhí)行Map后執(zhí)行Reduce的過程3.1.4MapReduce執(zhí)行流程Map任務(wù)處理讀取HDFS中的文件,每一行解析成一個<k,v>,每一個鍵值對調(diào)用一次map()方法覆蓋map()方法,接收上一步產(chǎn)生的<k,v>進(jìn)行處理,轉(zhuǎn)換為新的<k,v>輸出對上一步輸出的<k,v>進(jìn)行分區(qū),并默認(rèn)分為一個區(qū)對不同分區(qū)中的數(shù)據(jù)按照k進(jìn)行排序和分組。分組指的是相同key的value放到一個集合中對分組后的數(shù)據(jù)進(jìn)行歸約(可選)3.1.4MapReduce執(zhí)行流程Reduce任務(wù)處理多個Map任務(wù)的輸出,按照不同的分區(qū),通過網(wǎng)絡(luò)Copy到不同的Reduce節(jié)點(diǎn)上對多個Map的輸出進(jìn)行合并和排序?qū)educe輸出的<k,v>寫到HDFS中3.1.5MapReduce程序運(yùn)行Hadoop的發(fā)布包中內(nèi)置了一個hadoop-mapreduce-examples-2.7.3.jar供用戶使用,該安裝包位于安裝目錄下的share/hadoop/mapreduce文件夾中,該jar包中有各種MR示例程序3.1.5MapReduce程序運(yùn)行以單詞計數(shù)為例,可以通過以下步驟運(yùn)行:啟動HDFS與YARN。上傳需要統(tǒng)計的文件到HDFS下的data目錄。在集群中的任意一臺服務(wù)器上啟動執(zhí)行程序。輸入:hadoopjarhadoop-mapreduce-example-2.7.3.jarwordcount/data/output3.1.6學(xué)生實(shí)踐練習(xí)上傳一個英文文檔,使用hadoop-mapreduce-example-2.7.3.jar對英文文檔進(jìn)行單詞計數(shù)的統(tǒng)計。253.1.6學(xué)生實(shí)踐練習(xí)上傳文件到HDFS系統(tǒng);使用“hadoopfs-put<localsrc>...<dst>”命令將文件上傳到HDFS。使用“hadoopjar”命令執(zhí)行Java程序;使用hadoop-mapreduce-example-2.7.3.jar這個jar包中的wordcount()方法進(jìn)行單詞計數(shù)。最后使用瀏覽器通過50070端口訪問站點(diǎn),并在文件系統(tǒng)中下載文件,查看統(tǒng)計結(jié)果。Hadoop分布式模式搭建新舊API的差異MapReduceJavaAPI的介紹編寫MapReduce程序運(yùn)行MapReduce程序Combiner的介紹編寫MapReduce實(shí)現(xiàn)氣象數(shù)據(jù)最高溫與最低溫統(tǒng)計25203.2.1新舊API的差異adoop在0.20.0版本中第一次使用新的API,部分早期的Hadoop0.20.0版本不支持使用舊的API,但在接下來的Hadoop1.x和Hadoop2.x版本中新舊API都可以使用3.2.1新舊API的差異新舊API的差異主要有以下9點(diǎn):新的API放在org.apache.hadoop.mapreduce包中,舊的API放在org.apache.hadoop.mapred中新API傾向于使用抽象類,而不是接口,更有利于擴(kuò)展新API充分使用上下文對象context,允許用戶能與MapReduce系統(tǒng)通信鍵值對記錄在這兩類API中都被推給了Mapper和Reducer,除此之外,新的API通過重寫run()方法,允許Mapper和Reducer控制數(shù)據(jù)執(zhí)行流程,允許數(shù)據(jù)按條處理或者分批處理。舊的API只在Map中允許。新的API中,作業(yè)控制由Job類實(shí)現(xiàn),而非舊API中的JobClient類,新的API中刪除了JobClient類新增的API實(shí)現(xiàn)了配置的統(tǒng)一輸出文件的命名方式略有不同新API中的用戶重載函數(shù)(java.lang.InterruptedException)被聲明為拋出異常,這意味著可以用代碼來實(shí)現(xiàn)中斷響應(yīng)。在新的API中,reduce()傳遞的值是java.lang.Iterable類型的,而非舊API中傳遞的java.lang.Iterator類型。這一改變使我們更容易通過Java的for-each循環(huán)結(jié)構(gòu)來迭代這些值。3.2.2MapReduceJavaAPI的介紹org.apache.hadoop.mapreduce包提供了Mapper和Reduce基類。在大多數(shù)情況下,MapReduce作業(yè)會使用以上兩個基類實(shí)現(xiàn)針對該作業(yè)的Mapper和Reduce的子類。Mapper類:Mappermaps將鍵值對輸入到一組中間的鍵值對。maps是將輸入記錄轉(zhuǎn)換為中間記錄的單個任務(wù),轉(zhuǎn)換后的中間記錄與輸入記錄的類型不一定是相同的。常見方法:map()、setup()、cleanup()、run()Reducer類:Reducerreduces是一組中間值,這些值通過key組成較小的單元。作業(yè)的reduces數(shù)由用戶通過Java.StUnNoTrimeTebug(int)方法設(shè)置。常見方法:reduce()、setup()、cleanup()、run()Driver類:負(fù)責(zé)與Hadoop框架通信,并指定運(yùn)行MapReduce作業(yè)所需的配置元素的驅(qū)動程序3.2.3編寫MapReduce程序?qū)崿F(xiàn)一個WordCount程序,并將WordCount程序打包發(fā)布到Hadoop集群中運(yùn)行。publicclassWCMapextendsMapper<LongWritable,Text,Text,LongWritable>{@Overrideprotectedvoidmap(LongWritablekey,Textvalue,Mapper<LongWritable,Text,Text,LongWritable>.Contextcontext)throwsIOException,InterruptedException{ String[]splited=value.toString().split(""); for(Stringstr:splited) { context.write(newText(str),newLongWritable(1)); }}}創(chuàng)建Mapper子類繼承Mapper基類3.2.3編寫MapReduce程序publicclassWCReduceextendsReducer<Text,LongWritable,Text,LongWritable>{@Overrideprotectedvoidreduce(Texttext,Iterable<LongWritable>iterable,Reducer<Text,LongWritable,Text,LongWritable>.Contextcontext)throwsIOException,InterruptedException{ longtime=0; for(LongWritablelw:iterable) { time+=lw.get(); } context.write(text,newLongWritable(time));}}創(chuàng)建Reducer子類繼承Reducer基類3.2.3編寫MapReduce程序publicstaticvoidmain(String[]args)throwsIOException,ClassNotFoundException,InterruptedException{Configurationconf=newConfiguration();Jobjob=Job.getInstance(conf);job.setJarByClass(MapReduceDemo.class);job.setMapperClass(WCMap.class);job.setReducerClass(WCReduce.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);PathinputPath=newPath("/data");PathoutputPath=newPath("/output");FileInputFormat.setInputPaths(job,inputPath);FileOutputFormat.setOutputPath(job,outputPath);booleanwaitForCompletion=job.waitForCompletion(true);System.exit(waitForCompletion?0:1);}創(chuàng)建Driver對象并封裝MapReduce作業(yè)類的相關(guān)方法3.2.4運(yùn)行MapReduce程序程序在Hadoop平臺上運(yùn)行的步驟如下:導(dǎo)出jar文件執(zhí)行jar文件使用“put”命令上傳需要統(tǒng)計的文檔到HDFS中執(zhí)行jar文件的命令:hadoopjarwc.jarcom.purple_bull.bigdata.mr.MapReduceDemo3.2.5Combiner的介紹在MapReduce中,當(dāng)Map生成的數(shù)據(jù)過大時,帶寬就成了瓶頸,當(dāng)Map發(fā)送給Reduce時,對數(shù)據(jù)進(jìn)行一次本地合并來減少數(shù)據(jù)傳輸量以提高網(wǎng)絡(luò)I/O性能;Combiner最基本的作用是實(shí)現(xiàn)本地key的聚合,有“本地Reduce”之稱,本質(zhì)上就是一個Reducer3.2.5Combiner的介紹修改Driver類,在Mapper類和Reducer類定義之間加入下列內(nèi)容:job.setCombinerClass(WCReduce.class); 3.2.6學(xué)生實(shí)踐練習(xí)根據(jù)氣象信息,獲取全年的最高溫和最低溫信息253.2.6學(xué)生實(shí)踐練習(xí)上傳氣象數(shù)據(jù)到HDFS中根據(jù)氣象信息數(shù)據(jù)的相關(guān)特點(diǎn),編寫Mapper類根據(jù)Mapper輸出的信息編寫Reducer創(chuàng)建Driver對象并封裝MapReduce作業(yè)類的相關(guān)方法使用hadoopjar命令運(yùn)行程序,進(jìn)行最高溫和最低溫的統(tǒng)計通過50070端口進(jìn)入網(wǎng)頁,下載結(jié)果數(shù)據(jù)Hadoop序列化與反序列化Java序列化和Hadoop序列化簡介Writable的分析與使用WritableComparable的分析與使用使用Hadoop序列化與反序列化進(jìn)行用戶流量統(tǒng)計25203.3.1Java序列化和Hadoop序列化簡介Java序列化需要實(shí)現(xiàn)serializable接口,進(jìn)而實(shí)現(xiàn)序列化與反序列化,Hadoop序列化與反序列化需要實(shí)現(xiàn)Writable的接口Java序列化與反序列化優(yōu)點(diǎn):實(shí)現(xiàn)簡便,對于循環(huán)引用和重復(fù)引用的情況也能處理,允許一定程度上類成員的改變。支持加密和驗證缺點(diǎn):序列化后的對象占用空間過大,數(shù)據(jù)膨脹。反序列化會不斷創(chuàng)建新的對象。同一個類的對象的序列化結(jié)果只輸出一份元數(shù)據(jù),導(dǎo)致了文件不能分割。Hadoop序列化與反序列化排列緊湊:盡量減少帶寬,加快數(shù)據(jù)交換速度處理快速:進(jìn)程間通信需要大量的數(shù)據(jù)交互,使用大量的序列化機(jī)制,必須減少序列化和反序列的開支跨語言:可以支持不同語言間的數(shù)據(jù)交互可擴(kuò)展:當(dāng)系統(tǒng)協(xié)議升級,類定義發(fā)生變化時,序列化機(jī)制需要支持這些升級和變化3.3.2Writable的分析與使用Writable位于org.apache.hadoop.io包下,該接口有兩個方法,分別為write()方法和readFields()方法,接口定義如下:publicinterfaceWritable{voidwrite(DataOutputout)throwsIOException;voidreadFields(DataInputin)throwsIOException;}3.3.2Writable的分析與使用在開發(fā)過程中經(jīng)常涉及到復(fù)雜類型參數(shù)的傳遞,此時類的序列化和反序列化就需要實(shí)現(xiàn)Writable中的方法以全國高考成績?yōu)槔?,獲取文科和理科平均分最高的考生信息,數(shù)據(jù)結(jié)構(gòu)說明見表:姓名考號身份證號語文數(shù)學(xué)英語綜合文/理張*梅

030000000****34209842001082****01421321402800劉*曉022000324****26209842001082****01261029419813.3.2Writable的分析與使用publicclassScoreEntityimplementsWritable{privateStringuserName;privateStringuserCandidateNumber;privateStringuserIdCard;privateIntegeruserChinese;privateIntegeruserMathematical;privateIntegeruserEnglish;privateIntegeruserSynthesis;privatefloatuserAverageScore;privateIntegeruserIs;publicScoreEntity(){}//構(gòu)造方法省略…//接下頁}考生成績信息實(shí)體類3.3.2Writable的分析與使用publicclassScoreEntityimplementsWritable{//接上頁@Overridepublicvoidwrite(DataOutputout)throwsIOException{ out.writeUTF(userName); out.writeUTF(userCandidateNumber); out.writeUTF(userIdCard); out.writeInt(userChinese); out.writeInt(userMathematical); out.writeInt(userEnglish); out.writeInt(userSynthesis); out.writeFloat(userAverageScore); out.writeInt(userIs);}//接下頁}序列化方法3.3.2Writable的分析與使用publicclassScoreEntityimplementsWritable{//接上頁@OverridepublicvoidreadFields(DataInputin)throwsIOException{ userName=in.readUTF(); userCandidateNumber=in.readUTF(); userIdCard=in.readUTF(); userChinese=in.readInt(); userMathematical=in.readInt(); userEnglish=in.readInt(); userSynthesis=in.readInt(); userAverageScore=in.readFloat(); userIs=in.readInt();}//toString方法省略…//省略屬性的get和set方法}反序列化方法3.3.2Writable的分析與使用publicclassScoreEntityimplementsWritable{//接上頁@OverridepublicvoidreadFields(DataInputin)throwsIOException{ userName=in.readUTF(); userCandidateNumber=in.readUTF(); userIdCard=in.readUTF(); userChinese=in.readInt(); userMathematical=in.readInt(); userEnglish=in.readInt(); userSynthesis=in.readInt(); userAverageScore=in.readFloat(); userIs=in.readInt();}//toString方法省略…//省略屬性的get和set方法}反序列化方法3.3.2Writable的分析與使用publicclassScoreMapperextendsMapper<LongWritable,Text,Text,ScoreEntity>{protectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{String[]fields=value.toString().split("\t");try{ ScoreEntityentity=newScoreEntity(); entity.setUserName(fields[0]); entity.setUserCandidateNumber(fields[1]); entity.setUserIdCard(fields[2]); entity.setUserChinese(Integer.parseInt(fields[3])); entity.setUserMathematical(Integer.parseInt(fields[4])); entity.setUserEnglish(Integer.parseInt(fields[5])); entity.setUserSynthesis(Integer.parseInt(fields[6])); entity.setUserIs(Integer.parseInt(fields[7])); entity.setUserAverageScore((entity.getUserChinese()+entity.getUserEnglish()+entity. getUserMathematical()+entity.getUserSynthesis())/4); if(Integer.parseInt(fields[7])==1){ context.write(newText("文科"),entity); }else{ context.write(newText("理科"),entity); }}catch(Exceptione){ CountercountPrint=context.getCounter("Map-Exception",e.toString()); countPrint.increment(1l); }}}創(chuàng)建Mapper子類繼承Mapper基類3.3.2Writable的分析與使用publicclassScoreReduceextendsReducer<Text,ScoreEntity,Text,ScoreEntity>{@Overrideprotectedvoidreduce(Texttext,Iterable<ScoreEntity>iterable,Contextcontext)throwsIOException,InterruptedException{try{ ScoreEntityentity=null; if(text.toString().equals("文科")){ for(ScoreEntityscoreEntity:iterable){ if(entity==null){ entity=scoreEntity; }else{ if(entity.getUserAverageScore()<scoreEntity.getUserAverageScore()){ entity=scoreEntity;}}} context.write(newText("文科"),entity); }else{ for(ScoreEntityscoreEntity:iterable){ if(entity==null){ entity=scoreEntity; }else{ if(entity.getUserAverageScore()<scoreEntity.getUserAverageScore()){ entity=scoreEntity;}}} context.write(newText("理科"),entity);}}catch(Exceptione){ CountercountPrint=context.getCounter("Reduce-OutValue",e.getMessage()); countPrint.increment(1l);}}}創(chuàng)建Reducer子類繼承Reducer基類3.3.2Writable的分析與使用publicclassScoreDemo{publicstaticvoidmain(String[]args)throwsException{ Configurationconf=newConfiguration(); Jobjob=Job.getInstance(conf); job.setJarByClass(ScoreDemo.class); job.setMapperClass(ScoreMapper.class); job.setReducerClass(ScoreReduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(ScoreEntity.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(ScoreEntity.class); PathinputPath=newPath("/data"); PathoutputPath=newPath("/output"); outputPath.getFileSystem(conf).delete(outputPath,true); FileInputFormat.setInputPaths(job,inputPath); FileOutputFormat.setOutputPath(job,outputPath); booleanwaitForCompletion=job.waitForCompletion(true); System.exit(waitForCompletion?0:1);}}創(chuàng)建Driver對象并封裝MapReduce作業(yè)類的相關(guān)方法3.3.3WritableComparable的分析與使用在進(jìn)行MapReduce編程時,key鍵往往用于分組或排序,在進(jìn)行某些操作時,當(dāng)Hadoop內(nèi)置的key鍵數(shù)據(jù)類型無法滿足需求時,或當(dāng)它針對用例優(yōu)化自定義數(shù)據(jù)類型時,可以通過實(shí)現(xiàn)org.apache.hadoop.io.WritableComparable接口定義一個自定義的WritableComparable類型,并將其作為MapReduce計算模型的key鍵數(shù)據(jù)類型WritableComparable位于org.apache.hadoop.io包下,該接口有3個方法,分別為write()方法、readFields()方法和compareTo()方法。3.3.3WritableComparable的分析與使用以全國各中學(xué)學(xué)生們的語文、數(shù)學(xué)和英語的成績進(jìn)行匯總排序姓名語文數(shù)學(xué)英語XXX

142132140XX126102943.3.3WritableComparable的分析與使用privateStringuserName;privateIntegeruserChinese;privateIntegeruserMathematical;privateIntegeruserEnglish;privateIntegeruserTotalScore;@OverridepublicintcompareTo(ScoreSortEntityo){ returnthis.userTotalScore>o.userTotalScore?-1:1;}創(chuàng)建考生成績信息實(shí)體類,實(shí)現(xiàn)compareTo()方法3.3.3WritableComparable的分析與使用ScoreSortEntityentity=newScoreSortEntity( fields[0], Integer.parseInt(fields[1]), Integer.parseInt(fields[2]), Integer.parseInt(fields[3]));context.write(entity,newText(fields[0]));創(chuàng)建Mapper子類繼承Mapper基類context.writ

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論