大數(shù)據(jù)技術(shù)原理與應(yīng)用 第七章 MapReduce_第1頁
大數(shù)據(jù)技術(shù)原理與應(yīng)用 第七章 MapReduce_第2頁
大數(shù)據(jù)技術(shù)原理與應(yīng)用 第七章 MapReduce_第3頁
大數(shù)據(jù)技術(shù)原理與應(yīng)用 第七章 MapReduce_第4頁
大數(shù)據(jù)技術(shù)原理與應(yīng)用 第七章 MapReduce_第5頁
已閱讀5頁,還剩40頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)

文檔簡介

1、第七章 MapReduce提綱7.1 概述概述7.2 MapReduce體系結(jié)構(gòu)體系結(jié)構(gòu)7.3 MapReduce工作流程工作流程7.4 實例分析:實例分析:WordCount7.5 MapReduce的具體應(yīng)用的具體應(yīng)用7.6 MapReduce編程實踐編程實踐7.1概述n7.1.1 分布式并行編程n7.1.2 MapReduce模型簡介n7.1.3 Map和Reduce函數(shù)7.1.1 分布式并行編程“摩爾定律”, CPU性能大約每隔18個月翻一番從2005年開始摩爾定律逐漸失效 ,需要處理的數(shù)據(jù)量快速增加,人們開始借助于分布式并行編程來提高程序性能 分布式程序運行在大規(guī)模計算機(jī)集群上,可以

2、并行執(zhí)行大規(guī)模數(shù)據(jù)處理任務(wù),從而獲得海量的計算能力谷歌公司最先提出了分布式并行編程模型MapReduce,Hadoop MapReduce是它的開源實現(xiàn),后者比前者使用門檻低很多 7.1.1 分布式并行編程問題:在MapReduce出現(xiàn)之前,已經(jīng)有像MPI這樣非常成熟的并行計算框架了,那么為什么Google還需要MapReduce?MapReduce相較于傳統(tǒng)的并行計算框架有什么優(yōu)勢?傳統(tǒng)并行計算框架傳統(tǒng)并行計算框架MapReduce集群架構(gòu)/容錯性共享式(共享內(nèi)存/共享存儲),容錯性差非共享式,容錯性好硬件/價格/擴(kuò)展性刀片服務(wù)器、高速網(wǎng)、SAN,價格貴,擴(kuò)展性差普通PC機(jī),便宜,擴(kuò)展性好編

3、程/學(xué)習(xí)難度what-how,難what,簡單適用場景實時、細(xì)粒度計算、計算密集型批處理、非實時、數(shù)據(jù)密集型7.1.2 MapReduce模型簡介MapReduce將復(fù)雜的、運行于大規(guī)模集群上的并行計算過程高度地抽象到了兩個函數(shù):Map和Reduce編程容易,不需要掌握分布式并行編程細(xì)節(jié),也可以很容易把自己的程序運行在分布式系統(tǒng)上,完成海量數(shù)據(jù)的計算MapReduce采用“分而治之分而治之”策略,一個存儲在分布式文件系統(tǒng)中的大規(guī)模數(shù)據(jù)集,會被切分成許多獨立的分片(split),這些分片可以被多個Map任務(wù)并行處理MapReduce設(shè)計的一個理念就是“計算向數(shù)據(jù)靠攏計算向數(shù)據(jù)靠攏”,而不是“數(shù)據(jù)

4、向計算靠攏”,因為,移動數(shù)據(jù)需要大量的網(wǎng)絡(luò)傳輸開銷MapReduce框架采用了Master/Slave架構(gòu),包括一個Master和若干個Slave。Master上運行JobTracker,Slave上運行TaskTracker Hadoop框架是用Java實現(xiàn)的,但是,MapReduce應(yīng)用程序則不一定要用Java來寫 7.1.3 Map和Reduce函數(shù)函數(shù)函數(shù)輸入輸入輸出輸出說明說明Map如:List()如:1.將小數(shù)據(jù)集進(jìn)一步解析成一批對,輸入Map函數(shù)中進(jìn)行處理2.每一個輸入的會輸出一批。是計算的中間結(jié)果Reduce 如:“a”,輸入的中間結(jié)果中的List(v2)表示是一批屬于同一個k

5、2的value7.2 MapReduce的體系結(jié)構(gòu)MapReduce體系結(jié)構(gòu)主要由四個部分組成,分別是:Client、JobTracker、TaskTracker以及Task7.2 MapReduce的體系結(jié)構(gòu)MapReduce主要有以下4個部分組成:1)Client用戶編寫的MapReduce程序通過Client提交到JobTracker端用戶可通過Client提供的一些接口查看作業(yè)運行狀態(tài)2)JobTrackerJobTracker負(fù)責(zé)資源監(jiān)控和作業(yè)調(diào)度JobTracker 監(jiān)控所有TaskTracker與Job的健康狀況,一旦發(fā)現(xiàn)失敗,就將相應(yīng)的任務(wù)轉(zhuǎn)移到其他節(jié)點JobTracker 會

6、跟蹤任務(wù)的執(zhí)行進(jìn)度、資源使用量等信息,并將這些信息告訴任務(wù)調(diào)度器(TaskScheduler),而調(diào)度器會在資源出現(xiàn)空閑時,選擇合適的任務(wù)去使用這些資源7.2 MapReduce的體系結(jié)構(gòu)3)TaskTrackerTaskTracker 會周期性地通過“心跳”將本節(jié)點上資源的使用情況和任務(wù)的運行進(jìn)度匯報給JobTracker,同時接收J(rèn)obTracker 發(fā)送過來的命令并執(zhí)行相應(yīng)的操作(如啟動新任務(wù)、殺死任務(wù)等)TaskTracker 使用“slot”等量劃分本節(jié)點上的資源量(CPU、內(nèi)存等)。一個Task 獲取到一個slot 后才有機(jī)會運行,而Hadoop調(diào)度器的作用就是將各個TaskTra

7、cker上的空閑slot分配給Task使用。slot 分為Map slot 和Reduce slot 兩種,分別供MapTask 和Reduce Task 使用4)TaskTask 分為Map Task 和Reduce Task 兩種,均由TaskTracker 啟動7.3MapReduce工作流程n7.3.1 工作流程概述n7.3.2 MapReduce各個執(zhí)行階段n7.3.3 Shuffle過程詳解7.3.1 工作流程概述圖7-1 MapReduce工作流程Shuffle7.3.1 工作流程概述不同的Map任務(wù)之間不會進(jìn)行通信不同的Reduce任務(wù)之間也不會發(fā)生任何信息交換用戶不能顯式地從

8、一臺機(jī)器向另一臺機(jī)器發(fā)送消息所有的數(shù)據(jù)交換都是通過MapReduce框架自身去實現(xiàn)的7.3.2 MapReduce各個執(zhí)行階段7.3.2 MapReduce各個執(zhí)行階段HDFS 以固定大小的block 為基本單位存儲數(shù)據(jù),而對于MapReduce 而言,其處理單位是split。split 是一個邏輯概念,它只包含一些元數(shù)據(jù)信息,比如數(shù)據(jù)起始位置、數(shù)據(jù)長度、數(shù)據(jù)所在節(jié)點等。它的劃分方法完全由用戶自己決定。關(guān)于關(guān)于Split(分片)(分片)7.3.2 MapReduce各個執(zhí)行階段Reduce任務(wù)的數(shù)量任務(wù)的數(shù)量最優(yōu)的Reduce任務(wù)個數(shù)取決于集群中可用的reduce任務(wù)槽(slot)的數(shù)目通常設(shè)

9、置比reduce任務(wù)槽數(shù)目稍微小一些的Reduce任務(wù)個數(shù)(這樣可以預(yù)留一些系統(tǒng)資源處理可能發(fā)生的錯誤)Map任務(wù)的數(shù)量任務(wù)的數(shù)量Hadoop為每個split創(chuàng)建一個Map任務(wù),split 的多少決定了Map任務(wù)的數(shù)目。大多數(shù)情況下,理想的分片大小是一個HDFS塊7.3.3 Shuffle過程詳解圖7-3 Shuffle過程 1. Shuffle過程簡介過程簡介7.3.3 Shuffle過程詳解2. Map端的端的Shuffle過程過程每個Map任務(wù)分配一個緩存MapReduce默認(rèn)100MB緩存設(shè)置溢寫比例0.8分區(qū)默認(rèn)采用哈希函數(shù)排序是默認(rèn)的操作排序后可以合并(Combine)合并不能改變

10、最終結(jié)果在Map任務(wù)全部結(jié)束之前進(jìn)行歸并歸并得到一個大的文件,放在本地磁盤文件歸并時,如果溢寫文件數(shù)量大于預(yù)定值(默認(rèn)是3)則可以再次啟動Combiner,少于3不需要JobTracker會一直監(jiān)測Map任務(wù)的執(zhí)行,并通知Reduce任務(wù)來領(lǐng)取數(shù)據(jù)合并(Combine)和歸并(Merge)的區(qū)別:兩個鍵值對和,如果合并,會得到,如果歸并,會得到“a”,7.3.3 Shuffle過程詳解3. Reduce端的端的Shuffle過程過程Reduce任務(wù)通過RPC向JobTracker詢問Map任務(wù)是否已經(jīng)完成,若完成,則領(lǐng)取數(shù)據(jù)Reduce領(lǐng)取數(shù)據(jù)先放入緩存,來自不同Map機(jī)器,先歸并,再合并,寫

11、入磁盤多個溢寫文件歸并成一個或多個大文件,文件中的鍵值對是排序的當(dāng)數(shù)據(jù)很少時,不需要溢寫到磁盤,直接在緩存中歸并,然后輸出給Reduce7.3.3 Shuffle過程詳解3. Reduce端的端的Shuffle過程過程圖7-5 Reduce端的Shuffle過程 7.3.4 MapReduce應(yīng)用程序執(zhí)行過程7.4實例分析:WordCountn7.4.1 WordCount程序任務(wù)n7.4.2 WordCount設(shè)計思路n7.4.3 一個WordCount執(zhí)行過程的實例7.4.1 WordCount程序任務(wù)表7-2 WordCount程序任務(wù)程序WordCount輸入一個包含大量單詞的文本文件

12、輸出文件中每個單詞及其出現(xiàn)次數(shù)(頻數(shù)),并按照單詞字母順序排序,每個單詞和其頻數(shù)占一行,單詞和頻數(shù)之間有間隔表7-3 一個WordCount的輸入和輸出實例輸入輸出Hello WorldHello HadoopHello MapReduceHadoop 1Hello 3MapReduce 1World 17.4.2 WordCount設(shè)計思路n首先,需要檢查WordCount程序任務(wù)是否可以采用MapReduce來實現(xiàn)n其次,確定MapReduce程序的設(shè)計思路n最后,確定MapReduce程序的執(zhí)行過程7.4.3一個WordCount執(zhí)行過程的實例圖7-7 Map過程示意圖 7.4.3一個W

13、ordCount執(zhí)行過程的實例圖7-8 用戶沒有定義Combiner時的Reduce過程示意圖 7.4.3一個WordCount執(zhí)行過程的實例圖7-9 用戶有定義Combiner時的Reduce過程示意圖 7.5 MapReduce的具體應(yīng)用MapReduce可以很好地應(yīng)用于各種計算問題n 關(guān)系代數(shù)運算(選擇、投影、并、交、差、連接)n 分組與聚合運算n 矩陣-向量乘法n 矩陣乘法7.5 MapReduce的具體應(yīng)用用用MapReduce實現(xiàn)關(guān)系的自然連接實現(xiàn)關(guān)系的自然連接7.5 MapReduce的具體應(yīng)用用用MapReduce實現(xiàn)關(guān)系的自然連接實現(xiàn)關(guān)系的自然連接n 假設(shè)有關(guān)系R(A,B)和

14、S(B,C),對二者進(jìn)行自然連接操作n 使用Map過程,把來自R的每個元組轉(zhuǎn)換成一個鍵值對b, ,其中的鍵就是屬性B的值。把關(guān)系R包含到值中,這樣做使得我們可以在Reduce階段,只把那些來自R的元組和來自S的元組進(jìn)行匹配。類似地,使用Map過程,把來自S的每個元組,轉(zhuǎn)換成一個鍵值對b,n 所有具有相同B值的元組被發(fā)送到同一個Reduce進(jìn)程中,Reduce進(jìn)程的任務(wù)是,把來自關(guān)系R和S的、具有相同屬性B值的元組進(jìn)行合并n Reduce進(jìn)程的輸出則是連接后的元組,輸出被寫到一個單獨的輸出文件中7.5 MapReduce的具體應(yīng)用用用MapReduce實現(xiàn)關(guān)系的自然連接實現(xiàn)關(guān)系的自然連接7.6

15、MapReduce編程實踐n 7.6.1任務(wù)要求n 7.6.2編寫Map處理邏輯n 7.6.3編寫Reduce處理邏輯n 7.6.4 編寫main方法n 7.6.5 編譯打包代碼以及運行程序n 7.6.6 Hadoop中執(zhí)行MapReduce任務(wù)的幾種方式7.6.1 任務(wù)要求文件文件A的內(nèi)容如下:的內(nèi)容如下:China is my motherlandI love China文件文件B的內(nèi)容如下:的內(nèi)容如下:I am from China期望結(jié)果期望結(jié)果如如右側(cè)所示右側(cè)所示:I 2is 1China 3my 1love 1am 1from 1motherland 17.6.2 編寫Map處理邏

16、輯Map輸入類型為期望的Map輸出類型為Map輸入類型最終確定為Map輸出類型最終確定為public static class MyMapper extends Mapper private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException,InterruptedException StringTokenizer itr = ne

17、w StringTokenizer(value.toString(); while (itr.hasMoreTokens() word.set(itr.nextToken(); context.write(word,one); 7.6.3 編寫編寫ReduceReduce處理邏輯處理邏輯在Reduce處理數(shù)據(jù)之前,Map的結(jié)果首先通過Shuffle階段進(jìn)行整理Reduce階段的任務(wù):對輸入數(shù)字序列進(jìn)行求和Reduce的輸入數(shù)據(jù)為Reduce任務(wù)的輸入數(shù)據(jù):”I”,”China”, public static class MyReducer extends Reducer private Int

18、Writable result = new IntWritable(); public void reduce(Text key, Iterable values, Context context) throws IOException,InterruptedException int sum = 0; for (IntWritable val : values) sum += val.get(); result.set(sum); context.write(key,result); 7.6.3 編寫編寫ReduceReduce處理邏輯處理邏輯7.6.4 編寫main方法public sta

19、tic void main(String args) throws Exception Configuration conf = new Configuration(); /程序運行時參數(shù) String otherArgs = new GenericOptionsParser(conf,args) .getRemainingArgs(); if (otherArgs.length != 2) System.err.println(Usage: wordcount ); System.exit(2); Job job = new Job(conf,word count); /設(shè)置環(huán)境參數(shù) job

20、.setJarByClass(WordCount.class); /設(shè)置整個程序的類名 job.setMapperClass(MyMapper.class); /添加MyMapper類 job.setReducerClass(MyReducer.class); /添加MyReducer類 job.setOutputKeyClass(Text.class); /設(shè)置輸出類型 job.setOutputValueClass(IntWritable.class); /設(shè)置輸出類型 FileInputFormat.addInputPath(job,new Path(otherArgs0); /設(shè)置輸入

21、文件 FileOutputFormat.setOutputPath(job,new Path(otherArgs1); /設(shè)置輸出文件 System.exit(job.waitForCompletion(true)?0:1); import java.io.IOException; import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; impor

22、t org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.

23、hadoop.util.GenericOptionsParser;public class WordCount/WordCount類的具體代碼見下一頁完整代碼public class WordCount public static class MyMapper extends Mapper private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws

24、 IOException,InterruptedException StringTokenizer itr = new StringTokenizer(value.toString(); while (itr.hasMoreTokens() word.set(itr.nextToken(); context.write(word,one); public static class MyReducer extends Reducer private IntWritable result = new IntWritable(); public void reduce(Text key, Itera

25、ble values, Context context) throws IOException,InterruptedException int sum = 0; for (IntWritable val : values) sum += val.get(); result.set(sum); context.write(key,result); public static void main(String args) throws Exception Configuration conf = new Configuration(); String otherArgs = new Generi

26、cOptionsParser(conf,args) .getRemainingArgs(); if (otherArgs.length != 2) System.err.println(Usage: wordcount ); System.exit(2); Job job = new Job(conf,word count); job.setJarByClass(WordCount.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Tex

27、t.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath( job,new Path(otherArgs0); FileOutputFormat.setOutputPath( job,new Path(otherArgs1); System.exit(job.waitForCompletion(true)?0:1); 7.6.5 編譯打包代碼以及運行程序?qū)嶒灢襟E實驗步驟:使用java編譯程序,生成.class文件將.class文件打包為jar包運行jar包(需要啟動Hadoop)查看結(jié)果7.6.5 編譯打包代碼以及運行程序Hadoop 2.x 版本中的依賴版本中的依賴 jarHadoop 2.x 版本中 jar 不再集中在一個 hadoop-core*.jar 中,而是分成多個 jar,如使用 Hadoop 2.6.0 運行 WordCount 實例至少需要如下三個 jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.6.0.jar$HADOOP_HOME/share/hadoop/mapreduce/hadoop

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論