大數(shù)據(jù)計(jì)算平臺(tái)Spark內(nèi)核全面解讀.docx_第1頁(yè)
大數(shù)據(jù)計(jì)算平臺(tái)Spark內(nèi)核全面解讀.docx_第2頁(yè)
大數(shù)據(jù)計(jì)算平臺(tái)Spark內(nèi)核全面解讀.docx_第3頁(yè)
大數(shù)據(jù)計(jì)算平臺(tái)Spark內(nèi)核全面解讀.docx_第4頁(yè)
大數(shù)據(jù)計(jì)算平臺(tái)Spark內(nèi)核全面解讀.docx_第5頁(yè)
已閱讀5頁(yè),還剩4頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

大數(shù)據(jù)計(jì)算平臺(tái)Spark內(nèi)核全面解讀1、Spark介紹Spark是起源于美國(guó)加州大學(xué)伯克利分校AMPLab的大數(shù)據(jù)計(jì)算平臺(tái),在2010年開(kāi)源,目前是Apache軟件基金會(huì)的頂級(jí)項(xiàng)目。隨著Spark在大數(shù)據(jù)計(jì)算領(lǐng)域的暫露頭角,越來(lái)越多的企業(yè)開(kāi)始關(guān)注和使用。2014年11月,Spark在Daytona Gray Sort 100TB Benchmark競(jìng)賽中打破了由Hadoop MapReduce保持的排序記錄。Spark利用1/10的節(jié)點(diǎn)數(shù),把100TB數(shù)據(jù)的排序時(shí)間從72分鐘提高到了23分鐘。Spark在架構(gòu)上包括內(nèi)核部分和4個(gè)官方子模塊-Spark SQL、Spark Streaming、機(jī)器學(xué)習(xí)庫(kù)MLlib和圖計(jì)算庫(kù)GraphX。圖1所示為Spark在伯克利的數(shù)據(jù)分析軟件棧BDAS(Berkeley Data Analytics Stack)中的位置??梢?jiàn)Spark專注于數(shù)據(jù)的計(jì)算,而數(shù)據(jù)的存儲(chǔ)在生產(chǎn)環(huán)境中往往還是由Hadoop分布式文件系統(tǒng)HDFS承擔(dān)。圖1 Spark在BDAS中的位置Spark被設(shè)計(jì)成支持多場(chǎng)景的通用大數(shù)據(jù)計(jì)算平臺(tái),它可以解決大數(shù)據(jù)計(jì)算中的批處理,交互查詢及流式計(jì)算等核心問(wèn)題。Spark可以從多數(shù)據(jù)源的讀取數(shù)據(jù),并且擁有不斷發(fā)展的機(jī)器學(xué)習(xí)庫(kù)和圖計(jì)算庫(kù)供開(kāi)發(fā)者使用。數(shù)據(jù)和計(jì)算在Spark內(nèi)核及Spark的子模塊中是打通的,這就意味著Spark內(nèi)核和子模塊之間成為一個(gè)整體。Spark的各個(gè)子模塊以Spark內(nèi)核為基礎(chǔ),進(jìn)一步支持更多的計(jì)算場(chǎng)景,例如使用Spark SQL讀入的數(shù)據(jù)可以作為機(jī)器學(xué)習(xí)庫(kù)MLlib的輸入。表1列舉了一些在Spark平臺(tái)上的計(jì)算場(chǎng)景。表1 Spark的應(yīng)用場(chǎng)景舉例在本文寫(xiě)作是,Spark的最新版本為1.2.0,文中的示例代碼也來(lái)自于這個(gè)版本。2、Spark內(nèi)核介紹相信大數(shù)據(jù)工程師都非常了解Hadoop MapReduce一個(gè)最大的問(wèn)題是在很多應(yīng)用場(chǎng)景中速度非常慢,只適合離線的計(jì)算任務(wù)。這是由于MapReduce需要將任務(wù)劃分成map和reduce兩個(gè)階段,map階段產(chǎn)生的中間結(jié)果要寫(xiě)回磁盤,而在這兩個(gè)階段之間需要進(jìn)行shuffle操作。Shuffle操作需要從網(wǎng)絡(luò)中的各個(gè)節(jié)點(diǎn)進(jìn)行數(shù)據(jù)拷貝,使其往往成為最為耗時(shí)的步驟,這也是Hadoop MapReduce慢的根本原因之一,大量的時(shí)間耗費(fèi)在網(wǎng)絡(luò)磁盤IO中而不是用于計(jì)算。在一些特定的計(jì)算場(chǎng)景中,例如像邏輯回歸這樣的迭代式的計(jì)算,MapReduce的弊端會(huì)顯得更加明顯。那Spark是如果設(shè)計(jì)分布式計(jì)算的呢?首先我們需要理解Spark中最重要的概念-彈性分布數(shù)據(jù)集(Resilient Distributed Dataset),也就是RDD。2.1 彈性分布數(shù)據(jù)集RDDRDD是Spark中對(duì)數(shù)據(jù)和計(jì)算的抽象,是Spark中最核心的概念,它表示已被分片(partition),不可變的并能夠被并行操作的數(shù)據(jù)集合。對(duì)RDD的操作分為兩種transformation和action。Transformation操作是通過(guò)轉(zhuǎn)換從一個(gè)或多個(gè)RDD生成新的RDD。Action操作是從RDD生成最后的計(jì)算結(jié)果。在Spark最新的版本中,提供豐富的transformation和action操作,比起MapReduce計(jì)算模型中僅有的兩種操作,會(huì)大大簡(jiǎn)化程序開(kāi)發(fā)的難度。RDD的生成方式只有兩種,一是從數(shù)據(jù)源讀入,另一種就是從其它RDD通過(guò)transformation操作轉(zhuǎn)換。一個(gè)典型的Spark程序就是通過(guò)Spark上下文環(huán)境(SparkContext)生成一個(gè)或多個(gè)RDD,在這些RDD上通過(guò)一系列的transformation操作生成最終的RDD,最后通過(guò)調(diào)用最終RDD的action方法輸出結(jié)果。每個(gè)RDD都可以用下面5個(gè)特性來(lái)表示,其中后兩個(gè)為可選的: 分片列表(數(shù)據(jù)塊列表) 計(jì)算每個(gè)分片的函數(shù) 對(duì)父RDD的依賴列表 對(duì)key-value類型的RDD的分片器(Partitioner)(可選) 每個(gè)數(shù)據(jù)分片的預(yù)定義地址列表(如HDFS上的數(shù)據(jù)塊的地址)(可選)雖然Spark是基于內(nèi)存的計(jì)算,但RDD不光可以存儲(chǔ)在內(nèi)存中,根據(jù)useDisk、useMemory、useOffHeap, deserialized、replication五個(gè)參數(shù)的組合Spark提供了12種存儲(chǔ)級(jí)別,在后面介紹RDD的容錯(cuò)機(jī)制時(shí),我們會(huì)進(jìn)一步理解。值得注意的是當(dāng)StorageLevel設(shè)置成OFF_HEAP時(shí),RDD實(shí)際被保存到Tachyon中。Tachyon是一個(gè)基于內(nèi)存的分布式文件系統(tǒng),目前正在快速發(fā)展,本文不做詳細(xì)介紹,可以通過(guò)其官方網(wǎng)站進(jìn)一步了解。1. class StorageLevel private(2. private var _useDisk: Boolean,3. private var _useMemory: Boolean,4. private var _useOffHeap: Boolean,5. private var _deserialized: Boolean6. private var _replication: Int = 1)7. extends Externalizable / 8. 9. val NONE = new StorageLevel(false, false, false, false)10. val DISK_ONLY = new StorageLevel(true, false, false, false)11. val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)12. val MEMORY_ONLY = new StorageLevel(false, true, false, true)13. val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)14. val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)15. val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)16. val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)17. val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)18. val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)19. val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)20. val OFF_HEAP = new StorageLevel(false, false, true, false)2.2 DAG、Stage與任務(wù)的生成Spark的計(jì)算發(fā)生在RDD的action操作,而對(duì)action之前的所有transformation,Spark只是記錄下RDD生成的軌跡,而不會(huì)觸發(fā)真正的計(jì)算。Spark內(nèi)核會(huì)在需要計(jì)算發(fā)生的時(shí)刻繪制一張關(guān)于計(jì)算路徑的有向無(wú)環(huán)圖,也就是DAG。舉個(gè)例子,在圖2中,從輸入中邏輯上生成A和C兩個(gè)RDD,經(jīng)過(guò)一系列transformation操作,邏輯上生成了F,注意,我們說(shuō)的是邏輯上,因?yàn)檫@時(shí)候計(jì)算沒(méi)有發(fā)生,Spark內(nèi)核做的事情只是記錄了RDD的生成和依賴關(guān)系。當(dāng)F要進(jìn)行輸出時(shí),也就是F進(jìn)行了action操作,Spark會(huì)根據(jù)RDD的依賴生成DAG,并從起點(diǎn)開(kāi)始真正的計(jì)算。圖2 邏輯上的計(jì)算過(guò)程:DAG有了計(jì)算的DAG圖,Spark內(nèi)核下一步的任務(wù)就是根據(jù)DAG圖將計(jì)算劃分成任務(wù)集,也就是Stage,這樣可以將任務(wù)提交到計(jì)算節(jié)點(diǎn)進(jìn)行真正的計(jì)算。Spark計(jì)算的中間結(jié)果默認(rèn)是保存在內(nèi)存中的,Spark在劃分Stage的時(shí)候會(huì)充分考慮在分布式計(jì)算中可流水線計(jì)算(pipeline)的部分來(lái)提高計(jì)算的效率,而在這個(gè)過(guò)程中,主要的根據(jù)就是RDD的依賴類型。根據(jù)不同的transformation操作,RDD的依賴可以分為窄依賴(Narrow Dependency)和寬依賴(Wide Dependency,在代碼中為ShuffleDependency)兩種類型。窄依賴指的是生成的RDD中每個(gè)partition只依賴于父RDD(s) 固定的partition。寬依賴指的是生成的RDD的每一個(gè)partition都依賴于父 RDD(s) 所有partition。窄依賴典型的操作有map, filter, union等,寬依賴典型的操作有g(shù)roupByKey, sortByKey等??梢钥吹?,寬依賴往往意味著shuffle操作,這也是Spark劃分stage的主要邊界。對(duì)于窄依賴,Spark會(huì)將其盡量劃分在同一個(gè)stage中,因?yàn)樗鼈兛梢赃M(jìn)行流水線計(jì)算。圖3 RDD的寬依賴和窄依賴我們?cè)偻ㄟ^(guò)圖4詳細(xì)解釋一下Spark中的Stage劃分。我們從HDFS中讀入數(shù)據(jù)生成3個(gè)不同的RDD,通過(guò)一系列transformation操作后再將計(jì)算結(jié)果保存回HDFS??梢钥吹竭@幅DAG中只有join操作是一個(gè)寬依賴,Spark內(nèi)核會(huì)以此為邊界將其前后劃分成不同的Stage. 同時(shí)我們可以注意到,在圖中Stage2中,從map到union都是窄依賴,這兩步操作可以形成一個(gè)流水線操作,通過(guò)map操作生成的partition可以不用等待整個(gè)RDD計(jì)算結(jié)束,而是繼續(xù)進(jìn)行union操作,這樣大大提高了計(jì)算的效率。圖4 Spark中的Stage劃分Spark在運(yùn)行時(shí)會(huì)把Stage包裝成任務(wù)提交,有父Stage的Spark會(huì)先提交父Stage。弄清楚了Spark劃分計(jì)算的原理,我們?cè)俳Y(jié)合源碼看一看這其中的過(guò)程。下面的代碼是DAGScheduler中的得到一個(gè)RDD父Stage的函數(shù),可以看到寬依賴為劃分Stage的邊界。1. /*2. * Get or create the list of parent stages for a given RDD. The stages will be assigned the3. * provided jobId if they havent already been created with a lower jobId.4. */5. 6. private def getParentStages(rdd: RDD_, jobId: Int): ListStage = 7. val parents = new HashSetStage8. val visited = new HashSetRDD_9. / We are manually maintaining a stack here to prevent StackOverflowError10. / caused by recursively visiting11. val waitingForVisit = new StackRDD_12. def visit(r: RDD_) 13. if (!visited(r) 14. visited += r15. / Kind of ugly: need to register RDDs with the cache here since16. / we cant do it in its constructor because # of partitions is unknown17. for (dep 20. parents += getShuffleMapStage(shufDep, jobId)21. case _ =22. waitingForVisit.push(dep.rdd)23. 24. 25. 26. 27. 28. waitingForVisit.push(rdd)29. while (!waitingForVisit.isEmpty) 30. visit(waitingForVisit.pop()31. 32. parents.toList33. 上面提到Spark的計(jì)算是從RDD調(diào)用action操作時(shí)候觸發(fā)的,我們來(lái)看一個(gè)action的代碼RDD的collect方法是一個(gè)action操作,作用是將RDD中的數(shù)據(jù)返回到一個(gè)數(shù)組中。可以看到,在此action中,會(huì)觸發(fā)Spark上下文環(huán)境SparkContext中的runJob方法,這是一系列計(jì)算的起點(diǎn)。1. abstract class RDDT: ClassTag(2. transient private var sc: SparkContext,3. transient private var deps: SeqDependency_4. ) extends Serializable with Logging 5. /.6. /*7. * Return an array that contains all of the elements in this RDD.8. */9. def collect(): ArrayT = 10. val results = sc.runJob(this, (iter: IteratorT) = iter.toArray)11. Array.concat(results: _*)12. 13. SparkContext擁有DAGScheduler的實(shí)例,在runJob方法中會(huì)進(jìn)一步調(diào)用DAGScheduler的runJob方法。在此時(shí),DAGScheduler會(huì)生成DAG和Stage,將Stage提交給TaskScheduler。TaskSchduler將Stage包裝成TaskSet,發(fā)送到Worker節(jié)點(diǎn)進(jìn)行真正的計(jì)算,同時(shí)還要監(jiān)測(cè)任務(wù)狀態(tài),重試失敗和長(zhǎng)時(shí)間無(wú)返回的任務(wù)。整個(gè)過(guò)程如圖5所示。圖5 Spark中任務(wù)的生成2.3 RDD的緩存與容錯(cuò)上文提到,Spark的計(jì)算是從action開(kāi)始觸發(fā)的,如果在action操作之前邏輯上很多transformation操作,一旦中間發(fā)生計(jì)算失敗,Spark會(huì)重新提交任務(wù),這在很多場(chǎng)景中代價(jià)過(guò)大。還有一些場(chǎng)景,如有些迭代算法,計(jì)算的中間結(jié)果會(huì)被重復(fù)使用,重復(fù)計(jì)算同樣增加計(jì)算時(shí)間和造成資源浪費(fèi)。因此,在提高計(jì)算效率和更好支持容錯(cuò),Spark提供了基于RDDcache機(jī)制和checkpoint機(jī)制。我們可以通過(guò)RDD的toDebugString來(lái)查看其遞歸的依賴信息,圖6展示了在spark shell中通過(guò)調(diào)用這個(gè)函數(shù)來(lái)查看wordCount RDD的依賴關(guān)系,也就是它的Lineage.圖6 RDD wordCount的lineage如果發(fā)現(xiàn)Lineage過(guò)長(zhǎng)或者里面有被多次重復(fù)使用的RDD,我們就可以考慮使用cache機(jī)制或checkpoint機(jī)制了。我們可以通過(guò)在程序中直接調(diào)用RDD的cache方法將其保存在內(nèi)存中,這樣這個(gè)RDD就可以被多個(gè)任務(wù)共享,避免重復(fù)計(jì)算。另外,RDD還提供了更為靈活的persist方法,可以指定存儲(chǔ)級(jí)別。從源碼中可以看到RDD.cache就是簡(jiǎn)單的調(diào)用了RDD.persist(StorageLevel.MEMORY_ONLY)。1. /* Persist this RDD with the default storage level (MEMORY_ONLY). */2. def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)3. def cache(): this.type = persist()同樣,我們可以調(diào)用RDD的checkpoint方法將其保存到磁盤。我們需要在SparkContext中設(shè)置checkpoint的目錄,否則調(diào)用會(huì)拋出異常。值得注意的是,在調(diào)用checkpoint之前建議先調(diào)用cache方法將RDD放入內(nèi)存,否則將RDD保存到文件的時(shí)候需要重新計(jì)算。1. /*2. * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint3. * directory set with SparkContext.setCheckpointDir() and all references to its parent4. * RDDs will be removed. This function must be called before any job has been5. * executed on this RDD. It is strongly recommended that this RDD is persisted in6. * memory, otherwise saving it on a file will require recomputation.7. */8. def checkpoint() 9. if (context.checkpointDir.isEmpty) 10. throw new SparkException(Checkpoint directory has not been set in the SparkContext)11. else if (checkpointData.isEmpty) 12. checkpointData = Some(new RDDCheckpointData(this)13. checkpointData.get.markForCheckpoint()14. 15. Cache機(jī)制和checkpoint機(jī)制的差別在于cache將RDD保存到內(nèi)存,并保留Lineage,如果緩存失效RDD還可以通過(guò)Lineage重建。而checkpoint將RDD落地到磁盤并切斷Lineage,由文件系統(tǒng)保證其重建。2.4 Spark任務(wù)的部署Spark的集群部署分為Standalone、Mesos和Yarn三種模式,我們以Sta

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論