《大數(shù)據(jù)技術(shù)實(shí)戰(zhàn)案例教程》課件實(shí)驗(yàn)指導(dǎo)書(shū)-實(shí)驗(yàn)3MapReduce編程_第1頁(yè)
《大數(shù)據(jù)技術(shù)實(shí)戰(zhàn)案例教程》課件實(shí)驗(yàn)指導(dǎo)書(shū)-實(shí)驗(yàn)3MapReduce編程_第2頁(yè)
《大數(shù)據(jù)技術(shù)實(shí)戰(zhàn)案例教程》課件實(shí)驗(yàn)指導(dǎo)書(shū)-實(shí)驗(yàn)3MapReduce編程_第3頁(yè)
《大數(shù)據(jù)技術(shù)實(shí)戰(zhàn)案例教程》課件實(shí)驗(yàn)指導(dǎo)書(shū)-實(shí)驗(yàn)3MapReduce編程_第4頁(yè)
《大數(shù)據(jù)技術(shù)實(shí)戰(zhàn)案例教程》課件實(shí)驗(yàn)指導(dǎo)書(shū)-實(shí)驗(yàn)3MapReduce編程_第5頁(yè)
已閱讀5頁(yè),還剩11頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

《大數(shù)據(jù)技術(shù)實(shí)戰(zhàn)案例教程》實(shí)驗(yàn)指導(dǎo)書(shū)實(shí)驗(yàn)3MapReduce編程編寫(xiě)者:徐魯輝實(shí)驗(yàn)3MapReduce編程3.1實(shí)驗(yàn)?zāi)康?.理解MapReduce編程思想。2.理解MapReduce作業(yè)執(zhí)行流程。3.理解MR-App編寫(xiě)步驟,掌握使用MapReduceJavaAPI進(jìn)行MapReduce基本編程,熟練掌握如何在Hadoop集群上運(yùn)行MR-App并查看運(yùn)行結(jié)果。4.熟練掌握MapReduceWebUI界面的使用。5.掌握MapReduceShell常用命令的使用。3.2實(shí)驗(yàn)環(huán)境本實(shí)驗(yàn)所需的軟件環(huán)境包括全分布模式Hadoop集群、Eclipse。3.3實(shí)驗(yàn)內(nèi)容1.啟動(dòng)全分布模式Hadoop集群,守護(hù)進(jìn)程包括NameNode、DataNode、SecondaryNameNode、ResourceManager、NodeManager和JobHistoryServer。2.在Eclipse下創(chuàng)建Java項(xiàng)目MapReduceExample,在其下建立新包c(diǎn)om.xijing.mapreduce,編寫(xiě)MapReduce程序,已知某個(gè)超市的結(jié)算記錄,從左往右各字段的含義依次是會(huì)員編號(hào)、結(jié)算時(shí)間、消費(fèi)金額和用戶身份,要求計(jì)算出會(huì)員和非會(huì)員的平均消費(fèi)金額。最后打包成JAR形式并在Hadoop集群上運(yùn)行該MR-App,查看運(yùn)行結(jié)果。3.分別在自編MapReduce程序運(yùn)行過(guò)程中和運(yùn)行結(jié)束后練習(xí)MapReduceShell常用命令。4.分別在自編MapReduce程序運(yùn)行過(guò)程中和運(yùn)行結(jié)束后查看MapReduceWebUI界面。5.關(guān)閉Hadoop集群。3.4實(shí)驗(yàn)原理3.4.1初識(shí)MapReduceMapReduce是一個(gè)可用于大規(guī)模數(shù)據(jù)處理分布式計(jì)算框架,HadoopMapReduce來(lái)源于Google公司2004年發(fā)表的一篇關(guān)于MapReduce的論文(MapReduce:SimplifiedDataProcessingonLargeClusters,2004),其對(duì)大數(shù)據(jù)并行處理采用“分而治之”的設(shè)計(jì)思想,并把函數(shù)式編程思想構(gòu)建成抽象模型Map和Reduce。MapReduce在發(fā)展史上經(jīng)過(guò)一次重大改變。舊版MapReduce(MapReduce1.0)采用典型的主從架構(gòu),主進(jìn)程為JobTracker,從進(jìn)程為T(mén)askTracker,但是這種架構(gòu)過(guò)于簡(jiǎn)單,導(dǎo)致JobTracker的任務(wù)過(guò)于集中,并且存在單點(diǎn)故障等問(wèn)題。因此,MapReduce進(jìn)行了一次重要升級(jí),舍棄了JobTracker和TaskTracker,而改用了ResourceManager進(jìn)程負(fù)責(zé)處理資源,并且使用ApplicationMaster進(jìn)程管理各個(gè)具體的應(yīng)用,用NodeManager進(jìn)程對(duì)各個(gè)節(jié)點(diǎn)的工作情況進(jìn)行監(jiān)聽(tīng),形成了純粹的計(jì)算框架MapReduce2.0和存粹的資源管理框架YARN。3.4.2MapReduce作業(yè)執(zhí)行流程MapReduce作業(yè)的執(zhí)行流程主要包括InputFormat、Map、Shuffle、Reduce、OutputFormat五個(gè)階段。InputFormat加載數(shù)據(jù)并轉(zhuǎn)換為適合Map任務(wù)讀取的鍵值對(duì)<key,valule>,輸入給Map任務(wù);Map階段會(huì)根據(jù)用戶自定義的映射規(guī)則,輸出一系列的<key,value>作為中間結(jié)果;Shuffle階段對(duì)Map的輸出進(jìn)行一定的排序、分區(qū)、合并、歸并等操作,得到<key,List(value)>形式的中間結(jié)果;Reduce階段以Shuffle輸出作為輸入,執(zhí)行用戶定義的邏輯,輸出<key,valule>形式的結(jié)果給OutputFormat;OutputFormat模塊完成輸出驗(yàn)證后將Reduce結(jié)果寫(xiě)入分布式文件系統(tǒng)。MapReduce作業(yè)執(zhí)行流程如圖3-2所示。最終結(jié)果最終結(jié)果<key,value>中間結(jié)果<key,List(value)>中間結(jié)果<key,value>輸入<key,value>加載文件最終結(jié)果<key,value>中間結(jié)果<key,List(value)>中間結(jié)果<key,value>輸入<key,value>寫(xiě)入文件分布式文件系統(tǒng)(如HDFS)InputFormatSplitSplitSplitRRRRRRMapMapMapShuffleReduceOutputFormat節(jié)點(diǎn)1加載文件寫(xiě)入文件InputFormatSplitSplitSplitRRRRRRMapMapMapShuffleReduceOutputFormat節(jié)點(diǎn)2分布式文件系統(tǒng)(如HDFS)圖3-2MapReduce作業(yè)執(zhí)行流程3.4.3MapReduce入門(mén)案例WordCountHadoop提供了一個(gè)MapReduce入門(mén)案例“WordCount”,用于統(tǒng)計(jì)輸入文件中每個(gè)單詞出現(xiàn)的次數(shù),其是最簡(jiǎn)單也是最能體現(xiàn)MapReduce思想的程序之一,被稱為MapReduce版的“HelloWorld”。本節(jié)主要從源碼和執(zhí)行流程兩個(gè)角度對(duì)MapReduce編程模型進(jìn)行詳細(xì)分析。該案例源碼保存在$HADOOP_HOME/share/hadoop/mapreduce/sources/hadoop-mapreduce-examples-2.9.2.jar的WordCount.java中,其源碼共分為T(mén)okenizerMapper類、IntSumReducer類和main()函數(shù)三個(gè)部分。1.TokenizerMapper類從類名可知,該類是Map階段的實(shí)現(xiàn)。并且能夠發(fā)現(xiàn),在MapReduce中Map階段的業(yè)務(wù)代碼需要繼承自org.apache.hadoop.mapreduce.Mapper類。Mapper類的四個(gè)泛型分別表示輸入數(shù)據(jù)的key類型、value類型,輸出數(shù)據(jù)的key類型、value類型。以本次“WordCount”為例,每次Map階段需要處理的數(shù)據(jù)是文件中的一行數(shù)據(jù),而默認(rèn)情況下這一行數(shù)據(jù)的偏移量(通俗的說(shuō),下一行記錄開(kāi)始位置=上一行記錄的開(kāi)始位置+上一行字符串內(nèi)容的長(zhǎng)度,這個(gè)相對(duì)字節(jié)的變化就叫做字節(jié)偏移量)就是輸入數(shù)據(jù)的key類型(一般而言,偏移量是一個(gè)長(zhǎng)整型,也可以寫(xiě)成本例中使用的Object類型);輸入數(shù)據(jù)的value類型就是這行數(shù)據(jù)本身,因此是Text類型(即Hadoop中定義的字符串類型);輸出數(shù)據(jù)的key類型是每個(gè)單詞本身,因此也是Text類型;而輸出數(shù)據(jù)的value類型,就表示該單詞出現(xiàn)了一次,因此就是數(shù)字1,可以表示為IntWritable類型(即Hadoop中定義的整數(shù)類型)。TokenizerMapper類的源碼如下所示。importorg.apache.hadoop.mapreduce.Mapper;publicstaticclassTokenizerMapperextendsMapper<Object,Text,Text,IntWritable>{privatefinalstaticIntWritableone=newIntWritable(1);privateTextword=newText();publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedException{StringTokenizeritr=newStringTokenizer(value.toString());while(itr.hasMoreTokens()){word.set(itr.nextToken());context.write(word,one);}}}從源碼可知,Mapper類提供了map()方法,用于編寫(xiě)Map階段具體的業(yè)務(wù)邏輯。map()方法的前兩個(gè)參數(shù)表示輸入數(shù)據(jù)的key類型和value類型(即與Mapper類前兩個(gè)參數(shù)的含義一致),而map()的第三個(gè)參數(shù)Context對(duì)象表示Map階段的上下文對(duì)象,可以用于將Map階段的產(chǎn)物輸出到下一個(gè)階段中。2.IntSumReducer類數(shù)據(jù)在經(jīng)過(guò)了Shuffle階段后,就會(huì)進(jìn)入Reduce階段。入門(mén)案例“WordCount”中的Reduce源碼如下所示。importorg.apache.hadoop.mapreduce.Reducer;publicstaticclassIntSumReducerextendsReducer<Text,IntWritable,Text,IntWritable>{privateIntWritableresult=newIntWritable();publicvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedException{intsum=0;for(IntWritableval:values){sum+=val.get();}result.set(sum);context.write(key,result);}}從上述源碼可知,Reduce階段的業(yè)務(wù)代碼需要繼承自org.apache.hadoop.mapreduce.Reducer類。Reducer類的四個(gè)泛型分別表示Reduce階段輸入數(shù)據(jù)的key類型、value類型,以及Reduce階段輸出數(shù)據(jù)的key類型、value類型。MapReduce的流程依次Map階段、Shuffle階段和Reduce階段,因此Shuffle階段的產(chǎn)物就是Reduce階段的輸入數(shù)據(jù)。也就是說(shuō),本例中的IntSumReducer就是對(duì)Shuffle的產(chǎn)物進(jìn)行了統(tǒng)計(jì),即計(jì)算出了Flink、Hadoop、HDFS、Hello、Spark、University、Xijing各個(gè)單詞出現(xiàn)的次數(shù)分別是1、1、1、6、1、2、2。IntSumReducer源碼中最后的context.write()方法表示將Reduce階段的產(chǎn)物輸出到最終的HDFS中進(jìn)行存儲(chǔ),而存儲(chǔ)在HDFS中的具體位置是在main()方法中進(jìn)行設(shè)置的。3.main()函數(shù)入門(mén)案例“WordCount”中main()函數(shù)的源碼如下所示。publicstaticvoidmain(String[]args)throwsException{Configurationconf=newConfiguration();String[]otherArgs=newGenericOptionsParser(conf,args).getRemainingArgs();if(otherArgs.length<2){System.err.println("Usage:wordcount<in>[<in>...]<out>");System.exit(2);}Jobjob=Job.getInstance(conf,"wordcount");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);for(inti=0;i<otherArgs.length-1;++i){FileInputFormat.addInputPath(job,newPath(otherArgs[i]));}FileOutputFormat.setOutputPath(job,newPath(otherArgs[otherArgs.length-1]));System.exit(job.waitForCompletion(true)?0:1);}從上述源碼可知,main()的輸入?yún)?shù)決定了輸入數(shù)據(jù)的文件位置,以及輸出數(shù)據(jù)存儲(chǔ)到HDFS中的位置。并且main()方法設(shè)置了Map階段和Reduce階段的類文件,以及通過(guò)setOutputKeyClass()和setOutputValueClass()指定了最終輸出數(shù)據(jù)的類型。4.向Hadoop集群提交并運(yùn)行WordCount使用如下命令向Hadoop集群提交并運(yùn)行WordCount,執(zhí)行該命令前要求啟動(dòng)Hadoop集群。[xuluhui@master~]$hadoopjar/usr/local/hadoop-2.9.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.9.2.jarwordcount/InputData/OutputData上述命令中,/InputData表示輸入目錄,/OutputData表示輸出目錄。執(zhí)行該命令前,要求HDFS輸入目錄/InputData已存在,且HDFS輸出目錄/OutputData不存在,否則會(huì)拋出異常。部分執(zhí)行過(guò)程如圖3-1所示。圖3-1向Hadoop集群提交并運(yùn)行WordCount的執(zhí)行過(guò)程(部分)上述程序執(zhí)行完畢后,會(huì)將結(jié)果輸出到HDFS目錄/OutputData中,使用命令“hadoopfs-ls/OutputData”來(lái)查看,使用的HDFSShell命令及具體過(guò)程如圖3-2所示。圖3-2中/OutputData目錄下有2個(gè)文件,其中/OutputData/_SUCCESS表示Hadoop程序已執(zhí)行成功,這個(gè)文件大小為0,文件名就告知了Hadoop程序的執(zhí)行狀態(tài);第二個(gè)文件/OutputData/part-r-00000才是Hadoop程序的運(yùn)行結(jié)果。在命令終端利用命令“hadoopfs-cat/OutputData/part-r-00000”查看Hadoop程序的運(yùn)行結(jié)果,使用的HDFSShell命令及單詞計(jì)數(shù)結(jié)果如圖3-2所示。圖3-2查看WordCount運(yùn)行結(jié)果3.4.4MapReduce數(shù)據(jù)類型MapReduce是集群運(yùn)算,因此要求在網(wǎng)絡(luò)中傳輸?shù)臄?shù)據(jù)必須是可序列化的類型,且為了良好匹配MapReduce內(nèi)部運(yùn)行機(jī)制,MapReduce專門(mén)設(shè)計(jì)了一套數(shù)據(jù)類型。例如MapReduce使用IntWritable定義整型變量,而不是Java內(nèi)置的int類型。MapReduce中常見(jiàn)的數(shù)據(jù)類型如表3-1所示。表3-1MapReduce中常見(jiàn)數(shù)據(jù)類型數(shù)據(jù)類型說(shuō)明IntWritable整型類型LongWritable長(zhǎng)整型類型FloatWritable單精度浮點(diǎn)數(shù)類型DoubleWritable雙精度浮點(diǎn)數(shù)類型ByteWritable字節(jié)類型BooleanWritable布爾類型TextUTF-8格式存儲(chǔ)的文本類型NullWritable空對(duì)象需要注意的是,這些數(shù)據(jù)類型的定義類都實(shí)現(xiàn)了WritableComparable接口。3.4.5MapReduce接口1.MapReduceWebUIMapReduceWebUI接口面向管理員??梢栽陧?yè)面上看到已經(jīng)完成的所有MR-App執(zhí)行過(guò)程中的統(tǒng)計(jì)信息,該頁(yè)面只支持讀,不支持寫(xiě)。MapReduceWebUI的默認(rèn)地址為http://JobHistoryServerIP:19888,可以查看MapReduce作業(yè)的歷史運(yùn)行情況,如圖3-3所示。圖3-3MapReduce作業(yè)歷史情況2.MapReduceShellMapReduceShell接口面向MapReduce程序員。程序員通過(guò)Shell接口能夠向YARN集群提交MR-App,查看正在運(yùn)行的MR-App,甚至可以終止正在運(yùn)行的MR-App。MapReduceShell命令統(tǒng)一入口為“mapred”,語(yǔ)法格式如下:mapred[--configconfdir][--loglevelloglevel]COMMAND讀者需要注意的是,若$HADOOP_HOME/bin未加入到系統(tǒng)環(huán)境變量PATH中,則需要切換到Hadoop安裝目錄下,輸入“bin/mapred”。讀者可以使用“mapred-help”查看其幫助,命令“mapred”的具體用法和參數(shù)說(shuō)明如圖3-4所示。圖3-4命令“mapred”用法MapReduceShell命令分為用戶命令和管理員命令。關(guān)于MapReduceShell命令的完整說(shuō)明,讀者請(qǐng)參考官方網(wǎng)站/docs/r2.9.2/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapredCommands.html。3.MapReduceJavaAPIMapReduceJavaAPI接口面向Java開(kāi)發(fā)工程師。程序員可以通過(guò)該接口編寫(xiě)MR-App用戶層代碼MRApplicationBusinessLogic。MapReduce2.0應(yīng)用程序包含3部分:MRv2框架中的MRAppMaster、MRClient再加上用戶編寫(xiě)的MRApplicationBusinessLogic(Mapper類和Reduce類)。MR-App編寫(xiě)步驟如下所示:(1)編寫(xiě)MRApplicationBusinessLogic。自行編寫(xiě)。(2)編寫(xiě)MRApplicationMaster。無(wú)須編寫(xiě),Hadoop開(kāi)發(fā)人員已編寫(xiě)好MRAppMaster.java。(3)編寫(xiě)MRApplicationClient。無(wú)須編寫(xiě),Hadoop開(kāi)發(fā)人員已編寫(xiě)好YARNRunner.java。其中,MRApplicationBusinessLogic編寫(xiě)步驟如下:(1)確定<key,value>對(duì)。(2)定制輸入格式。(3)Mapper階段,其業(yè)務(wù)代碼需要繼承自org.apache.hadoop.mapreduce.Mapper類。(4)Reducer階段,其業(yè)務(wù)代碼需要繼承自org.apache.hadoop.mapreduce.Reducer類。(5)定制輸出格式。關(guān)于MapReduceAPI的完整說(shuō)明,讀者請(qǐng)參考官方網(wǎng)站/docs/r2.9.2/api/index.html。3.5實(shí)驗(yàn)步驟前文所講述的MapReduce入門(mén)案例WordCount統(tǒng)計(jì)的是單詞數(shù)量,而單詞本身屬于字面值,是比較容易計(jì)算的。本案例將會(huì)講解如何使用MapReduce統(tǒng)計(jì)對(duì)象中的某些屬性?!景咐?-1】以下是某個(gè)超市的結(jié)算記錄,從左往右各字段的含義依次是會(huì)員編號(hào)、結(jié)算時(shí)間、消費(fèi)金額和用戶身份,樣例數(shù)據(jù)如下所示:242315 2019-10-15.18:20:10 32 會(huì)員984518 2019-10-15.18:21:02 167 會(huì)員226335 2019-10-15.18:21:54 233 非會(huì)員341665 2019-10-15.18:22:11 5 非會(huì)員273367 2019-10-15.18:23:07 361 非會(huì)員296223 2019-10-15.18:25:12 19 會(huì)員193363 2019-10-15.18:25:55 268 會(huì)員671512 2019-10-15.18:26:04 76 非會(huì)員596233 2019-10-15.18:27:42 82 非會(huì)員323444 2019-10-15.18:28:02 219 會(huì)員345672 2019-10-15.18:28:48 482 會(huì)員...現(xiàn)在需要完成以下任務(wù):(1)編寫(xiě)MapReduce程序計(jì)算會(huì)員和非會(huì)員的平均消費(fèi)金額,并打包成jar文件提交Hadoop集群運(yùn)行,查看運(yùn)行結(jié)果。(2)在該MR-App運(yùn)行過(guò)程中及運(yùn)行結(jié)束后使用相關(guān)MapReduceShell命令,查看MR-App的運(yùn)行狀態(tài)等信息。(3)在該MR-App運(yùn)行過(guò)程中及運(yùn)行結(jié)束后查看MapReduceWebUI界面,查看MR-App的運(yùn)行狀態(tài)等信息。接下來(lái)從6個(gè)步驟完整闡述本案例的實(shí)現(xiàn)過(guò)程。3.5.1啟動(dòng)Hadoop集群在主節(jié)點(diǎn)上依次執(zhí)行以下3條命令啟動(dòng)全分布模式Hadoop集群。start-dfs.shstart-yarn.shmr-jobhistory-daemon.shstarthistoryserver“start-dfs.sh”命令會(huì)在主節(jié)點(diǎn)上啟動(dòng)NameNode和SecondaryNameNode服務(wù),會(huì)在從節(jié)點(diǎn)上啟動(dòng)DataNode服務(wù);“start-yarn.sh”命令會(huì)在主節(jié)點(diǎn)上啟動(dòng)ResourceManager服務(wù),會(huì)在從節(jié)點(diǎn)上啟動(dòng)NodeManager服務(wù);“mr-jobhistory-daemon.shstarthistoryserver”命令會(huì)在主節(jié)點(diǎn)上啟動(dòng)JobHistoryServer服務(wù)。3.5.2編寫(xiě)并運(yùn)行MapReduce程序在Hadoop集群主節(jié)點(diǎn)上搭建MapReduce開(kāi)發(fā)環(huán)境Eclipse,具體過(guò)程請(qǐng)讀者參考本書(shū)2.6.4節(jié),此處不再贅述。與2.6.5使用HDFSJavaAPI編程過(guò)程相同,在Eclipse項(xiàng)目MapReduceExample下建立新包c(diǎn)om.xijing.mapreduce,模仿入門(mén)案例WordCount,編寫(xiě)一個(gè)WordCount程序,最后打包成JAR形式并在Hadoop集群上運(yùn)行該MR-App,查看運(yùn)行結(jié)果。具體過(guò)程如下所示。1.在Eclipse中創(chuàng)建Java項(xiàng)目進(jìn)入/usr/local/eclipse中通過(guò)可視化桌面打開(kāi)EclipseIDE,默認(rèn)的工作空間為“/home/xuluhui/eclipse-workspace”。選擇菜單『File』→『New』→『JavaProject』,創(chuàng)建Java項(xiàng)目“MapReduceExample”。2.在項(xiàng)目中導(dǎo)入所需JAR包為了編寫(xiě)關(guān)于MapReduce應(yīng)用程序,需要向Java項(xiàng)目中添加以下2個(gè)JAR包:hadoop-mapreduce-client-core-2.9.2.jar,MapReduce核心包,該包中包含了可以訪問(wèn)MapReduce的JavaAPI,位于$HADOOP_HOME/share/hadoop/mapreduce下。hadoop-common-2.9.2.jar,由于需要對(duì)HDFS文件進(jìn)行操作,所以還需要導(dǎo)入該包,該包位于$HADOOP_HOME/share/hadoop/common下。若不導(dǎo)入以上兩個(gè)JAR包,代碼將會(huì)出現(xiàn)錯(cuò)誤。讀者可以參考2.6.5節(jié)步驟添加該應(yīng)用程序編寫(xiě)時(shí)所需的JAR包。3.在項(xiàng)目中新建包右鍵單擊項(xiàng)目“MapReduceExample”,從彈出的快捷菜單中選擇『New』→『Package』,創(chuàng)建包“com.xijing.mapreduce”。4.使用Java語(yǔ)言編寫(xiě)MapReduce程序(1)編寫(xiě)實(shí)體類右鍵單擊Java項(xiàng)目“MapReduceExample”中目錄“src”下的包“com.xijing.mapreduce”,從彈出的菜單中選擇『New』→『Class』,編寫(xiě)封裝每個(gè)消費(fèi)者記錄的實(shí)體類Customer,每個(gè)消費(fèi)者至少包含了編號(hào)、消費(fèi)金額和是否為會(huì)員等屬性。Customer.java源代碼如下所示。packagecom.xijing.mapreduce;importorg.apache.hadoop.io.Writable;importjava.io.DataInput;importjava.io.DataOutput;importjava.io.IOException;publicclassCustomerimplementsWritable{ //會(huì)員編號(hào) privateStringid; //消費(fèi)金額 privateintmoney; //0:非會(huì)員1:會(huì)員 privateintvip; publicCustomer(){ } publicCustomer(Stringid,intmoney,intvip){ this.id=id; this.money=money; this.vip=vip; } publicintgetMoney(){ returnmoney; } publicvoidsetMoney(intmoney){ this.money=money; } publicStringgetId(){ returnid; } publicvoidsetId(Stringid){ this.id=id; } publicintgetVip(){ returnvip; } publicvoidsetVip(intvip){ this.vip=vip; } //序列化 publicvoidwrite(DataOutputdataOutput)throwsIOException{ dataOutput.writeUTF(id); dataOutput.writeInt(money); dataOutput.writeInt(vip); } //反序列化(注意:各屬性的順序要和序列化保持一致) publicvoidreadFields(DataInputdataInput)throwsIOException{ this.id=dataInput.readUTF(); this.money=dataInput.readInt(); this.vip=dataInput.readInt(); } @Override publicStringtoString(){ returnthis.id+"\t"+this.money+"\t"+this.vip; }}由于本次統(tǒng)計(jì)的Customer對(duì)象需要在Hadoop集群中的多個(gè)節(jié)點(diǎn)之間傳遞數(shù)據(jù),因此需要將Customer對(duì)象通過(guò)write(DataOutputdataOutput)方法進(jìn)行序列化操作,并通過(guò)readFields(DataInputdataInput)進(jìn)行反序列化操作。(2)編寫(xiě)Mapper類右鍵單擊Java項(xiàng)目“MapReduceExample”中目錄“src”下的包“com.xijing.mapreduce”,從彈出的菜單中選擇『New』→『Class』,編寫(xiě)Mapper類CustomerMapper。在Map階段讀取文本中的消費(fèi)者記錄信息,并將消費(fèi)者的各個(gè)屬性字段拆分讀取,然后根據(jù)會(huì)員情況,將消費(fèi)者的消費(fèi)金額輸出到MapReduce的下一個(gè)處理階段(即Shuffle),本案例Map階段輸出的數(shù)據(jù)形式是“會(huì)員(或非會(huì)員),消費(fèi)金額”。CustomerMapper.java源代碼如下所示。packagecom.xijing.mapreduce;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Mapper;importjava.io.IOException;publicclassCustomerMapperextendsMapper<LongWritable,Text,Text,IntWritable>{ @Override protectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{ //將一行內(nèi)容轉(zhuǎn)成string Stringline=value.toString(); //獲取各個(gè)顧客的消費(fèi)數(shù)據(jù) String[]fields=line.split("\t"); //獲取消費(fèi)金額 intmoney=Integer.parseInt(fields[2]); //獲取會(huì)員情況 Stringvip=fields[3]; /* 輸出 Key:會(huì)員情況,value:消費(fèi)金額 例如: 會(huì)員32 會(huì)員167 非會(huì)員233 非會(huì)員5 */ context.write(newText(vip),newIntWritable(money)); }}(3)編寫(xiě)Reducer類右鍵單擊Java項(xiàng)目“MapReduceExample”中目錄“src”下的包“com.xijing.mapreduce”,從彈出的菜單中選擇『New』→『Class』,編寫(xiě)Reducer類CustomerReducer。Map階段的輸出數(shù)據(jù)在經(jīng)過(guò)shuffle階段混洗以后,就會(huì)傳遞給Reduce階段,本案例Reduce階段輸入的數(shù)據(jù)形式是“會(huì)員(或非會(huì)員),[消費(fèi)金額1,消費(fèi)金額2,消費(fèi)金額3,...]”。因此,與WordCount類似,只需要在Reduce階段累加會(huì)員或非會(huì)員的總消費(fèi)金額就能完成本次任務(wù)。CustomerReducer.java源代碼如下所示。packagecom.xijing.mapreduce;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Reducer;importjava.io.IOException;publicclassCustomerReducerextendsReducer<Text,IntWritable,Text,LongWritable>{ @Override protectedvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedException{ //統(tǒng)計(jì)會(huì)員(或非會(huì)員)的個(gè)數(shù) intvipCount=0; //總消費(fèi)金額 longsumMoney=0; for(IntWritablemoney:values){ vipCount++; sumMoney+=money.get(); } //會(huì)員(或非會(huì)員)的平均消費(fèi)金額 longavgMoney=sumMoney/vipCount; context.write(key,newLongWritable(avgMoney)); }}(4)編寫(xiě)MapReduce程序驅(qū)動(dòng)類右鍵單擊Java項(xiàng)目“MapReduceExample”中目錄“src”下的包“com.xijing.mapreduce”,從彈出的菜單中選擇『New』→『Class』,編寫(xiě)MapReduce程序驅(qū)動(dòng)類CustomerDriver。在編寫(xiě)MapReduce程序時(shí),程序的驅(qū)動(dòng)類基本是相同的,因此,可以仿照之前的驅(qū)動(dòng)類,編寫(xiě)本次的MapReduce驅(qū)動(dòng)類。CustomerDriver.java源代碼如下所示。packagecom.xijing.mapreduce;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;publicclassCustomerDriver{ publicstaticvoidmain(String[]args)throwsException{ Configurationconf=newConfiguration(); Jobjob=Job.getInstance(conf); job.setJarByClass(CustomerDriver.class); job.setMapperClass(CustomerMapper.class); job.setReducerClass(CustomerReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileInputFormat.setInputPaths(job,newPath(args[0])); FileOutputFormat.setOutputPath(job,newPath(args[1])); booleanresult=job.waitForCompletion(true); System.exit(result?0:1); }}最后,使用與WordCount程序相同的方法,將本程序打包成JAR包后就可以提交到Hadoop集群中運(yùn)行了。執(zhí)行結(jié)果就是會(huì)員與非會(huì)員的平均消費(fèi)金額。5.將MapReduce程序打包成JAR包與實(shí)驗(yàn)2相同,為了運(yùn)行寫(xiě)好的MapReduce程序,需要首先將程序打包成JAR包,具體打包成JAR包的過(guò)程此處不再贅述。編者將其保存在/home/xuluhui/eclipse-workspace/MapReduceExample下,命名為Customer.jar。該Java項(xiàng)目當(dāng)前整體架構(gòu)效果如圖3-5所示。圖3

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論