用 Hadoop 進行分布式并行編程_第1頁
用 Hadoop 進行分布式并行編程_第2頁
用 Hadoop 進行分布式并行編程_第3頁
用 Hadoop 進行分布式并行編程_第4頁
用 Hadoop 進行分布式并行編程_第5頁
已閱讀5頁,還剩45頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

1、用 Hadoop 進行分布式并行編程, 第 1 部分基本概念與安裝部署文檔選項打印本頁將此頁作為電子郵件發(fā)送級別: 初級曹 羽中 (caoyuz), 軟件工程師, IBM中國開發(fā)中心2008 年 5 月 22 日Hadoop 是一個實現(xiàn)了 MapReduce 計算模型的開源分布式并行編程框架,借助于 Hadoop, 程序員可以輕松地編寫分布式并行程序,將其運行于計算機集群上,完成海量數(shù)據(jù)的計算。本文將介紹 MapReduce 計算模型,分布式并行計算等基本概念,以及 Hadoop 的安裝部署和基本運行方法。Hadoop 簡介Hadoop 是一個開源的可運行于大規(guī)模集群上的分布式并行編

2、程框架,由于分布式存儲對于分布式編程來說是必不可少的,這個框架中還包含了一個分布式文件系統(tǒng) HDFS( Hadoop Distributed File System )。也許到目前為止,Hadoop 還不是那么廣為人知,其最新的版本號也僅僅是 0.16,距離 1.0 似乎都還有很長的一段距離,但提及 Hadoop 一脈相承的另外兩個開源項目 Nutch 和 Lucene ( 三者的創(chuàng)始人都是 Doug Cutting ),那絕對是大名鼎鼎。Lucene 是一個用 Java 開發(fā)的開源高性能全文檢索工具包,它不是一個完整的應(yīng)用程序,而是一套簡單易用的 API 。在全世界范圍內(nèi),已有無數(shù)的軟件系統(tǒng)

3、,Web 網(wǎng)站基于 Lucene 實現(xiàn)了全文檢索功能,后來 Doug Cutting 又開創(chuàng)了第一個開源的 Web 搜索引擎() Nutch, 它在 Lucene 的基礎(chǔ)上增加了網(wǎng)絡(luò)爬蟲和一些和 Web 相關(guān)的功能,一些解析各類文檔格式的插件等,此外,Nutch 中還包含了一個分布式文件系統(tǒng)用于存儲數(shù)據(jù)。從 Nutch 0.8.0 版本之后,Doug Cutting 把 Nutch 中的分布式文件系統(tǒng)以及實現(xiàn) MapReduce 算法的代碼獨立出來形成了一個新的開源項 Hadoop。Nutch 也演化為基于 Lucene 全文檢索以及 Hadoop 分布式

4、計算平臺的一個開源搜索引擎?;?Hadoop,你可以輕松地編寫可處理海量數(shù)據(jù)的分布式并行程序,并將其運行于由成百上千個結(jié)點組成的大規(guī)模計算機集群上。從目前的情況來看,Hadoop 注定會有一個輝煌的未來:"云計算"是目前灸手可熱的技術(shù)名詞,全球各大 IT 公司都在投資和推廣這種新一代的計算模式,而 Hadoop 又被其中幾家主要的公司用作其"云計算"環(huán)境中的重要基礎(chǔ)軟件,如:雅虎正在借助 Hadoop 開源平臺的力量對抗 Google, 除了資助 Hadoop 開發(fā)團隊外,還在開發(fā)基于 Hadoop 的開源項目 Pig, 這是一個專注于海量數(shù)據(jù)集分析的

5、分布式計算程序。Amazon 公司基于 Hadoop 推出了 Amazon S3 ( Amazon Simple Storage Service ),提供可靠,快速,可擴展的網(wǎng)絡(luò)存儲服務(wù),以及一個商用的云計算平臺 Amazon EC2 ( Amazon Elastic Compute Cloud )。在 IBM 公司的云計算項目-"藍云計劃"中,Hadoop 也是其中重要的基礎(chǔ)軟件。Google 正在跟IBM合作,共同推廣基于 Hadoop 的云計算?;仨撌子泳幊谭绞降淖兏镌谀柖傻淖饔孟拢郧俺绦騿T根本不用考慮計算機的性能會跟不上軟件的發(fā)展,因為約每隔 18 個月,C

6、PU 的主頻就會增加一倍,性能也將提升一倍,軟件根本不用做任何改變,就可以享受免費的性能提升。然而,由于晶體管電路已經(jīng)逐漸接近其物理上的性能極限,摩爾定律在 2005 年左右開始失效了,人類再也不能期待單個 CPU 的速度每隔 18 個月就翻一倍,為我們提供越來越快的計算性能。Intel, AMD, IBM 等芯片廠商開始從多核這個角度來挖掘 CPU 的性能潛力,多核時代以及互聯(lián)網(wǎng)時代的到來,將使軟件編程方式發(fā)生重大變革,基于多核的多線程并發(fā)編程以及基于大規(guī)模計算機集群的分布式并行編程是將來軟件性能提升的主要途徑。許多人認為這種編程方式的重大變化將帶來一次軟件的并發(fā)危機,因為我們傳統(tǒng)的軟件方式

7、基本上是單指令單數(shù)據(jù)流的順序執(zhí)行,這種順序執(zhí)行十分符合人類的思考習(xí)慣,卻與并發(fā)并行編程格格不入?;诩旱姆植际讲⑿芯幊棠軌蜃屲浖c數(shù)據(jù)同時運行在連成一個網(wǎng)絡(luò)的許多臺計算機上,這里的每一臺計算機均可以是一臺普通的 PC 機。這樣的分布式并行環(huán)境的最大優(yōu)點是可以很容易的通過增加計算機來擴充新的計算結(jié)點,并由此獲得不可思議的海量計算能力, 同時又具有相當(dāng)強的容錯能力,一批計算結(jié)點失效也不會影響計算的正常進行以及結(jié)果的正確性。Google 就是這么做的,他們使用了叫做 MapReduce 的并行編程模型進行分布式并行編程,運行在叫做 GFS ( Google File System )的分布式文件系

8、統(tǒng)上,為全球億萬用戶提供搜索服務(wù)。Hadoop 實現(xiàn)了 Google 的 MapReduce 編程模型,提供了簡單易用的編程接口,也提供了它自己的分布式文件系統(tǒng) HDFS,與 Google 不同的是,Hadoop 是開源的,任何人都可以使用這個框架來進行并行編程。如果說分布式并行編程的難度足以讓普通程序員望而生畏的話,開源的 Hadoop 的出現(xiàn)極大的降低了它的門檻,讀完本文,你會發(fā)現(xiàn)基于 Hadoop 編程非常簡單,無須任何并行開發(fā)經(jīng)驗,你也可以輕松的開發(fā)出分布式的并行程序,并讓其令人難以置信地同時運行在數(shù)百臺機器上,然后在短時間內(nèi)完成海量數(shù)據(jù)的計算。你可能會覺得你不可能會擁有數(shù)百臺機器來運

9、行你的并行程序,而事實上,隨著"云計算"的普及,任何人都可以輕松獲得這樣的海量計算能力。 例如現(xiàn)在 Amazon 公司的云計算平臺 Amazon EC2 已經(jīng)提供了這種按需計算的租用服務(wù),有興趣的讀者可以去了解一下,這篇系列文章的第三部分將有所介紹。掌握一點分布式并行編程的知識對將來的程序員是必不可少的,Hadoop 是如此的簡便好用,何不嘗試一下呢?也許你已經(jīng)急不可耐的想試一下基于 Hadoop 的編程是怎么回事了,但畢竟這種編程模型與傳統(tǒng)的順序程序大不相同,掌握一點基礎(chǔ)知識才能更好地理解基于 Hadoop 的分布式并行程序是如何編寫和運行的。因此本文會先介紹一下 Map

10、Reduce 的計算模型,Hadoop 中的分布式文件系統(tǒng) HDFS, Hadoop 是如何實現(xiàn)并行計算的,然后才介紹如何安裝和部署 Hadoop 框架,以及如何運行 Hadoop 程序?;仨撌譓apReduce 計算模型MapReduce 是 Google 公司的核心計算模型,它將復(fù)雜的運行于大規(guī)模集群上的并行計算過程高度的抽象到了兩個函數(shù),Map 和 Reduce, 這是一個令人驚訝的簡單卻又威力巨大的模型。適合用 MapReduce 來處理的數(shù)據(jù)集(或任務(wù))有一個基本要求: 待處理的數(shù)據(jù)集可以分解成許多小的數(shù)據(jù)集,而且每一個小數(shù)據(jù)集都可以完全并行地進行處理。圖 1. MapReduce

11、計算流程 圖一說明了用 MapReduce 來處理大數(shù)據(jù)集的過程, 這個 MapReduce 的計算過程簡而言之,就是將大數(shù)據(jù)集分解為成百上千的小數(shù)據(jù)集,每個(或若干個)數(shù)據(jù)集分別由集群中的一個結(jié)點(一般就是一臺普通的計算機)進行處理并生成中間結(jié)果,然后這些中間結(jié)果又由大量的結(jié)點進行合并, 形成最終結(jié)果。計算模型的核心是 Map 和 Reduce 兩個函數(shù),這兩個函數(shù)由用戶負責(zé)實現(xiàn),功能是按一定的映射規(guī)則將輸入的 <key, value> 對轉(zhuǎn)換成另一個或一批 <key, value> 對輸出。表一 Map 和 Reduce 函數(shù)函數(shù)輸入輸出說明Map<

12、k1, v1>List(<k2,v2>)1. 將小數(shù)據(jù)集進一步解析成一批 <key,value> 對,輸入 Map 函數(shù)中進行處理。2. 每一個輸入的 <k1,v1> 會輸出一批 <k2,v2>。 <k2,v2> 是計算的中間結(jié)果。Reduce<k2,List(v2)><k3,v3>輸入的中間結(jié)果 <k2,List(v2)> 中的 List(v2) 表示是一批屬于同一個 k2 的 value以一個計算文本文件中每個單詞出現(xiàn)的次數(shù)的程序為例,<k1,v1> 可以是 <行在文件中

13、的偏移位置, 文件中的一行>,經(jīng) Map 函數(shù)映射之后,形成一批中間結(jié)果 <單詞,出現(xiàn)次數(shù)>, 而 Reduce 函數(shù)則可以對中間結(jié)果進行處理,將相同單詞的出現(xiàn)次數(shù)進行累加,得到每個單詞的總的出現(xiàn)次數(shù)?;?MapReduce 計算模型編寫分布式并行程序非常簡單,程序員的主要編碼工作就是實現(xiàn) Map 和 Reduce 函數(shù),其它的并行編程中的種種復(fù)雜問題,如分布式存儲,工作調(diào)度,負載平衡,容錯處理,網(wǎng)絡(luò)通信等,均由 MapReduce 框架(比如 Hadoop )負責(zé)處理,程序員完全不用操心。回頁首四 集群上的并行計算MapReduce 計算模型非常適合在大量計算機組成的大規(guī)

14、模集群上并行運行。圖一中的每一個 Map 任務(wù)和每一個 Reduce 任務(wù)均可以同時運行于一個單獨的計算結(jié)點上,可想而知其運算效率是很高的,那么這樣的并行計算是如何做到的呢?數(shù)據(jù)分布存儲Hadoop 中的分布式文件系統(tǒng) HDFS 由一個管理結(jié)點 ( NameNode )和N個數(shù)據(jù)結(jié)點 ( DataNode )組成,每個結(jié)點均是一臺普通的計算機。在使用上同我們熟悉的單機上的文件系統(tǒng)非常類似,一樣可以建目錄,創(chuàng)建,復(fù)制,刪除文件,查看文件內(nèi)容等。但其底層實現(xiàn)上是把文件切割成 Block,然后這些 Block 分散地存儲于不同的 DataNode 上,每個 Block 還可以復(fù)制數(shù)份存儲于不同的 D

15、ataNode 上,達到容錯容災(zāi)之目的。NameNode 則是整個 HDFS 的核心,它通過維護一些數(shù)據(jù)結(jié)構(gòu),記錄了每一個文件被切割成了多少個 Block,這些 Block 可以從哪些 DataNode 中獲得,各個 DataNode 的狀態(tài)等重要信息。如果你想了解更多的關(guān)于 HDFS 的信息,可進一步閱讀參考資料: The Hadoop Distributed File System:Architecture and Design分布式并行計算Hadoop 中有一個作為主控的 JobTracker,用于調(diào)度和管理其它的 TaskTracker, JobTracker 可以運行于集群

16、中任一臺計算機上。TaskTracker 負責(zé)執(zhí)行任務(wù),必須運行于 DataNode 上,即 DataNode 既是數(shù)據(jù)存儲結(jié)點,也是計算結(jié)點。 JobTracker 將 Map 任務(wù)和 Reduce 任務(wù)分發(fā)給空閑的 TaskTracker, 讓這些任務(wù)并行運行,并負責(zé)監(jiān)控任務(wù)的運行情況。如果某一個 TaskTracker 出故障了,JobTracker 會將其負責(zé)的任務(wù)轉(zhuǎn)交給另一個空閑的 TaskTracker 重新運行。本地計算數(shù)據(jù)存儲在哪一臺計算機上,就由這臺計算機進行這部分數(shù)據(jù)的計算,這樣可以減少數(shù)據(jù)在網(wǎng)絡(luò)上的傳輸,降低對網(wǎng)絡(luò)帶寬的需求。在 Hadoop 這樣的基于集群的分布式并行系

17、統(tǒng)中,計算結(jié)點可以很方便地擴充,而因它所能夠提供的計算能力近乎是無限的,但是由是數(shù)據(jù)需要在不同的計算機之間流動,故網(wǎng)絡(luò)帶寬變成了瓶頸,是非常寶貴的,“本地計算”是最有效的一種節(jié)約網(wǎng)絡(luò)帶寬的手段,業(yè)界把這形容為“移動計算比移動數(shù)據(jù)更經(jīng)濟”。圖 2. 分布存儲與并行計算 任務(wù)粒度把原始大數(shù)據(jù)集切割成小數(shù)據(jù)集時,通常讓小數(shù)據(jù)集小于或等于 HDFS 中一個 Block 的大小(缺省是 64M),這樣能夠保證一個小數(shù)據(jù)集位于一臺計算機上,便于本地計算。有 M 個小數(shù)據(jù)集待處理,就啟動 M 個 Map 任務(wù),注意這 M 個 Map 任務(wù)分布于 N 臺計算機上并行運行,Reduce 任務(wù)的數(shù)量 R

18、 則可由用戶指定。Partition把 Map 任務(wù)輸出的中間結(jié)果按 key 的范圍劃分成 R 份( R 是預(yù)先定義的 Reduce 任務(wù)的個數(shù)),劃分時通常使用 hash 函數(shù)如: hash(key) mod R,這樣可以保證某一段范圍內(nèi)的 key,一定是由一個 Reduce 任務(wù)來處理,可以簡化 Reduce 的過程。Combine在 partition 之前,還可以對中間結(jié)果先做 combine,即將中間結(jié)果中有相同 key的 <key, value> 對合并成一對。combine 的過程與 Reduce 的過程類似,很多情況下就可以直接使用 Reduce 函數(shù),但 comb

19、ine 是作為 Map 任務(wù)的一部分,在執(zhí)行完 Map 函數(shù)后緊接著執(zhí)行的。Combine 能夠減少中間結(jié)果中 <key, value> 對的數(shù)目,從而減少網(wǎng)絡(luò)流量。Reduce 任務(wù)從 Map 任務(wù)結(jié)點取中間結(jié)果Map 任務(wù)的中間結(jié)果在做完 Combine 和 Partition 之后,以文件形式存于本地磁盤。中間結(jié)果文件的位置會通知主控 JobTracker, JobTracker 再通知 Reduce 任務(wù)到哪一個 DataNode 上去取中間結(jié)果。注意所有的 Map 任務(wù)產(chǎn)生中間結(jié)果均按其 Key 用同一個 Hash 函數(shù)劃分成了 R 份,R 個 Reduce 任務(wù)各自負責(zé)

20、一段 Key 區(qū)間。每個 Reduce 需要向許多個 Map 任務(wù)結(jié)點取得落在其負責(zé)的 Key 區(qū)間內(nèi)的中間結(jié)果,然后執(zhí)行 Reduce 函數(shù),形成一個最終的結(jié)果文件。任務(wù)管道有 R 個 Reduce 任務(wù),就會有 R 個最終結(jié)果,很多情況下這 R 個最終結(jié)果并不需要合并成一個最終結(jié)果。因為這 R 個最終結(jié)果又可以做為另一個計算任務(wù)的輸入,開始另一個并行計算任務(wù)?;仨撌孜?Hadoop 初體驗Hadoop 支持 Linux 及 Windows 操作系統(tǒng), 但其官方網(wǎng)站聲明 Hadoop 的分布式操作在 Windows 上未做嚴格測試,建議只把 Windows 作為 Hadoop 的開發(fā)平臺。在

21、 Windows 環(huán)境上的安裝步驟如下( Linux 平臺類似,且更簡單一些):(1)在 Windows 下,需要先安裝 Cgywin, 安裝 Cgywin 時注意一定要選擇安裝 openssh (在 Net category )。安裝完成之后,把 Cgywin 的安裝目錄如 c:cygwinbin 加到系統(tǒng)環(huán)境變量 PATH 中,這是因為運行 Hadoop 要執(zhí)行一些 linux 環(huán)境下的腳本和命令。(2)安裝 Java 1.5.x,并將 JAVA_HOME 環(huán)境變量設(shè)置為 Java 的安裝根目錄如 C:Program FilesJavajdk1.5.0_01。(3)到 Hadoop 官方網(wǎng)

22、站 下載Hadoop Core, 最新的穩(wěn)定版本是 0.16.0. 將下載后的安裝包解壓到一個目錄,本文假定解壓到 c:hadoop-0.16.0。4)修改 conf/hadoop-env.sh 文件,在其中設(shè)置 JAVA_HOME 環(huán)境變量: export JAVA_HOME="C:Program FilesJavajdk1.5.0_01” (因為路徑中 Program Files 中間有空格,一定要用雙引號將路徑引起來)至此,一切就緒,可以運行 Hadoop 了。以下的運行過程,需要啟動 cygwin, 進入模擬 Linux

23、 環(huán)境。在下載的 Hadoop Core 包中,帶有幾個示例程序并且已經(jīng)打包成了 hadoop-0.16.0-examples.jar。其中有一個 WordCount 程序,功能是統(tǒng)計一批文本文件中各個單詞出現(xiàn)的次數(shù),我們先來看看怎么運行這個程序。Hadoop 共有三種運行模式: 單機(非分布式)模式,偽分布式運行模式,分布式運行模式,其中前兩種運行模式體現(xiàn)不了 Hadoop 分布式計算的優(yōu)勢,并沒有什么實際意義,但對程序的測試及調(diào)試很有幫助,我們先從這兩種模式入手,了解基于 Hadoop 的分布式并行程序是如何編寫和運行的。單機(非分布式)模式這種模式在一臺單機上運行,沒有分布式文件系統(tǒng),而

24、是直接讀寫本地操作系統(tǒng)的文件系統(tǒng)。代碼清單1 $ cd /cygdrive/c/hadoop-0.16.0$ mkdir test-in $ cd test-in#在 test-in 目錄下創(chuàng)建兩個文本文件, WordCount 程序?qū)⒔y(tǒng)計其中各個單詞出現(xiàn)次數(shù)$ echo "hello world bye world" >file1.txt $ echo "hello hadoop goodbye hadoop" >file2.txt$ cd .$ bin/hadoop jar hadoop-0.16.0-examples.jar wordc

25、ount test-in test-out#執(zhí)行完畢,下面查看執(zhí)行結(jié)果:$ cd test-out$ cat part-00000bye 1goodbye 1hadoop 2hello 2world 2注意事項:運行 bin/hadoop jar hadoop-0.16.0-examples.jar wordcount test-in test-out 時,務(wù)必注意第一個參數(shù)是 jar, 不是 -jar, 當(dāng)你用 -jar 時,不會告訴你是參數(shù)錯了,報告出來的錯誤信息是:Exception in thread "main" java.lang.NoClassDefFound

26、Error: org/apache/hadoop/util/ProgramDriver, 筆者當(dāng)時以為是 classpath 的設(shè)置問題,浪費了不少時間。通過分析 bin/hadoop 腳本可知,-jar 并不是 bin/hadoop 腳本定義的參數(shù),此腳本會把 -jar 作為 Java 的參數(shù),Java 的-jar 參數(shù)表示執(zhí)行一個 Jar 文件(這個 Jar 文件必須是一個可執(zhí)行的 Jar,即在 MANIFEST 中定義了主類), 此時外部定義的 classpath 是不起作用的,因而會拋出 java.lang.NoClassDefFoundError 異常。而 jar 是 bin/had

27、oop 腳本定義的參數(shù),會調(diào)用 Hadoop 自己的一個工具類 RunJar,這個工具類也能夠執(zhí)行一個 Jar 文件,并且外部定義的 classpath 有效。偽分布式運行模式這種模式也是在一臺單機上運行,但用不同的 Java 進程模仿分布式運行中的各類結(jié)點 ( NameNode, DataNode, JobTracker, TaskTracker, Secondary NameNode ),請注意分布式運行中的這幾個結(jié)點的區(qū)別:從分布式存儲的角度來說,集群中的結(jié)點由一個 NameNode 和若干個 DataNode 組成, 另有一個 Secondary NameNode 作為 NameNod

28、e 的備份。 從分布式應(yīng)用的角度來說,集群中的結(jié)點由一個 JobTracker 和若干個 TaskTracker 組成,JobTracker 負責(zé)任務(wù)的調(diào)度,TaskTracker 負責(zé)并行執(zhí)行任務(wù)。TaskTracker 必須運行在 DataNode 上,這樣便于數(shù)據(jù)的本地計算。JobTracker 和 NameNode 則無須在同一臺機器上。(1) 按代碼清單2修改 conf/hadoop-site.xml。注意 conf/hadoop-default.xml 中是 Hadoop 缺省的參數(shù),你可以通過讀此文件了解 Hadoop 中有哪些參數(shù)可供配置,但不要修改此文件。可通過修改 conf

29、/hadoop-site.xml 改變?nèi)笔?shù)值,此文件中設(shè)置的參數(shù)值會覆蓋 conf/hadoop-default.xml 的同名參數(shù)。代碼清單 2 <configuration> <property> <name></name> <value>localhost:9000</value> </property> <property> <name>mapred.job.tracker</name> <value>localhost:

30、9001</value> </property> <property> <name>dfs.replication</name> <value>1</value> </property></configuration>參數(shù) 指定 NameNode 的 IP 地址和端口號。缺省值是 file:/, 表示使用本地文件系統(tǒng), 用于單機非分布式模式。此處我們指定使用運行于本機 localhost 上的 NameNode。參數(shù) mapred.job.tracker

31、指定 JobTracker 的 IP 地址和端口號。缺省值是 local, 表示在本地同一 Java 進程內(nèi)執(zhí)行 JobTracker 和 TaskTracker, 用于單機非分布式模式。此處我們指定使用運行于本機 localhost 上的 JobTracker ( 用一個單獨的 Java 進程做 JobTracker )。參數(shù) dfs.replication 指定 HDFS 中每個 Block 被復(fù)制的次數(shù),起數(shù)據(jù)冗余備份的作用。 在典型的生產(chǎn)系統(tǒng)中,這個數(shù)常常設(shè)置為3。(2)配置 SSH,如代碼清單3所示:代碼清單 3 $ ssh-keygen -t dsa -P '' -

32、f /.ssh/id_dsa $ cat /.ssh/id_dsa.pub >> /.ssh/authorized_keys配置完后,執(zhí)行一下 ssh localhost, 確認你的機器可以用 SSH 連接,并且連接時不需要手工輸入密碼。(3)格式化一個新的分布式文件系統(tǒng), 如代碼清單4所示:代碼清單 4 $ cd /cygdrive/c/hadoop-0.16.0$ bin/hadoop namenode format (4) 啟動 hadoop 進程, 如代碼清單5所示。控制臺上的輸出信息應(yīng)該顯示啟動了 namenode, datanode, secondary namenod

33、e, jobtracker, tasktracker。啟動完成之后,通過 ps ef 應(yīng)該可以看到啟動了5個新的 java 進程。代碼清單 5 $ bin/start-all.sh $ ps ef(5) 運行 wordcount 應(yīng)用, 如代碼清單6所示:代碼清單 6 $ bin/hadoop dfs -put ./test-in input #將本地文件系統(tǒng)上的 ./test-in 目錄拷到 HDFS 的根目錄上,目錄名改為 input#執(zhí)行 bin/hadoop dfs help 可以學(xué)習(xí)各種 HDFS 命令的使用。$ bin/hadoop jar hadoop-0.16.0-exampl

34、es.jar wordcount input output#查看執(zhí)行結(jié)果:#將文件從 HDFS 拷到本地文件系統(tǒng)中再查看:$ bin/hadoop dfs -get output output $ cat output/*#也可以直接查看$ bin/hadoop dfs -cat output/*$ bin/stop-all.sh #停止 hadoop 進程故障診斷(1) 執(zhí)行 $ bin/start-all.sh 啟動 Hadoop 進程后,會啟動5個 java 進程, 同時會在 /tmp 目錄下創(chuàng)建五個 pid 文件記錄這些進程 ID 號。通過這五個文件,可以得知 namenode, da

35、tanode, secondary namenode, jobtracker, tasktracker 分別對應(yīng)于哪一個 Java 進程。當(dāng)你覺得 Hadoop 工作不正常時,可以首先查看這5個 java 進程是否在正常運行。(2) 使用 web 接口。訪問 http:/localhost:50030 可以查看 JobTracker 的運行狀態(tài)。訪問 http:/localhost:50060 可以查看 TaskTracker 的運行狀態(tài)。訪問 http:/localhost:50070 可以查看 NameNode 以及整個分布式文件系統(tǒng)的狀態(tài),瀏覽分布式文件系統(tǒng)中的文件以及 log 等。(3

36、) 查看 $HADOOP_HOME/logs 目錄下的 log 文件,namenode, datanode, secondary namenode, jobtracker, tasktracker 各有一個對應(yīng)的 log 文件,每一次運行的計算任務(wù)也有對應(yīng)用 log 文件。分析這些 log 文件有助于找到故障原因?;仨撌捉Y(jié)束語現(xiàn)在,你已經(jīng)了解了 MapReduce 計算模型,分布式文件系統(tǒng) HDFS,分布式并行計算等的基本原理, 并且有了一個可以運行的 Hadoop 環(huán)境,運行了一個基于 Hadoop 的并行程序。在下一篇文章中,你將了解到如何針對一個具體的計算任務(wù),基于 Hadoop 編寫自

37、己的分布式并行程序并將其部署運行等內(nèi)容。聲明:本文僅代表作者個人之觀點,不代表 IBM 公司之觀點。用 Hadoop 進行分布式并行編程, 第 2 部分程序?qū)嵗c分析文檔選項打印本頁將此頁作為電子郵件發(fā)送樣例代碼級別: 初級曹 羽中 (caoyuz), 軟件工程師, IBM中國開發(fā)中心2008 年 5 月 22 日Hadoop 是一個實現(xiàn)了 MapReduce 計算模型的開源分布式并行編程框架,借助于 Hadoop, 程序員可以輕松地編寫分布式并行程序,將其運行于計算機集群上,完成海量數(shù)據(jù)的計算。在本文中,詳細介紹了如何針對一個具體的并行計算任務(wù),基于 Hadoop 編寫程序,如何使

38、用 IBM MapReduce Tools 在 Eclipse 環(huán)境中編譯并運行 Hadoop 程序。前言在上一篇文章:“用 Hadoop 進行分布式并行編程 第一部分 基本概念與安裝部署”中,介紹了 MapReduce 計算模型,分布式文件系統(tǒng) HDFS,分布式并行計算等的基本原理, 并且詳細介紹了如何安裝 Hadoop,如何運行基于 Hadoop 的并行程序。在本文中,將針對一個具體的計算任務(wù),介紹如何基于 Hadoop 編寫并行程序,如何使用 IBM 開發(fā)的 Hadoop Eclipse plugin 在 Eclipse 環(huán)境中編譯并運行程序?;仨撌追治?WordCount 程序我們先來

39、看看 Hadoop 自帶的示例程序 WordCount,這個程序用于統(tǒng)計一批文本文件中單詞出現(xiàn)的頻率,完整的代碼可在下載的 Hadoop 安裝包中得到(在 src/examples 目錄中)。1.實現(xiàn)Map類見代碼清單1。這個類實現(xiàn) Mapper 接口中的 map 方法,輸入?yún)?shù)中的 value 是文本文件中的一行,利用 StringTokenizer 將這個字符串拆成單詞,然后將輸出結(jié)果 <單詞,1> 寫入到 org.apache.hadoop.mapred.OutputCollector 中。OutputCollector 由 Hadoop 框架提供, 負責(zé)收集 Mapper

40、和 Reducer 的輸出數(shù)據(jù),實現(xiàn) map 函數(shù)和 reduce 函數(shù)時,只需要簡單地將其輸出的 <key,value> 對往 OutputCollector 中一丟即可,剩余的事框架自會幫你處理好。代碼中 LongWritable, IntWritable, Text 均是 Hadoop 中實現(xiàn)的用于封裝 Java 數(shù)據(jù)類型的類,這些類都能夠被串行化從而便于在分布式環(huán)境中進行數(shù)據(jù)交換,你可以將它們分別視為 long, int, String 的替代品。Reporter 則可用于報告整個應(yīng)用的運行進度,本例中未使用。代碼清單1 public static class MapCla

41、ss extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws

42、IOException String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens() word.set(itr.nextToken(); output.collect(word, one); 2.實現(xiàn) Reduce 類見代碼清單 2。這個類實現(xiàn) Reducer 接口中的 reduce 方法, 輸入?yún)?shù)中的 key, values 是由 Map 任務(wù)輸出的中間結(jié)果,values 是一個 Iterator, 遍歷這個 Iterator, 就可以得

43、到屬于同一個 key 的所有 value. 此處,key 是一個單詞,value 是詞頻。只需要將所有的 value 相加,就可以得到這個單詞的總的出現(xiàn)次數(shù)。代碼清單 2 public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable&

44、gt; output, Reporter reporter) throws IOException int sum = 0; while (values.hasNext() sum += values.next().get(); output.collect(key, new IntWritable(sum); 3.運行 Job在 Hadoop 中一次計算任務(wù)稱之為一個 job, 可以通過一個 JobConf 對象設(shè)置如何運行這個 job。此處定義了輸出的 key 的類型是 Text, value 的類型是 IntWritable, 指定使用代碼清單1中實現(xiàn)的 MapClass 作為 Mapp

45、er 類,使用代碼清單2中實現(xiàn)的 Reduce 作為 Reducer 類和 Combiner 類, 任務(wù)的輸入路徑和輸出路徑由命令行參數(shù)指定,這樣 job 運行時會處理輸入路徑下的所有文件,并將計算結(jié)果寫到輸出路徑下。然后將 JobConf 對象作為參數(shù),調(diào)用 JobClient 的 runJob, 開始執(zhí)行這個計算任務(wù)。至于 main 方法中使用的 ToolRunner 是一個運行 MapReduce 任務(wù)的輔助工具類,依樣畫葫蘆用之即可。代碼清單 3 public int run(String args) throws Exception JobConf conf = new JobCon

46、f(getConf(), WordCount.class); conf.setJobName("wordcount"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputPath(new Path(args0); con

47、f.setOutputPath(new Path(args1); JobClient.runJob(conf); return 0; public static void main(String args) throws Exception if(args.length != 2) System.err.println("Usage: WordCount <input path> <output path>"); System.exit(-1); int res = ToolRunner.run(new Configuration(), new Wo

48、rdCount(), args); System.exit(res); 以上就是 WordCount 程序的全部細節(jié),簡單到讓人吃驚,您都不敢相信就這么幾行代碼就可以分布式運行于大規(guī)模集群上,并行處理海量數(shù)據(jù)集。4. 通過 JobConf 定制計算任務(wù)通過上文所述的 JobConf 對象,程序員可以設(shè)定各種參數(shù),定制如何完成一個計算任務(wù)。這些參數(shù)很多情況下就是一個 java 接口,通過注入這些接口的特定實現(xiàn),可以定義一個計算任務(wù)( job )的全部細節(jié)。了解這些參數(shù)及其缺省設(shè)置,您才能在編寫自己的并行計算程序時做到輕車熟路,游刃有余,明白哪些類是需要自己實現(xiàn)的,哪些類用 Hadoop 的缺省實

49、現(xiàn)即可。表一是對 JobConf 對象中可以設(shè)置的一些重要參數(shù)的總結(jié)和說明,表中第一列中的參數(shù)在 JobConf 中均會有相應(yīng)的 get/set 方法,對程序員來說,只有在表中第三列中的缺省值無法滿足您的需求時,才需要調(diào)用這些 set 方法,設(shè)定合適的參數(shù)值,實現(xiàn)自己的計算目的。針對表格中第一列中的接口,除了第三列的缺省實現(xiàn)之外,Hadoop 通常還會有一些其它的實現(xiàn),我在表格第四列中列出了部分,您可以查閱 Hadoop 的 API 文檔或源代碼獲得更詳細的信息,在很多的情況下,您都不用實現(xiàn)自己的 Mapper 和 Reducer, 直接使用 Hadoop 自帶的一些實現(xiàn)即可。表一 JobCo

50、nf 常用可定制參數(shù)參數(shù)作用缺省值其它實現(xiàn)InputFormat將輸入的數(shù)據(jù)集切割成小數(shù)據(jù)集 InputSplits, 每一個 InputSplit 將由一個 Mapper 負責(zé)處理。此外 InputFormat 中還提供一個 RecordReader 的實現(xiàn), 將一個 InputSplit 解析成 <key,value> 對提供給 map 函數(shù)。TextInputFormat(針對文本文件,按行將文本文件切割成 InputSplits, 并用 LineRecordReader 將 InputSplit 解析成 <key,value> 對,key 是行在文件中的位置,v

51、alue 是文件中的一行)SequenceFileInputFormatOutputFormat提供一個 RecordWriter 的實現(xiàn),負責(zé)輸出最終結(jié)果TextOutputFormat(用 LineRecordWriter 將最終結(jié)果寫成純文件文件,每個 <key,value> 對一行,key 和 value 之間用 tab 分隔)SequenceFileOutputFormatOutputKeyClass輸出的最終結(jié)果中 key 的類型LongWritableOutputValueClass輸出的最終結(jié)果中 value 的類型TextMapperClassMapper 類,實

52、現(xiàn) map 函數(shù),完成輸入的 <key,value> 到中間結(jié)果的映射IdentityMapper(將輸入的 <key,value> 原封不動的輸出為中間結(jié)果)LongSumReducer,LogRegexMapper,InverseMapperCombinerClass實現(xiàn) combine 函數(shù),將中間結(jié)果中的重復(fù) key 做合并null(不對中間結(jié)果中的重復(fù) key 做合并)ReducerClassReducer 類,實現(xiàn) reduce 函數(shù),對中間結(jié)果做合并,形成最終結(jié)果IdentityReducer(將中間結(jié)果直接輸出為最終結(jié)果)AccumulatingRedu

53、cer, LongSumReducerInputPath設(shè)定 job 的輸入目錄, job 運行時會處理輸入目錄下的所有文件nullOutputPath設(shè)定 job 的輸出目錄,job 的最終結(jié)果會寫入輸出目錄下nullMapOutputKeyClass設(shè)定 map 函數(shù)輸出的中間結(jié)果中 key 的類型如果用戶沒有設(shè)定的話,使用 OutputKeyClassMapOutputValueClass設(shè)定 map 函數(shù)輸出的中間結(jié)果中 value 的類型如果用戶沒有設(shè)定的話,使用 OutputValuesClassOutputKeyComparator對結(jié)果中的 key 進行排序時的使用的比較器Wr

54、itableComparablePartitionerClass對中間結(jié)果的 key 排序后,用此 Partition 函數(shù)將其劃分為R份,每份由一個 Reducer 負責(zé)處理。HashPartitioner(使用 Hash 函數(shù)做 partition)KeyFieldBasedPartitioner PipesPartitioner回頁首改進的 WordCount 程序現(xiàn)在你對 Hadoop 并行程序的細節(jié)已經(jīng)有了比較深入的了解,我們來把 WordCount 程序改進一下,目標: (1)原 WordCount 程序僅按空格切分單詞,導(dǎo)致各類標點符號與單詞混雜在一起,改進后的程序應(yīng)該能夠正確的

55、切出單詞,并且單詞不要區(qū)分大小寫。(2)在最終結(jié)果中,按單詞出現(xiàn)頻率的降序進行排序。1.修改 Mapper 類,實現(xiàn)目標(1)實現(xiàn)很簡單,見代碼清單4中的注釋。代碼清單 4 public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> private final static IntWritable one = new IntWritable(1); private Text word = new Text(); private String pattern="w" /正則表達式,代表不是0-9, a-z, A-Z的所有其它字符 public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException String line = value.toString().toLowerCase(); /全部轉(zhuǎn)為小寫字母 li

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

最新文檔

評論

0/150

提交評論