09 Offline Computing System - Day 9 (MapReduce Advanced)

Course outline (MapReduce in depth)

Running a MapReduce program (demo)
MapReduce programming conventions and example code
MapReduce run modes and debugging methods
Core mechanisms of a MapReduce program
The MapReduce serialization framework
Sorting in MapReduce
The MapReduce partitioning mechanism and custom partitioners
Data compression in MapReduce
MapReduce programming cases
MapReduce parameter tuning
MapReduce quick start
Advanced MapReduce features

Learning goals: master the programming model and coding patterns of the MapReduce distributed computing framework, understand how the framework runs jobs, and be able to do a reasonable amount of custom development on top of it.

Traffic statistics requirements

1. For each user in the traffic log, compute the total upstream and downstream traffic.
Key point: the bean that carries the flow fields must implement Hadoop's Writable serialization interface:

// Serialization: write the object's fields to the output stream
@Override
public void write(DataOutput out) throws IOException {
    out.writeLong(upflow);
    out.writeLong(downflow);
    out.writeLong(sumflow);
}

// Deserialization: read the fields back from the input stream, in the same order they were written
@Override
public void readFields(DataInput in) throws IOException {
    upflow = in.readLong();
    downflow = in.readLong();
    sumflow = in.readLong();
}

2. Compute the traffic totals and sort the results by total traffic in descending order.
Technical point: this is awkward to do in a single MapReduce job; it takes two jobs. The first job does the traffic aggregation, exactly as in the previous requirement. The second job sorts the results by using the flow bean as the map output key, which means the bean must also implement the compareTo() method, in which we can define the descending comparison logic (a sketch follows below).
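A minimal sketch of such a bean, assuming the field names used above (upflow, downflow, sumflow); the class name FlowBean and the set() helper are illustrative, not taken from the handout:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Sketch only: class and helper names (FlowBean, set) are assumptions based on the text above.
public class FlowBean implements WritableComparable<FlowBean> {

    private long upflow;
    private long downflow;
    private long sumflow;

    public FlowBean() {
        // no-arg constructor required by the framework for deserialization
    }

    public void set(long upflow, long downflow) {
        this.upflow = upflow;
        this.downflow = downflow;
        this.sumflow = upflow + downflow;
    }

    @Override
    public void write(DataOutput out) throws IOException {   // serialization
        out.writeLong(upflow);
        out.writeLong(downflow);
        out.writeLong(sumflow);
    }

    @Override
    public void readFields(DataInput in) throws IOException { // deserialization, same field order
        upflow = in.readLong();
        downflow = in.readLong();
        sumflow = in.readLong();
    }

    @Override
    public int compareTo(FlowBean o) {
        // Return -1 when this bean's total flow is larger, so that the framework's
        // ascending key sort produces descending traffic order in the output.
        return this.sumflow > o.sumflow ? -1 : (this.sumflow == o.sumflow ? 0 : 1);
    }
}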
3. Compute the traffic totals and, according to the home province of each phone number, write the results for different provinces into different output files.
Key point: a custom Partitioner that maps the phone-number prefix to a partition number, for example:

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    // prefix -> partition number mapping, loaded once up front
    private static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        String prefix = key.toString().substring(0, 3);
        Integer partNum = provinceMap.get(prefix);
        // unknown prefixes fall into the last partition
        return partNum == null ? numPartitions - 1 : partNum;
    }
}

In the driver:

job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(5);

Notes on the number of reduce tasks versus the number of partitions returned by getPartition():
- If the number of reduce tasks is greater than the number of partitions, a few empty output files part-r-000xx are produced.
- If the number of reduce tasks is smaller than the number of partitions (but greater than 1), part of the partitioned data has no reduce task to go to and the job fails with an illegal-partition error.
- If the number of reduce tasks is 1, all map output is handed to that single reduce task regardless of how many partitions the map side produces, and only one result file part-r-00000 is generated.

Social fan (mutual friends) data analysis

Below is QQ friend-list data. The part before the colon is a user, and the part after the colon is all of that user's friends (friend relations in the data are one-directional), for example:

A:B,C,D,F,E,O
B:A,C,E,K
...
G:A,C,D,E,F
J:B,O
O:A,H,I,J

The goal is to find, for every pair of users, the friends they have in common. This takes two MapReduce steps.

Step 1, map side: read one line at a time and invert the relation, emitting <friend, user>. Reading the line A:B,C,D,F,E,O outputs <B,A> <C,A> <D,A> <F,A> <E,A> <O,A>; reading the next line B:A,C,E,K outputs <A,B> <C,B> <E,B> <K,B>, and so on.

Step 1, reduce side: for one friend key the reducer receives data such as <C,A> <C,B> <C,E> <C,F> <C,G> ..., i.e. all users who have C as a friend. Emitting every pair of these users with the friend as the value produces records such as <A-B,C> <A-E,C> <A-G,C> <B-E,C> <B-F,C> ...

Step 2, map side: read one line such as <A-B,C> and output it as-is, with the user pair A-B as the key and the friend as the value.

Step 2, reduce side: for one pair key the reducer receives data such as <A-B,C> <A-B,F> <A-B,G> ...; concatenating the values gives the complete list of common friends of that pair. A sketch of step 1 follows below.
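A minimal sketch of the step-1 mapper and reducer under the assumptions above (the class names and the exact pair-key formatting are illustrative; step 2 is then a straightforward grouping on the pair key):

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch of step 1: invert "user -> friends" into "friend -> users",
// then emit every pair of users that shares that friend.
public class SharedFriendsStepOne {

    static class StepOneMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // input line format: A:B,C,D,F,E,O
            String[] userAndFriends = value.toString().split(":");
            String user = userAndFriends[0];
            for (String friend : userAndFriends[1].split(",")) {
                // emit <friend, user>
                context.write(new Text(friend), new Text(user));
            }
        }
    }

    static class StepOneReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text friend, Iterable<Text> users, Context context)
                throws IOException, InterruptedException {
            // collect and sort the users so that a pair key like A-B always has the same order
            List<String> list = new ArrayList<String>();
            for (Text u : users) {
                list.add(u.toString());
            }
            Collections.sort(list);
            // emit <userA-userB, friend> for every pair of users who both have this friend
            for (int i = 0; i < list.size() - 1; i++) {
                for (int j = i + 1; j < list.size(); j++) {
                    context.write(new Text(list.get(i) + "-" + list.get(j)), friend);
                }
            }
        }
    }
}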
Building an inverted index

Requirement: there is a large amount of text (documents, web pages) for which a search index needs to be built, i.e. a mapping from each word to the documents that contain it.

Merging small files

1.1 Requirement
Both HDFS and MapReduce lose efficiency when dealing with small files, yet in practice it is hard to avoid scenarios with large numbers of small files, so a corresponding solution is needed.

1.2 Analysis
The problem can be attacked at several points: merge the small files at data-collection time, before they are uploaded to HDFS; run a pre-processing MapReduce job on HDFS that merges the small files before the business jobs run; or use a combine-style input format (CombineFileInputFormat) at processing time to reduce the number of map tasks.

1.3 Implementation
This section implements the second approach above: a custom WholeFileInputFormat reads each small file as a single record, and SequenceFileOutputFormat is used on the output side to write the merged files as one SequenceFile.

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    // make every small file non-splittable, so that one small file produces exactly one key-value pair
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}

class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            // read the whole file into a byte array in one go
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // do nothing
    }
}

public class SmallFilesToSequenceFileConverter extends Configured implements Tool {

    static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

        private Text filenameKey;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // use the path of the original small file as the output key
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            filenameKey = new Text(path.toString());
        }

        @Override
        protected void map(NullWritable key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(filenameKey, value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        System.setProperty("HADOOP_USER_NAME", "hdfs");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: combinefiles <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "combine small files to sequencefile");
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setMapperClass(SequenceFileMapper.class);
        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), args);
        System.exit(exitCode);
    }
}

Log enhancement (custom OutputFormat)

2.1 Requirement
Some raw web-access logs need enhanced parsing. The flow is: for each log line, look up the url field in an external knowledge base; if analysis content is found for the url, append it to the original line and write the result to the enhanced-log directory; if not, extract the url and write it to the to-crawl-list directory.

2.2 Analysis
The key point of the program is that, inside one MapReduce job, two kinds of results have to be written to different directories depending on the data. This kind of flexible output requirement can be implemented with a custom OutputFormat.

2.3 Implementation
A utility that loads the knowledge base from a database:

public class DBLoader {

    public static void dbLoader(HashMap<String, String> ruleMap) {
        Connection conn = null;
        Statement st = null;
        ResultSet res = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://hdp-node01:3306/urlknowledge", "root", "root");
            st = conn.createStatement();
            res = st.executeQuery("select url,content from urlcontent");
            while (res.next()) {
                ruleMap.put(res.getString(1), res.getString(2));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (res != null) {
                    res.close();
                }
                if (st != null) {
                    st.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        HashMap<String, String> map = new HashMap<String, String>();
        DBLoader.dbLoader(map);
        System.out.println(map.size());
    }
}

The custom output format that routes records to two different HDFS files:

public class LogEnhancerOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSystem fs = FileSystem.get(context.getConfiguration());
        Path enhancePath = new Path("hdfs://hdp-node01:9000/flow/enhancelog/enhanced.log");
        Path toCrawlPath = new Path("hdfs://hdp-node01:9000/flow/tocrawl/tocrawl.log");
        FSDataOutputStream enhanceOut = fs.create(enhancePath);
        FSDataOutputStream toCrawlOut = fs.create(toCrawlPath);
        return new MyRecordWriter(enhanceOut, toCrawlOut);
    }

    static class MyRecordWriter extends RecordWriter<Text, NullWritable> {

        FSDataOutputStream enhanceOut = null;
        FSDataOutputStream toCrawlOut = null;

        public MyRecordWriter(FSDataOutputStream enhanceOut, FSDataOutputStream toCrawlOut) {
            this.enhanceOut = enhanceOut;
            this.toCrawlOut = toCrawlOut;
        }

        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            // given a record, this writer decides where it goes on hdfs:
            // if the incoming line is tagged "tocrawl", write it to the to-crawl list, otherwise to the enhanced log
            if (key.toString().contains("tocrawl")) {
                toCrawlOut.write(key.toString().getBytes());
            } else {
                enhanceOut.write(key.toString().getBytes());
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (toCrawlOut != null) {
                toCrawlOut.close();
            }
            if (enhanceOut != null) {
                enhanceOut.close();
            }
        }
    }
}

/**
 * Enhances the user web-access logs produced every hour: the content-analysis result of the
 * url in each line is looked up in the knowledge base and appended to the original line.
 */
public class LogEnhancer {

    static class LogEnhancerMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        HashMap<String, String> knowledgeMap = new HashMap<String, String>();

        /**
         * setup() is called once when the map task initializes; we use this hook to load
         * the external knowledge base into the map task's memory.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            DBLoader.dbLoader(knowledgeMap);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            try {
                String url = fields[26];
                // look up the content-analysis info for this line's url in the knowledge base
                String content = knowledgeMap.get(url);
                // depending on whether the lookup succeeded, build one of the two kinds of output
                String result = "";
                if (null == content) {
                    // goes to the to-crawl list
                    result = url + "\t" + "tocrawl" + "\n";
                } else {
                    // goes to the enhanced log
                    result = line + "\t" + content + "\n";
                }
                context.write(new Text(result), NullWritable.get());
            } catch (Exception e) {
                // malformed lines are skipped
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(LogEnhancer.class);
        job.setMapperClass(LogEnhancerMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // plug the custom output format component into the job
        job.setOutputFormatClass(LogEnhancerOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // although we use a custom outputformat, it inherits from FileOutputFormat,
        // which still requires an output directory to be specified
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
        System.exit(0);
    }
}

Finding the largest transaction in each order (GroupingComparator)

3.1 Requirement
There is order data of the following form, one transaction per line, with an order id and a transaction amount (the concrete amounts are omitted here):

Order_0000001   ...
Order_0000001   ...
Order_0000002   ...
Order_0000002   ...
Order_0000002   ...
Order_0000003   ...

We now need to find, for every order, the single transaction with the largest amount.

3.2 Analysis
Wrap the order id and the amount in one bean and use the bean as the map output key. The bean's compareTo() sorts first by order id and then by amount in descending order; a custom partitioner sends all records with the same order id to the same reduce task; and a custom GroupingComparator compares only the order id, so that all key-value pairs of one order are aggregated into a single reduce group. The first key of each group is then the transaction with the largest amount.

3.3 Implementation

public class ItemidGroupingComparator extends WritableComparator {

    protected ItemidGroupingComparator() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean abean = (OrderBean) a;
        OrderBean bbean = (OrderBean) b;
        // compare only the itemid (order id), so that beans with the same id are grouped together
        return abean.getItemid().compareTo(bbean.getItemid());
    }
}

public class OrderBean implements WritableComparable<OrderBean> {

    private Text itemid;            // the order id
    private DoubleWritable amount;  // the transaction amount

    public OrderBean() {
    }

    public OrderBean(Text itemid, DoubleWritable amount) {
        set(itemid, amount);
    }

    public void set(Text itemid, DoubleWritable amount) {
        this.itemid = itemid;
        this.amount = amount;
    }

    public Text getItemid() {
        return itemid;
    }

    public DoubleWritable getAmount() {
        return amount;
    }

    @Override
    public int compareTo(OrderBean o) {
        // first by order id ascending, then by amount descending
        int cmp = this.itemid.compareTo(o.getItemid());
        if (cmp == 0) {
            cmp = -this.amount.compareTo(o.getAmount());
        }
        return cmp;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(itemid.toString());
        out.writeDouble(amount.get());
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        String readUTF = in.readUTF();
        double readDouble = in.readDouble();
        this.itemid = new Text(readUTF);
        this.amount = new DoubleWritable(readDouble);
    }

    @Override
    public String toString() {
        return itemid.toString() + "\t" + amount.get();
    }
}

/**
 * Secondary sort: keys are grouped by order id and sorted by amount descending,
 * so outputting the first key of each group gives the largest transaction of each order.
 */
public class SecondarySort {

    static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

        OrderBean bean = new OrderBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));
            context.write(bean, NullWritable.get());
        }
    }

    static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

        // The keys of one group arrive sorted by amount descending, e.g. <1001 76.5>, null ...
        // To output the largest transaction of each order it is enough to output the group's first key.
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SecondarySort.class);
        job.setMapperClass(SecondarySortMapper.class);
        job.setReducerClass(SecondarySortReducer.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // group the sorted keys by order id only
        job.setGroupingComparatorClass(ItemidGroupingComparator.class);
        // partition by order id so that all records of one order reach the same reduce task
        job.setPartitionerClass(ItemIdPartitioner.class);
        job.setNumReduceTasks(3);
        job.waitForCompletion(true);
    }
}
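The driver above references an ItemIdPartitioner class that is not shown in this handout. A minimal sketch of what such a partitioner presumably looks like (the implementation below is an assumption, not the original code; it assumes OrderBean from the listing above is in the same package):

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Sketch only: partition on the order id so that all beans of one order
// land in the same reduce task, matching the grouping comparator above.
public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable> {

    @Override
    public int getPartition(OrderBean bean, NullWritable value, int numReduceTasks) {
        // mask the sign bit before taking the modulus to avoid negative partition numbers
        return (bean.getItemid().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}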
Map-side join using the distributed cache

4.1.1 Requirement
Join scenarios are very common in practice, for example joining an "order log" with "product information".

4.1.2 Analysis -- the principle
This approach applies when one of the tables being joined is small: the small table can be distributed to every map node, so each map node can join the part of the large table it reads against the small table locally, without a reduce phase. This greatly increases the parallelism of the join and speeds up processing.
The small table is loaded in the mapper (typically in setup()), and the DistributedCache mechanism ships the small-table file to every map task's execution node, so each map task can load the small table from its local working directory and perform the join locally.

4.1.3 Implementation

public class TestDistributedCache {

    static class TestDistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text> {

        FileReader in = null;
        BufferedReader reader = null;
        HashMap<String, String> b_tab = new HashMap<String, String>();
        String localpath = null;

        // called once when the map task initializes
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // these calls return the local absolute path of the cache file; handy for test verification
            Path[] files = context.getLocalCacheFiles();
            localpath = files[0].toString();
            URI[] cacheFiles = context.getCacheFiles();

            // how the cached file is used: read it with plain local file IO,
            // since the small file sits in the local working directory of the machine running this map task
            in = new FileReader("b.txt");
            reader = new BufferedReader(in);
            String line = null;
            while (null != (line = reader.readLine())) {
                String[] fields = line.split(",");
                b_tab.put(fields[0], fields[1]);
            }
            IOUtils.closeStream(reader);
            IOUtils.closeStream(in);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // here we read the split of the big table (on hdfs) that this map task is responsible for
            String[] fields = value.toString().split("\t");
            String a_itemid = fields[0];
            String a_amount = fields[1];
            String b_name = b_tab.get(a_itemid);
            // output e.g.:  1001  98.9  banana
            context.write(new Text(a_itemid), new Text(a_amount + "\t" + ":" + localpath + "\t" + b_name));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(TestDistributedCache.class);
        job.setMapperClass(TestDistributedCacheMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // the normal input data (the big table) to be processed
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // map-only job: no reduce phase is needed for a map-side join
        job.setNumReduceTasks(0);
        // distribute one file to every task's working directory
        job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt"));
        // an archive or a jar can also be distributed and added to the task classpath:
        // job.addArchiveToClassPath(archive);
        // job.addFileToClassPath(jarfile);
        job.waitForCompletion(true);
    }
}

5.1 Counter application
In production code it is often necessary to keep a global count of, for example, malformed records encountered while processing the data. This kind of requirement can be implemented with the global counters provided by the MapReduce framework:

public class MultiOutputs {

    // custom counters defined as an enum
    enum MyCounter {
        MALFORORMED, NORMAL
    }

    static class CommaMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] words = value.toString().split(",");
            for (String word : words) {
                context.write(new Text(word), new LongWritable(1));
            }
            // increment the enum-defined custom counter
            context.getCounter(MyCounter.MALFORORMED).increment(1);
            // increment a dynamically named custom counter (group name + counter name)
            context.getCounter("counterGroupa", "countera").increment(1);
        }
    }
}
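The counter values appear in the job's console summary when it finishes; they can also be read back programmatically. A hedged fragment (not from the handout) that would sit in the driver after job.waitForCompletion(true), assuming job is the Job configured with CommaMapper above:

// Fragment: belongs in the driver, after job.waitForCompletion(true) has returned.
// "job" is assumed to be the org.apache.hadoop.mapreduce.Job running CommaMapper.
org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
long malformed = counters.findCounter(MultiOutputs.MyCounter.MALFORORMED).getValue();
long countera = counters.findCounter("counterGroupa", "countera").getValue();
System.out.println("MALFORORMED=" + malformed + ", countera=" + countera);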
Chaining multiple jobs
A slightly more complex processing flow often needs several MapReduce jobs chained together. Multi-job chaining can be done with the ControlledJob and JobControl classes of the MapReduce framework:

ControlledJob cJob1 = new ControlledJob(job1.getConfiguration());
ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());
ControlledJob cJob3 = new ControlledJob(job3.getConfiguration());

cJob1.setJob(job1);
cJob2.setJob(job2);
cJob3.setJob(job3);

// declare the dependencies between the jobs
cJob2.addDependingJob(cJob1);
cJob3.addDependingJob(cJob2);

JobControl jobControl = new JobControl("RecommendationJob");
jobControl.addJob(cJob1);
jobControl.addJob(cJob2);
jobControl.addJob(cJob3);

// run the jobs added to the JobControl in a new thread, then wait for all of them to finish
Thread jobControlThread = new Thread(jobControl);
jobControlThread.start();
while (!jobControl.allFinished()) {
    Thread.sleep(500);
}
jobControl.stop();

Important MapReduce configuration parameters

11.1 Resource-related parameters
The following parameters take effect when configured in the user's own MR application (i.e. in the job configuration); by contrast, server-side YARN parameters must be configured in the cluster configuration files before YARN starts.
(1) mapreduce.map.memory.mb: the resource limit of one MapTask (in MB), default 1024. If a MapTask actually uses more than this, it is forcibly killed.
(2) mapreduce.reduce.memory.mb: the resource limit of one ReduceTask (in MB), default 1024. If a ReduceTask actually uses more than this, it is forcibly killed.
(3) mapreduce.map.java.opts: JVM parameters for the MapTask; the default java heap size and similar options can be configured here, e.g. "-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc" (@taskid@ is automatically replaced by the Hadoop framework with the actual task id). Default: "".
(4) mapreduce.reduce.java.opts: JVM parameters for the ReduceTask; the default java heap size and similar options can be configured here, e.g. "-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc". Default: "".
(5) mapreduce.map.cpu.vcores: the maximum number of cpu cores each MapTask may use. Default: 1.
(6) mapreduce.reduce.cpu.vcores: the maximum number of cpu cores each ReduceTask may use. Default: 1.
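A hedged example of how these job-level parameters could be set from a driver's Configuration (the class name and the concrete values are illustrative only, not recommendations):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Sketch only: setting the job-level resource parameters listed above.
public class TunedJobDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("mapreduce.map.memory.mb", 2048);      // container memory limit per MapTask (MB)
        conf.setInt("mapreduce.reduce.memory.mb", 4096);   // container memory limit per ReduceTask (MB)
        conf.set("mapreduce.map.java.opts", "-Xmx1638m");  // MapTask JVM heap, kept below the container limit
        conf.set("mapreduce.reduce.java.opts", "-Xmx3276m");
        conf.setInt("mapreduce.map.cpu.vcores", 1);
        conf.setInt("mapreduce.reduce.cpu.vcores", 2);
        Job job = Job.getInstance(conf, "tuned-job");
        // ... set mapper/reducer/input/output as usual, then job.waitForCompletion(true)
    }
}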
