版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
1、用Hadoop進(jìn)行分布式并行編程(二)(注:本文檔來(lái)自hadoop in china)程序?qū)嵗c分析Hadoop 是一個(gè)實(shí)現(xiàn)了MapReduce 計(jì)算模型的開(kāi)源分布式并行編程框架,借助于Hadoop, 程序員可以輕松地編寫(xiě)分布式并行程序,將其運(yùn)行于計(jì)算機(jī)集群上,完成海量數(shù)據(jù)的計(jì)算。在本文中,詳細(xì)介紹了如何針對(duì)一個(gè)具體的并行計(jì)算任務(wù),基于Hadoop 編寫(xiě)程序,如何使用 IBM MapReduce Tools 在 Eclipse 環(huán)境中編譯并運(yùn)行 Hadoop 程序。前言在上一篇文章:“用 Hadoop 進(jìn)行分布式并行編程 第一部分 基本概念與安裝部署”中,介紹了 MapReduce 計(jì)算模型,
2、分布式文件系統(tǒng) HDFS,分布式并行計(jì)算等的基本原理, 并且詳細(xì)介紹了如何安裝 Hadoop,如何運(yùn)行基于 Hadoop 的并行程序。在本文中,將針對(duì)一個(gè)具體的計(jì)算任務(wù),介紹如何基于 Hadoop 編寫(xiě)并行程序,如何使用 IBM 開(kāi)發(fā)的 Hadoop Eclipse plugin 在 Eclipse 環(huán)境中編譯并運(yùn)行程序。分析 WordCount 程序我們先來(lái)看看 Hadoop 自帶的示例程序 WordCount, 這個(gè)程序用于統(tǒng)計(jì)一批文本文件中單詞出現(xiàn)的頻率,完整的代碼可在下載的 Hadoop 安裝包中得到(在 src/examples 目錄中)。1.實(shí)現(xiàn)Map類(lèi)見(jiàn)代碼清單1。這個(gè)類(lèi)實(shí)現(xiàn) M
3、apper 接口中的 map 方法,輸入?yún)?shù)中的 value 是文本文件中的一行,利用 StringTokenizer 將這個(gè)字符串拆成單詞,然后將輸出結(jié)果 <單詞,1> 寫(xiě)入到 org.apache.hadoop.mapred.OutputCollector 中。OutputCollector 由 Hadoop 框架提供, 負(fù)責(zé)收集 Mapper 和 Reducer 的輸出數(shù)據(jù),實(shí)現(xiàn) map 函數(shù)和 reduce 函數(shù)時(shí),只需要簡(jiǎn)單地將其輸出的 <key,value> 對(duì)往 OutputCollector 中一丟即可,剩余的事框架自會(huì)幫你處理好。代碼中 LongWri
4、table, IntWritable, Text 均是 Hadoop 中實(shí)現(xiàn)的用于封裝 Java 數(shù)據(jù)類(lèi)型的類(lèi),這些類(lèi)都能夠被串行化從而便于在分布式環(huán)境中進(jìn)行數(shù)據(jù)交換,你可以將它們分別視為 long, int, String 的替代品。Reporter 則可用于報(bào)告整個(gè)應(yīng)用的運(yùn)行進(jìn)度,本例中未使用。代碼清單1public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text,
5、IntWritable> private final static IntWritable one = new IntWritable(1); private Text word = new Text(); pub
6、lic void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException String line = value.toStrin
7、g(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens() word.set(itr.nextToken(); output.collect(word, one); 2.實(shí)現(xiàn)Reduce類(lèi)見(jiàn)代碼清單2。這個(gè)類(lèi)實(shí)現(xiàn) Reducer 接口中的
8、reduce 方法, 輸入?yún)?shù)中的 key, values 是由 Map 任務(wù)輸出的中間結(jié)果,values 是一個(gè) Iterator, 遍歷這個(gè) Iterator, 就可以得到屬于同一個(gè) key 的所有 value. 此處,key 是一個(gè)單詞,value 是詞頻。只需要將所有的 value 相加,就可以得到這個(gè)單詞的總的出現(xiàn)次數(shù)。代碼清單 2public static class Reduce extends MapReduceBase implements Reducer<Text, IntW
9、ritable, Text, IntWritable> public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Report
10、er reporter) throws IOException int sum = 0; while (values.hasNext() output.collect(key, new IntWritable(sum); sum += values.next().get(); 3.運(yùn)行Job在 Hado
11、op 中一次計(jì)算任務(wù)稱(chēng)之為一個(gè) job, 可以通過(guò)一個(gè) JobConf 對(duì)象設(shè)置如何運(yùn)行這個(gè) job。此處定義了輸出的 key 的類(lèi)型是 Text, value 的類(lèi)型是 IntWritable, 指定使用代碼清單1中實(shí)現(xiàn)的 MapClass 作為 Mapper 類(lèi),使用代碼清單2中實(shí)現(xiàn)的 Reduce 作為 Reducer 類(lèi)和 Combiner 類(lèi), 任務(wù)的輸入路徑和輸出路徑由命令行參數(shù)指定,這樣 job 運(yùn)行時(shí)會(huì)處理輸入路徑下的所有文件,并將計(jì)算結(jié)果寫(xiě)到輸出路徑下。然后將 JobConf 對(duì)象作為參數(shù),調(diào)用 JobClient 的 runJob, 開(kāi)始執(zhí)行這個(gè)計(jì)算任務(wù)。至于 main 方
12、法中使用的 ToolRunner 是一個(gè)運(yùn)行 MapReduce 任務(wù)的輔助工具類(lèi),依樣畫(huà)葫蘆用之即可。代碼清單 3public int run(String args) throws Exception JobConf conf = new JobConf(getConf(), WordCount.class); conf.setJobName("wordcount"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.c
13、lass); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputPath(new Path(args0); conf.setOutputPath(new Path(args1); JobClient.runJob(conf); return 0;public static void main(String args) throws Excep
14、tion if(args.length != 2) int res = ToolRunner.run(new Configuration(), new WordCount(), args); System.exit(res);System.err.println("Usage: WordCount <input path> <output
15、path>"); System.exit(-1);以上就是 WordCount 程序的全部細(xì)節(jié),簡(jiǎn)單到讓人吃驚,您都不敢相信就這么幾行代碼就可以分布式運(yùn)行于大規(guī)模集群上,并行處理海量數(shù)據(jù)集。4. 通過(guò)JobConf 定制計(jì)算任務(wù)通過(guò)上文所述的 JobConf 對(duì)象,程序員可以設(shè)定各種參數(shù),定制如何完成一個(gè)計(jì)算任務(wù)。這些參數(shù)很多情況下就是一個(gè) java 接口,通過(guò)注入這些接口的特定實(shí)現(xiàn),可以定義一個(gè)計(jì)算任務(wù)( job )的全部細(xì)節(jié)。了解這些參數(shù)及其缺省設(shè)置,您才能在編寫(xiě)自己的并行計(jì)算程序時(shí)做到輕車(chē)熟路,游刃有余,明白哪些類(lèi)是需要自己實(shí)現(xiàn)的,哪些類(lèi)用 Hado
16、op 的缺省實(shí)現(xiàn)即可。表一是對(duì) JobConf 對(duì)象中可以設(shè)置的一些重要參數(shù)的總結(jié)和說(shuō)明,表中第一列中的參數(shù)在 JobConf 中均會(huì)有相應(yīng)的 get/set 方法,對(duì)程序員來(lái)說(shuō),只有在表中第三列中的缺省值無(wú)法滿足您的需求時(shí),才需要調(diào)用這些 set 方法,設(shè)定合適的參數(shù)值,實(shí)現(xiàn)自己的計(jì)算目的。針對(duì)表格中第一列中的接口,除了第三列的缺省實(shí)現(xiàn)之外,Hadoop 通常還會(huì)有一些其它的實(shí)現(xiàn),我在表格第四列中列出了部分,您可以查閱 Hadoop 的 API 文檔或源代碼獲得更詳細(xì)的信息,在很多的情況下,您都不用實(shí)現(xiàn)自己的 Mapper 和 Reducer, 直接使用 Hadoop 自帶的一些實(shí)現(xiàn)即可。表
17、一 JobConf 常用可定制參數(shù)參數(shù)作用缺省值其它實(shí)現(xiàn)inputFormat將輸入的數(shù)據(jù)集切割成小數(shù)據(jù)集inputSplits, 每一個(gè)InputSplit將 由一個(gè)Mapper負(fù)責(zé)處理。此外inputFormat中還提供一個(gè)RecordReader的 實(shí)現(xiàn), 將一個(gè)InputSplit解 析成key,value 對(duì)提供給 map 函數(shù)。TextInputFormat(針 對(duì)文本文件,按行將文本文件切割成InputSplits, 并用LineRecordReader將InputSplit解 析成 key,value 對(duì),key是行在文件中的位置,value是文件中的一行)SequenceFi
18、leInputFormatOutputFormat提供一個(gè) RecordWriter 的實(shí)現(xiàn),負(fù)責(zé)輸出最終結(jié)果TextOutputFormat(用 LineRecordWriter 將最終結(jié)果寫(xiě)成純文件文件,每個(gè)key,value對(duì)一行,key 和 value 之間用 tab 分隔)SequenceFileOutputFormatOutputKeyClass輸出的最終結(jié)果中 key 的類(lèi)型LongWritable OutputValueClass輸出的最終結(jié)果中 value 的類(lèi)型Text MapperClassMapper 類(lèi),實(shí)現(xiàn) map 函數(shù),完成輸入的 key,va
19、lue 到中間結(jié)果的映射IdentityMapper(將 輸入的 key,value 原封不動(dòng)的輸出為中間結(jié)果)LongSumReducer,LogRegexMapper,InverseMapperCombinerClass實(shí)現(xiàn) combine 函數(shù),將中間結(jié)果中的重復(fù) key 做合并null(不對(duì)中間結(jié)果中的重復(fù) key 做合并) ReducerClassReducer 類(lèi),實(shí)現(xiàn) reduce 函數(shù),對(duì)中間結(jié)果做合并,形成最終結(jié)果IdentityReducer(將 中間結(jié)果直接輸出為最終結(jié)果)AccumulatingReducer,LongSumReducerInputPath設(shè)定
20、 job 的輸入目錄, job 運(yùn)行時(shí)會(huì)處理輸入目錄下的所有文件null OutputPath設(shè)定 job 的輸出目錄,job 的最終結(jié)果會(huì)寫(xiě)入輸出目錄下null MapOutputKeyClass設(shè)定 map 函數(shù)輸出的中間結(jié)果中 key 的類(lèi)型如果用戶沒(méi)有設(shè)定的話,使用OutputKeyClass MapOutputValueClass設(shè)定 map 函數(shù)輸出的中間結(jié)果中value 的類(lèi)型 如果用戶沒(méi)有設(shè)定的話,使用 OutputValuesClass OutputKeyComparator對(duì)結(jié)果中的 key 進(jìn)行排序時(shí)的使用的比較器WritableC
21、omparable PartitionerClass對(duì)中間結(jié)果的 key 排序后,用此 Partition 函數(shù)將其劃分為R份,每份由一個(gè) Reducer 負(fù)責(zé)處理。HashPartitioner(使 用 Hash 函數(shù)做 partition)KeyFieldBasedPartitionerPipesPartitioner改進(jìn)的 WordCount 程序現(xiàn)在你對(duì) Hadoop 并行程序的細(xì)節(jié)已經(jīng)有了比較深入的了解,我們來(lái)把 WordCount 程序改進(jìn)一下,目標(biāo): (1)原 WordCount 程序僅按空格切分單詞,導(dǎo)致各類(lèi)標(biāo)點(diǎn)符號(hào)與單詞混雜在一起,改進(jìn)后的程序應(yīng)該能夠正確的切出單詞
22、,并且單詞不要區(qū)分大小寫(xiě)。(2)在最終結(jié)果中,按單詞出 現(xiàn)頻率的降序進(jìn)行排序。1.修改 Mapper 類(lèi),實(shí)現(xiàn)目標(biāo)(1)實(shí)現(xiàn)很簡(jiǎn)單,見(jiàn)代碼清單4中 的注釋。代碼清單 4public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>
23、 private final static IntWritable one = new IntWritable(1); private Text word = new Text(); &
24、#160; private String pattern="w" /正則表達(dá)式,代表不是0-9, a-z, A-Z的所有其它字符 public void map(LongWritable key, Text value,OutputCollector<Text, IntWritable> out
25、put, Reporter reporter) throws IOException String line = value.toString().toLowerCase(); /全部轉(zhuǎn)為小寫(xiě)字母
26、; line = line.replaceAll(pattern, " "); /將非0-9, a-z, A-Z的字符替換為空格 StringTokenizer itr = new StringTokenizer(line);
27、; while (itr.hasMoreTokens() word.set(itr.nextToken(); output.collect(word, one);
28、0; 2.實(shí)現(xiàn)目標(biāo)(2)用一個(gè)并行計(jì)算任務(wù)顯然是無(wú)法同時(shí)完成單詞詞頻統(tǒng)計(jì)和排序的,這時(shí)我們可以利用 Hadoop 的任務(wù)管道能力,用上一個(gè)任務(wù)(詞頻統(tǒng)計(jì))的輸出做為下一個(gè)任務(wù)(排序)的輸入,順序執(zhí)行兩個(gè)并行計(jì)算任務(wù)。主要工作是修改代碼清單3中的 run 函數(shù),在其中定義一個(gè)排序任務(wù)并運(yùn)行之。在 Hadoop 中要實(shí)現(xiàn)排序是很簡(jiǎn)單的,因?yàn)樵?MapReduce 的過(guò)程中,會(huì)把中間結(jié)果根據(jù) key 排序并按 key 切成 R 份交給
29、R 個(gè) Reduce 函數(shù),而 Reduce 函數(shù)在處理中間結(jié)果之前也會(huì)有一個(gè)按 key 進(jìn)行排序的過(guò)程,故 MapReduce 輸出的最終結(jié)果實(shí)際上已經(jīng)按 key 排好序。詞頻統(tǒng)計(jì)任務(wù)輸出的 key 是單詞,value 是詞頻,為了實(shí)現(xiàn)按詞頻排序,我們指定使用 InverseMapper 類(lèi)作為排序任務(wù)的 Mapper 類(lèi)( sortJob.setMapperClass(InverseMapper.class );),這個(gè)類(lèi)的 map 函數(shù)簡(jiǎn)單地將輸入的 key 和 value 互換后作為中間結(jié)果輸出,在本例中即是將詞頻作為 key,單詞作為 value 輸出, 這樣自然就能得到按詞頻排好序
30、的最終結(jié)果。我們無(wú)需指定 Reduce 類(lèi),Hadoop 會(huì)使用缺省的 IdentityReducer 類(lèi),將中間結(jié)果原樣輸出。還有一個(gè)問(wèn)題需要解決: 排序任務(wù)中的 Key 的類(lèi)型是 IntWritable, (sortJob.setOutputKeyClass(IntWritable.class), Hadoop 默認(rèn)對(duì) IntWritable 按升序排序,而我們需要的是按降序排列。因此我們實(shí)現(xiàn)了一個(gè) IntWritableDecreasingComparator 類(lèi),并指定使用這個(gè)自定義的 Comparator 類(lèi)對(duì)輸出結(jié)果中的 key (詞頻)進(jìn)行排序:sortJob.setOutput
31、KeyComparatorClass(IntWritableDecreasingComparator.class)詳見(jiàn)代碼清單 5 及其中的注釋。代碼清單 5public int run(String args) throws Exception Path tempDir = new Path("wordcount-temp-" + Integer.toString( new Rando
32、m().nextInt(Integer.MAX_VALUE); /定義一個(gè)臨時(shí)目錄 JobConf conf = new JobConf(getConf(), WordCount.class);try conf.setJobName("wordcount"); conf.setOutputKe
33、yClass(Text.class);conf.setOutputValueClass(IntWritable.class);conf.setMapperClass(MapClass.class);conf.setCombinerClass(Reduce.class);conf.setReducerClass(Reduce.class);conf.setInputPath(new Path(args0);conf.setOutputPath(tempDir); /先將詞頻統(tǒng)計(jì)任務(wù)的輸出結(jié)果寫(xiě)到臨時(shí)目/錄中, 下一個(gè)排序任務(wù)以臨時(shí)目錄為輸入目錄。conf.setOutputFormat(Sequ
34、enceFileOutputFormat.class);JobClient.runJob(conf);JobConf sortJob = new JobConf(getConf(), WordCount.class);sortJob.setJobName("sort"); sortJob.setInputPath(tempDir);sortJob.setInputFormat(SequenceFileInputFormat.class);sortJob.setMapperClass(InverseMapper.class);sortJob.setNumReduceTasks
35、(1); /將 Reducer 的個(gè)數(shù)限定為1, 最終輸出的結(jié)果/文件就是一個(gè)。 sortJob.setOutputPath(new Path(args1);sortJob.setOutputKeyClass(IntWritable.class);sortJob.setOutputValueClass(Text.class);sortJob.setOutputKeyComparatorClass(IntWritableDecreasingComparator.class);JobClient.runJob(sortJob); finally
36、160; FileSystem.get(conf).delete(tempDir); /刪除臨時(shí)目錄 return 0; private static clas
37、s IntWritableDecreasingComparator extends IntWritable.Comparator public int compare(WritableComparable a, WritableComparable b) return -pare(a, b); public int compare(byte b1, int s1, int l1, byte b2, int
38、 s2, int l2) return -pare(b1, s1, l1, b2, s2, l2); 在Eclipse 環(huán)境下進(jìn)行開(kāi)發(fā)和調(diào)試在 Eclipse 環(huán)境下可以方便地進(jìn)行 Hadoop 并行程序的開(kāi)發(fā)和調(diào)試。推薦使用 IBM MapReduce Tools for Eclipse, 使用這個(gè) Eclipse plugin 可以簡(jiǎn)化開(kāi)發(fā)和部署 Hadoop 并行程序的過(guò)程?;谶@個(gè) plugin, 可以在 Eclipse 中創(chuàng)建一個(gè) Hadoop MapReduce 應(yīng)用程序,并且提供了一些基于 MapReduce 框架的類(lèi)開(kāi)發(fā)的向?qū)?,可以打包?JAR 文件,部署一個(gè) Hadoop MapReduce 應(yīng)用程序到一個(gè) Hadoop 服務(wù)器(本地和遠(yuǎn)程均可),可以通過(guò)一個(gè)專(zhuān)門(mén)的視圖 ( perspective ) 查看 Hadoop 服務(wù)器、Hadoop 分布式文件系統(tǒng)( DFS )和當(dāng)前運(yùn)行的任務(wù)的狀態(tài)??稍?IBM alphaWorks 網(wǎng)站下
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 旅游行業(yè)客服溝通技巧與策略
- 2024年地震數(shù)據(jù)采集系統(tǒng)項(xiàng)目資金需求報(bào)告代可行性研究報(bào)告
- 2024年試驗(yàn)機(jī):拉力試驗(yàn)機(jī)項(xiàng)目項(xiàng)目投資申請(qǐng)報(bào)告代可行性研究報(bào)告
- 高效能的學(xué)校電網(wǎng)設(shè)計(jì)與維護(hù)管理
- 新時(shí)代教育下的情境教學(xué)方法論探討
- 職場(chǎng)新手教師如何快速掌握小學(xué)數(shù)學(xué)教學(xué)技巧
- 物聯(lián)網(wǎng)時(shí)代的安全產(chǎn)品創(chuàng)新與市場(chǎng)拓展策略
- 職場(chǎng)技能提升與就業(yè)競(jìng)爭(zhēng)力培養(yǎng)
- 科技領(lǐng)域中實(shí)踐教學(xué)的價(jià)值體現(xiàn)
- 2025年菏澤家政職業(yè)學(xué)院高職單招語(yǔ)文2018-2024歷年參考題庫(kù)頻考點(diǎn)含答案解析
- 《揚(yáng)州東關(guān)街掠影》課件
- 環(huán)保行業(yè)研究報(bào)告
- 物流服務(wù)項(xiàng)目的投標(biāo)書(shū)
- 廣西太陽(yáng)能資源分析
- 地鐵車(chē)站低壓配電及照明系統(tǒng)
- 行業(yè)會(huì)計(jì)比較(第三版)PPT完整全套教學(xué)課件
- 值機(jī)業(yè)務(wù)與行李運(yùn)輸實(shí)務(wù)(第3版)高職PPT完整全套教學(xué)課件
- 高考英語(yǔ)語(yǔ)法填空專(zhuān)項(xiàng)訓(xùn)練(含解析)
- 42式太極劍劍譜及動(dòng)作說(shuō)明(吳阿敏)
- 部編版語(yǔ)文小學(xué)五年級(jí)下冊(cè)第一單元集體備課(教材解讀)
- 仁愛(ài)英語(yǔ)九年級(jí)下冊(cè)單詞表(中英文)
評(píng)論
0/150
提交評(píng)論