版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
1、第七章 MapReduce提綱7.1 概述概述7.2 MapReduce體系結(jié)構(gòu)體系結(jié)構(gòu)7.3 MapReduce工作流程工作流程7.4 實(shí)例分析:實(shí)例分析:WordCount7.5 MapReduce的具體應(yīng)用的具體應(yīng)用7.6 MapReduce編程實(shí)踐編程實(shí)踐7.1概述n7.1.1 分布式并行編程n7.1.2 MapReduce模型簡(jiǎn)介n7.1.3 Map和Reduce函數(shù)7.1.1 分布式并行編程“摩爾定律”, CPU性能大約每隔18個(gè)月翻一番從2005年開始摩爾定律逐漸失效 ,需要處理的數(shù)據(jù)量快速增加,人們開始借助于分布式并行編程來提高程序性能 分布式程序運(yùn)行在大規(guī)模計(jì)算機(jī)集群上,可以
2、并行執(zhí)行大規(guī)模數(shù)據(jù)處理任務(wù),從而獲得海量的計(jì)算能力谷歌公司最先提出了分布式并行編程模型MapReduce,Hadoop MapReduce是它的開源實(shí)現(xiàn),后者比前者使用門檻低很多 7.1.1 分布式并行編程問題:在MapReduce出現(xiàn)之前,已經(jīng)有像MPI這樣非常成熟的并行計(jì)算框架了,那么為什么Google還需要MapReduce?MapReduce相較于傳統(tǒng)的并行計(jì)算框架有什么優(yōu)勢(shì)?傳統(tǒng)并行計(jì)算框架傳統(tǒng)并行計(jì)算框架MapReduce集群架構(gòu)/容錯(cuò)性共享式(共享內(nèi)存/共享存儲(chǔ)),容錯(cuò)性差非共享式,容錯(cuò)性好硬件/價(jià)格/擴(kuò)展性刀片服務(wù)器、高速網(wǎng)、SAN,價(jià)格貴,擴(kuò)展性差普通PC機(jī),便宜,擴(kuò)展性好編
3、程/學(xué)習(xí)難度what-how,難what,簡(jiǎn)單適用場(chǎng)景實(shí)時(shí)、細(xì)粒度計(jì)算、計(jì)算密集型批處理、非實(shí)時(shí)、數(shù)據(jù)密集型7.1.2 MapReduce模型簡(jiǎn)介MapReduce將復(fù)雜的、運(yùn)行于大規(guī)模集群上的并行計(jì)算過程高度地抽象到了兩個(gè)函數(shù):Map和Reduce編程容易,不需要掌握分布式并行編程細(xì)節(jié),也可以很容易把自己的程序運(yùn)行在分布式系統(tǒng)上,完成海量數(shù)據(jù)的計(jì)算MapReduce采用“分而治之分而治之”策略,一個(gè)存儲(chǔ)在分布式文件系統(tǒng)中的大規(guī)模數(shù)據(jù)集,會(huì)被切分成許多獨(dú)立的分片(split),這些分片可以被多個(gè)Map任務(wù)并行處理MapReduce設(shè)計(jì)的一個(gè)理念就是“計(jì)算向數(shù)據(jù)靠攏計(jì)算向數(shù)據(jù)靠攏”,而不是“數(shù)據(jù)
4、向計(jì)算靠攏”,因?yàn)?,移?dòng)數(shù)據(jù)需要大量的網(wǎng)絡(luò)傳輸開銷MapReduce框架采用了Master/Slave架構(gòu),包括一個(gè)Master和若干個(gè)Slave。Master上運(yùn)行JobTracker,Slave上運(yùn)行TaskTracker Hadoop框架是用Java實(shí)現(xiàn)的,但是,MapReduce應(yīng)用程序則不一定要用Java來寫 7.1.3 Map和Reduce函數(shù)函數(shù)函數(shù)輸入輸入輸出輸出說明說明Map如:List()如:1.將小數(shù)據(jù)集進(jìn)一步解析成一批對(duì),輸入Map函數(shù)中進(jìn)行處理2.每一個(gè)輸入的會(huì)輸出一批。是計(jì)算的中間結(jié)果Reduce 如:“a”,輸入的中間結(jié)果中的List(v2)表示是一批屬于同一個(gè)k
5、2的value7.2 MapReduce的體系結(jié)構(gòu)MapReduce體系結(jié)構(gòu)主要由四個(gè)部分組成,分別是:Client、JobTracker、TaskTracker以及Task7.2 MapReduce的體系結(jié)構(gòu)MapReduce主要有以下4個(gè)部分組成:1)Client用戶編寫的MapReduce程序通過Client提交到JobTracker端用戶可通過Client提供的一些接口查看作業(yè)運(yùn)行狀態(tài)2)JobTrackerJobTracker負(fù)責(zé)資源監(jiān)控和作業(yè)調(diào)度JobTracker 監(jiān)控所有TaskTracker與Job的健康狀況,一旦發(fā)現(xiàn)失敗,就將相應(yīng)的任務(wù)轉(zhuǎn)移到其他節(jié)點(diǎn)JobTracker 會(huì)
6、跟蹤任務(wù)的執(zhí)行進(jìn)度、資源使用量等信息,并將這些信息告訴任務(wù)調(diào)度器(TaskScheduler),而調(diào)度器會(huì)在資源出現(xiàn)空閑時(shí),選擇合適的任務(wù)去使用這些資源7.2 MapReduce的體系結(jié)構(gòu)3)TaskTrackerTaskTracker 會(huì)周期性地通過“心跳”將本節(jié)點(diǎn)上資源的使用情況和任務(wù)的運(yùn)行進(jìn)度匯報(bào)給JobTracker,同時(shí)接收J(rèn)obTracker 發(fā)送過來的命令并執(zhí)行相應(yīng)的操作(如啟動(dòng)新任務(wù)、殺死任務(wù)等)TaskTracker 使用“slot”等量劃分本節(jié)點(diǎn)上的資源量(CPU、內(nèi)存等)。一個(gè)Task 獲取到一個(gè)slot 后才有機(jī)會(huì)運(yùn)行,而Hadoop調(diào)度器的作用就是將各個(gè)TaskTra
7、cker上的空閑slot分配給Task使用。slot 分為Map slot 和Reduce slot 兩種,分別供MapTask 和Reduce Task 使用4)TaskTask 分為Map Task 和Reduce Task 兩種,均由TaskTracker 啟動(dòng)7.3MapReduce工作流程n7.3.1 工作流程概述n7.3.2 MapReduce各個(gè)執(zhí)行階段n7.3.3 Shuffle過程詳解7.3.1 工作流程概述圖7-1 MapReduce工作流程Shuffle7.3.1 工作流程概述不同的Map任務(wù)之間不會(huì)進(jìn)行通信不同的Reduce任務(wù)之間也不會(huì)發(fā)生任何信息交換用戶不能顯式地從
8、一臺(tái)機(jī)器向另一臺(tái)機(jī)器發(fā)送消息所有的數(shù)據(jù)交換都是通過MapReduce框架自身去實(shí)現(xiàn)的7.3.2 MapReduce各個(gè)執(zhí)行階段7.3.2 MapReduce各個(gè)執(zhí)行階段HDFS 以固定大小的block 為基本單位存儲(chǔ)數(shù)據(jù),而對(duì)于MapReduce 而言,其處理單位是split。split 是一個(gè)邏輯概念,它只包含一些元數(shù)據(jù)信息,比如數(shù)據(jù)起始位置、數(shù)據(jù)長(zhǎng)度、數(shù)據(jù)所在節(jié)點(diǎn)等。它的劃分方法完全由用戶自己決定。關(guān)于關(guān)于Split(分片)(分片)7.3.2 MapReduce各個(gè)執(zhí)行階段Reduce任務(wù)的數(shù)量任務(wù)的數(shù)量最優(yōu)的Reduce任務(wù)個(gè)數(shù)取決于集群中可用的reduce任務(wù)槽(slot)的數(shù)目通常設(shè)
9、置比reduce任務(wù)槽數(shù)目稍微小一些的Reduce任務(wù)個(gè)數(shù)(這樣可以預(yù)留一些系統(tǒng)資源處理可能發(fā)生的錯(cuò)誤)Map任務(wù)的數(shù)量任務(wù)的數(shù)量Hadoop為每個(gè)split創(chuàng)建一個(gè)Map任務(wù),split 的多少?zèng)Q定了Map任務(wù)的數(shù)目。大多數(shù)情況下,理想的分片大小是一個(gè)HDFS塊7.3.3 Shuffle過程詳解圖7-3 Shuffle過程 1. Shuffle過程簡(jiǎn)介過程簡(jiǎn)介7.3.3 Shuffle過程詳解2. Map端的端的Shuffle過程過程每個(gè)Map任務(wù)分配一個(gè)緩存MapReduce默認(rèn)100MB緩存設(shè)置溢寫比例0.8分區(qū)默認(rèn)采用哈希函數(shù)排序是默認(rèn)的操作排序后可以合并(Combine)合并不能改變
10、最終結(jié)果在Map任務(wù)全部結(jié)束之前進(jìn)行歸并歸并得到一個(gè)大的文件,放在本地磁盤文件歸并時(shí),如果溢寫文件數(shù)量大于預(yù)定值(默認(rèn)是3)則可以再次啟動(dòng)Combiner,少于3不需要JobTracker會(huì)一直監(jiān)測(cè)Map任務(wù)的執(zhí)行,并通知Reduce任務(wù)來領(lǐng)取數(shù)據(jù)合并(Combine)和歸并(Merge)的區(qū)別:兩個(gè)鍵值對(duì)和,如果合并,會(huì)得到,如果歸并,會(huì)得到“a”,7.3.3 Shuffle過程詳解3. Reduce端的端的Shuffle過程過程Reduce任務(wù)通過RPC向JobTracker詢問Map任務(wù)是否已經(jīng)完成,若完成,則領(lǐng)取數(shù)據(jù)Reduce領(lǐng)取數(shù)據(jù)先放入緩存,來自不同Map機(jī)器,先歸并,再合并,寫
11、入磁盤多個(gè)溢寫文件歸并成一個(gè)或多個(gè)大文件,文件中的鍵值對(duì)是排序的當(dāng)數(shù)據(jù)很少時(shí),不需要溢寫到磁盤,直接在緩存中歸并,然后輸出給Reduce7.3.3 Shuffle過程詳解3. Reduce端的端的Shuffle過程過程圖7-5 Reduce端的Shuffle過程 7.3.4 MapReduce應(yīng)用程序執(zhí)行過程7.4實(shí)例分析:WordCountn7.4.1 WordCount程序任務(wù)n7.4.2 WordCount設(shè)計(jì)思路n7.4.3 一個(gè)WordCount執(zhí)行過程的實(shí)例7.4.1 WordCount程序任務(wù)表7-2 WordCount程序任務(wù)程序WordCount輸入一個(gè)包含大量單詞的文本文件
12、輸出文件中每個(gè)單詞及其出現(xiàn)次數(shù)(頻數(shù)),并按照單詞字母順序排序,每個(gè)單詞和其頻數(shù)占一行,單詞和頻數(shù)之間有間隔表7-3 一個(gè)WordCount的輸入和輸出實(shí)例輸入輸出Hello WorldHello HadoopHello MapReduceHadoop 1Hello 3MapReduce 1World 17.4.2 WordCount設(shè)計(jì)思路n首先,需要檢查WordCount程序任務(wù)是否可以采用MapReduce來實(shí)現(xiàn)n其次,確定MapReduce程序的設(shè)計(jì)思路n最后,確定MapReduce程序的執(zhí)行過程7.4.3一個(gè)WordCount執(zhí)行過程的實(shí)例圖7-7 Map過程示意圖 7.4.3一個(gè)W
13、ordCount執(zhí)行過程的實(shí)例圖7-8 用戶沒有定義Combiner時(shí)的Reduce過程示意圖 7.4.3一個(gè)WordCount執(zhí)行過程的實(shí)例圖7-9 用戶有定義Combiner時(shí)的Reduce過程示意圖 7.5 MapReduce的具體應(yīng)用MapReduce可以很好地應(yīng)用于各種計(jì)算問題n 關(guān)系代數(shù)運(yùn)算(選擇、投影、并、交、差、連接)n 分組與聚合運(yùn)算n 矩陣-向量乘法n 矩陣乘法7.5 MapReduce的具體應(yīng)用用用MapReduce實(shí)現(xiàn)關(guān)系的自然連接實(shí)現(xiàn)關(guān)系的自然連接7.5 MapReduce的具體應(yīng)用用用MapReduce實(shí)現(xiàn)關(guān)系的自然連接實(shí)現(xiàn)關(guān)系的自然連接n 假設(shè)有關(guān)系R(A,B)和
14、S(B,C),對(duì)二者進(jìn)行自然連接操作n 使用Map過程,把來自R的每個(gè)元組轉(zhuǎn)換成一個(gè)鍵值對(duì)b, ,其中的鍵就是屬性B的值。把關(guān)系R包含到值中,這樣做使得我們可以在Reduce階段,只把那些來自R的元組和來自S的元組進(jìn)行匹配。類似地,使用Map過程,把來自S的每個(gè)元組,轉(zhuǎn)換成一個(gè)鍵值對(duì)b,n 所有具有相同B值的元組被發(fā)送到同一個(gè)Reduce進(jìn)程中,Reduce進(jìn)程的任務(wù)是,把來自關(guān)系R和S的、具有相同屬性B值的元組進(jìn)行合并n Reduce進(jìn)程的輸出則是連接后的元組,輸出被寫到一個(gè)單獨(dú)的輸出文件中7.5 MapReduce的具體應(yīng)用用用MapReduce實(shí)現(xiàn)關(guān)系的自然連接實(shí)現(xiàn)關(guān)系的自然連接7.6
15、MapReduce編程實(shí)踐n 7.6.1任務(wù)要求n 7.6.2編寫Map處理邏輯n 7.6.3編寫Reduce處理邏輯n 7.6.4 編寫main方法n 7.6.5 編譯打包代碼以及運(yùn)行程序n 7.6.6 Hadoop中執(zhí)行MapReduce任務(wù)的幾種方式7.6.1 任務(wù)要求文件文件A的內(nèi)容如下:的內(nèi)容如下:China is my motherlandI love China文件文件B的內(nèi)容如下:的內(nèi)容如下:I am from China期望結(jié)果期望結(jié)果如如右側(cè)所示右側(cè)所示:I 2is 1China 3my 1love 1am 1from 1motherland 17.6.2 編寫Map處理邏
16、輯Map輸入類型為期望的Map輸出類型為Map輸入類型最終確定為Map輸出類型最終確定為public static class MyMapper extends Mapper private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException,InterruptedException StringTokenizer itr = ne
17、w StringTokenizer(value.toString(); while (itr.hasMoreTokens() word.set(itr.nextToken(); context.write(word,one); 7.6.3 編寫編寫ReduceReduce處理邏輯處理邏輯在Reduce處理數(shù)據(jù)之前,Map的結(jié)果首先通過Shuffle階段進(jìn)行整理Reduce階段的任務(wù):對(duì)輸入數(shù)字序列進(jìn)行求和Reduce的輸入數(shù)據(jù)為Reduce任務(wù)的輸入數(shù)據(jù):”I”,”China”, public static class MyReducer extends Reducer private Int
18、Writable result = new IntWritable(); public void reduce(Text key, Iterable values, Context context) throws IOException,InterruptedException int sum = 0; for (IntWritable val : values) sum += val.get(); result.set(sum); context.write(key,result); 7.6.3 編寫編寫ReduceReduce處理邏輯處理邏輯7.6.4 編寫main方法public sta
19、tic void main(String args) throws Exception Configuration conf = new Configuration(); /程序運(yùn)行時(shí)參數(shù) String otherArgs = new GenericOptionsParser(conf,args) .getRemainingArgs(); if (otherArgs.length != 2) System.err.println(Usage: wordcount ); System.exit(2); Job job = new Job(conf,word count); /設(shè)置環(huán)境參數(shù) job
20、.setJarByClass(WordCount.class); /設(shè)置整個(gè)程序的類名 job.setMapperClass(MyMapper.class); /添加MyMapper類 job.setReducerClass(MyReducer.class); /添加MyReducer類 job.setOutputKeyClass(Text.class); /設(shè)置輸出類型 job.setOutputValueClass(IntWritable.class); /設(shè)置輸出類型 FileInputFormat.addInputPath(job,new Path(otherArgs0); /設(shè)置輸入
21、文件 FileOutputFormat.setOutputPath(job,new Path(otherArgs1); /設(shè)置輸出文件 System.exit(job.waitForCompletion(true)?0:1); import java.io.IOException; import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; impor
22、t org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.
23、hadoop.util.GenericOptionsParser;public class WordCount/WordCount類的具體代碼見下一頁(yè)完整代碼public class WordCount public static class MyMapper extends Mapper private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws
24、 IOException,InterruptedException StringTokenizer itr = new StringTokenizer(value.toString(); while (itr.hasMoreTokens() word.set(itr.nextToken(); context.write(word,one); public static class MyReducer extends Reducer private IntWritable result = new IntWritable(); public void reduce(Text key, Itera
25、ble values, Context context) throws IOException,InterruptedException int sum = 0; for (IntWritable val : values) sum += val.get(); result.set(sum); context.write(key,result); public static void main(String args) throws Exception Configuration conf = new Configuration(); String otherArgs = new Generi
26、cOptionsParser(conf,args) .getRemainingArgs(); if (otherArgs.length != 2) System.err.println(Usage: wordcount ); System.exit(2); Job job = new Job(conf,word count); job.setJarByClass(WordCount.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Tex
27、t.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath( job,new Path(otherArgs0); FileOutputFormat.setOutputPath( job,new Path(otherArgs1); System.exit(job.waitForCompletion(true)?0:1); 7.6.5 編譯打包代碼以及運(yùn)行程序?qū)嶒?yàn)步驟實(shí)驗(yàn)步驟:使用java編譯程序,生成.class文件將.class文件打包為jar包運(yùn)行jar包(需要啟動(dòng)Hadoop)查看結(jié)果7.6.5 編譯打包代碼以及運(yùn)行程序Hadoop 2.x 版本中的依賴版本中的依賴 jarHadoop 2.x 版本中 jar 不再集中在一個(gè) hadoop-core*.jar 中,而是分成多個(gè) jar,如使用 Hadoop 2.6.0 運(yùn)行 WordCount 實(shí)例至少需要如下三個(gè) jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.6.0.jar$HADOOP_HOME/share/hadoop/mapreduce/hadoop
溫馨提示
- 1. 本站所有資源如無(wú)特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025標(biāo)準(zhǔn)版?zhèn)€人購(gòu)房合同書
- 2025合伙買車合同
- 2024-2025學(xué)年新教材高中生物 第二章 基因和染色體的關(guān)系 微專題四 伴性遺傳的解題方法說課稿 新人教版必修第二冊(cè)
- 預(yù)制樓板施工方案
- 肇慶鋼板樁支護(hù)施工方案
- 別墅電梯出售合同范例
- 2023九年級(jí)數(shù)學(xué)下冊(cè) 第二十九章 投影與視圖29.1 投影第2課時(shí) 正投影說課稿 (新版)新人教版001
- 2024年四年級(jí)英語(yǔ)上冊(cè) Unit 3 Let's Go Lesson 15 In the City說課稿 冀教版(三起)
- 自然補(bǔ)償管道施工方案
- 2024年四年級(jí)英語(yǔ)上冊(cè) Unit 1 My classroom The fifth period(第五課時(shí))說課稿 人教PEP
- 工程類工程公司介紹完整x
- 古籍文獻(xiàn)整理與研究
- 板帶生產(chǎn)工藝熱連軋帶鋼生產(chǎn)
- 關(guān)鍵工序特殊過程培訓(xùn)課件精
- 輪機(jī)備件的管理(船舶管理課件)
- 《機(jī)修工基礎(chǔ)培訓(xùn)》課件
- 統(tǒng)編《道德與法治》三年級(jí)下冊(cè)教材分析
- 國(guó)際尿失禁咨詢委員會(huì)尿失禁問卷表
- 國(guó)開行政管理論文行政組織的變革及其現(xiàn)實(shí)性研究
- 運(yùn)動(dòng)技能學(xué)習(xí)中的追加反饋
- 《淄博張店區(qū)停車問題治理現(xiàn)狀及優(yōu)化對(duì)策分析【開題報(bào)告+正文】15000字 》
評(píng)論
0/150
提交評(píng)論