《云計(jì)算》-Spark的實(shí)踐課件_第1頁(yè)
《云計(jì)算》-Spark的實(shí)踐課件_第2頁(yè)
《云計(jì)算》-Spark的實(shí)踐課件_第3頁(yè)
《云計(jì)算》-Spark的實(shí)踐課件_第4頁(yè)
《云計(jì)算》-Spark的實(shí)踐課件_第5頁(yè)
已閱讀5頁(yè),還剩29頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶(hù)提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

1、云計(jì)算Spark的實(shí)踐目標(biāo)2:利用Spark處理數(shù)據(jù)理解Spark原理開(kāi)發(fā)Spark程序:開(kāi)發(fā)環(huán)境、程序提交、運(yùn)行模式內(nèi)核講解:RDD工作機(jī)制:任務(wù)調(diào)度、資源分配使用SparkSpark讀取、存儲(chǔ)HDFS、MongoDBSpark StreamingSpark GraphXSpark MLlibSpark實(shí)踐(1)數(shù)據(jù)讀取、存儲(chǔ)從數(shù)據(jù)源到RDDparallelize()textFile(path)hadoopFile(path)sequenceFile(path)objectFile(path)binaryFiles(path)從RDD目標(biāo)數(shù)據(jù)saveAsTextFile(path)saveA

2、sSequenceFile(path)saveAsObjectFile(path)saveAsHadoopFile(path)讀取、存儲(chǔ)MongoDB例子核心部分:import com.mongodb.hadoop.MongoOutputFormat;Configuration config = new Configuration();config.set(mongo.input.uri, mongodb:/24:27017/ligf.student);config.set(mongo.output.uri, mongodb:/24:27017/ligf.test);JavaPairRDD m

3、ongoRDD = sc.newAPIHadoopRDD(config, com.mongodb.hadoop.MongoInputFormat.class, Object.class, BSONObject.class);rdd.saveAsNewAPIHadoopFile(“file:/bogus”, classOfAny, classOfAny, classOfcom.mongodb.hadoop .MongoOutputFormatAny, Any, config)問(wèn)題1二次排序指在歸約(reduce)階段對(duì)某個(gè)鍵關(guān)聯(lián)的值排序Map階段可以對(duì)對(duì)按照鍵的值進(jìn)行排序,但是歸約器不會(huì)自動(dòng)對(duì)鍵

4、值對(duì)按照值排序但是有時(shí)候需要:將成績(jī)按照班級(jí)歸約后排序;店鋪產(chǎn)品銷(xiāo)量排序等例1. 數(shù)據(jù)如下,按照name和time對(duì)value排序問(wèn)題1二次排序方案1在內(nèi)存中實(shí)現(xiàn)排序,只借助map-reduce框架進(jìn)行分組組內(nèi)直接在內(nèi)存中調(diào)用排序函數(shù)步驟:創(chuàng)建SparkContext對(duì)象;連接到Spark master;讀取原始數(shù)據(jù);構(gòu)建對(duì)按照鍵分組:groupByKey對(duì)每個(gè)組對(duì)應(yīng)的新的value(是一個(gè)列表)進(jìn)行排序操作缺點(diǎn)不具備伸縮性,單個(gè)服務(wù)器的內(nèi)存又成為瓶頸;(time, value)問(wèn)題1二次排序-方案1問(wèn)題1二次排序-方案1方案二:利用框架問(wèn)題1二次排序方案2利用框架實(shí)現(xiàn)值排序自定義Key +

5、sortByKey():組合鍵/wuxintdrh/article/details/72809156 步驟自定義組合鍵:, value調(diào)用sortByKey時(shí)會(huì)用排序,因此要比較大小在自定義Key中定義好compare方法按照自定義Key格式實(shí)現(xiàn)mapToPair最后調(diào)用sortByKey()問(wèn)題1二次排序方案2使用組合鍵 + groupByKey()/2015/12/a-scalable-groupbykey-and-secondary-sort-for-java-spark/ 問(wèn)題2Top N列表對(duì)給定的一組返回V的Top N對(duì)應(yīng)的K列表銷(xiāo)售量Top N的產(chǎn)品、訪(fǎng)問(wèn)量Top N的頁(yè)面、好友

6、數(shù)Top N的用戶(hù)等K唯一:產(chǎn)品的銷(xiāo)售量已經(jīng)得到規(guī)約K不唯一:只是在不同服務(wù)器上統(tǒng)計(jì)了對(duì)應(yīng)的產(chǎn)品銷(xiāo)售量基本思路在每一個(gè)Partition內(nèi)取本地的Top N;將所有本地Top N合并,再取全局Top N使用mapPartitions()對(duì)RDD的每一個(gè)Partition進(jìn)行操作輸入是整個(gè)Partition的數(shù)據(jù),每一個(gè)Task對(duì)應(yīng)處理一個(gè)Partition結(jié)果RDD與輸入RDD有相同個(gè)數(shù)的Partition結(jié)果RDD的每一個(gè)Partition就是局部Top N對(duì)保存所有局部Top N的RDD進(jìn)行action操作用collect方法將局部Top N存放到list中遍歷list,選出全局top N

7、或者使用框架的reduce操作定義兩兩合并的規(guī)則問(wèn)題2Top N列表問(wèn)題2Top N列表問(wèn)題2Top N列表使用框架的takeOrdered(N, DefineComparator)支持自定義Comparator問(wèn)題2Top N列表使用框架的takeOrdered(N, DefineComparator)得到JavaPairRDD后,直接使用takeOrdered獲得全局top N需要自定義comparatorSpark實(shí)踐(2)Spark Streaming(1)實(shí)時(shí)對(duì)大量數(shù)據(jù)進(jìn)行快速處理:處理周期短;連續(xù)不斷地計(jì)算計(jì)算基本過(guò)程數(shù)據(jù)分批Spark實(shí)踐(2)Spark Streaming(2)

8、StreamingContext (類(lèi)似SparkContext)Dstream (類(lèi)似RDD,是一個(gè)RDD序列)內(nèi)部是通過(guò)RDD實(shí)現(xiàn)的每個(gè)時(shí)間點(diǎn)對(duì)應(yīng)一個(gè)RDD程序執(zhí)行過(guò)程初始化StreamingContext創(chuàng)建Dstream對(duì)DStream的操作Spark實(shí)踐(2)Spark Streaming(3)Transformation操作參考RDDtransform操作直接操作內(nèi)部RDD;對(duì)沒(méi)有提供的transformation操作使用transform并直接調(diào)用內(nèi)部RDD的操作val joinedDStream = dstream.transform(rdd = rdd.join(rdd2)窗

9、口(Window)操作:合并幾個(gè)時(shí)間點(diǎn)的RDDwindowLength等生成帶窗口的DstreamcountByWindow、reduceByWindow等帶窗口參數(shù)的專(zhuān)用方法Output操作print()saveAsTextFiles/saveAsObjectFiles()foreachRDD(func): func一般是對(duì)RDD的Action操作問(wèn)題3訪(fǎng)問(wèn)的實(shí)時(shí)統(tǒng)計(jì)真實(shí)場(chǎng)景下的解決方案Nginx + Flume + Kafka +Spark StreamingNginx統(tǒng)計(jì)的目標(biāo)服務(wù)器,產(chǎn)生訪(fǎng)問(wèn)日志Flume實(shí)時(shí)讀取服務(wù)器日志并發(fā)送到Kafka集群Kafka則負(fù)責(zé)連接Spark Strea

10、mingSpark Streaming實(shí)時(shí)消費(fèi)Kafka集群上的數(shù)據(jù)并作分析簡(jiǎn)化模擬生成Nginx訪(fǎng)問(wèn)日志,自動(dòng)上傳到HDFS;Spark Streaming監(jiān)控HDFSHDFS存放日志的目錄與Streaming監(jiān)控目錄要相同設(shè)置Streaming運(yùn)行時(shí)間間隔:每次處理新產(chǎn)生的數(shù)據(jù)可以實(shí)現(xiàn)歷史數(shù)據(jù)的累加:updateFunctiondef updateFunction(newValues: SegInt, runningCount:OptionInt):OptionInt = val preCount = runningCount.getOrElse(0) /獲取歷史累計(jì)值 val newCo

11、unt = newValues.sum /當(dāng)前周期新的值 Some(newCount + preCount) /再次累加val runningCounts = pairs.updateStateKyKeyInt (updateFunction _)問(wèn)題3訪(fǎng)問(wèn)的實(shí)時(shí)統(tǒng)計(jì)模擬生成日志通過(guò)shell腳本實(shí)現(xiàn)問(wèn)題3訪(fǎng)問(wèn)的實(shí)時(shí)統(tǒng)計(jì)Streaming程序Scala版更多的示例Spark實(shí)踐(3)Spark GraphX(1)圖計(jì)算以圖為數(shù)據(jù)結(jié)構(gòu)基礎(chǔ)的相關(guān)算法及應(yīng)用數(shù)據(jù)結(jié)構(gòu)圖G=V表示頂點(diǎn)集合;E表示邊集合兩個(gè)頂點(diǎn)u,v相連,表示為邊(u,v)無(wú)向邊;任意邊為無(wú)向邊則稱(chēng)為無(wú)向圖有向邊;每一條都為有向邊則稱(chēng)為

12、有向圖GraphX提供的API圖生成圖數(shù)據(jù)訪(fǎng)問(wèn)查詢(xún)頂點(diǎn)數(shù)、邊數(shù);計(jì)算某個(gè)點(diǎn)的入度、出度等。圖算法遍歷頂點(diǎn)、邊;計(jì)算連通性;計(jì)算最大子圖;計(jì)算最短路徑;圖合并等Spark實(shí)踐(3)Spark GraphX(2)GraphX的實(shí)現(xiàn)核心是Graph數(shù)據(jù)結(jié)構(gòu),表示有向多重圖兩個(gè)頂點(diǎn)間允許存在多條邊,表示不同含義Graph由頂點(diǎn)RDD和邊RDD組成Graph的分布式存儲(chǔ)方式Spark實(shí)踐(3)Spark GraphX(3)圖生成讀入存儲(chǔ)關(guān)系信息的文件,構(gòu)造EdgeRDD:eRDD = sc.textFile()從Edge RDD構(gòu)造Graph:graph = Grapth.fromEdges(eRDD)

13、基本接口獲取邊數(shù):numEdges;獲取節(jié)點(diǎn)數(shù):numVertices獲取入度、出度:inDegrees, outDegrees結(jié)構(gòu)操作:reverse,subgraph, mask 關(guān)聯(lián)類(lèi)操作:將一個(gè)圖和一個(gè)RDD通過(guò)頂點(diǎn)ID關(guān)聯(lián)起來(lái),使圖獲得RDD信息joinVertices: outerJoinVertices: 聚合類(lèi)操作分布式遍歷所有的邊,執(zhí)行自定義的sendMsg函數(shù);在節(jié)點(diǎn)上執(zhí)行mergeMsg函數(shù)/docs/latest/graphx-programming-guide.html#summary-list-of-operators 官網(wǎng)API說(shuō)明Spark實(shí)踐(3)Spark

14、GraphX(4)例子Spark實(shí)踐(3)Spark GraphX(5)例子Spark實(shí)踐(3)Spark GraphX(6)例子Spark實(shí)踐(3)Spark GraphX(7)關(guān)聯(lián)操作新建一個(gè)圖,用原圖中的節(jié)點(diǎn)初始化,做一個(gè)類(lèi)型轉(zhuǎn)換用原圖中的RDD與新圖作關(guān)聯(lián),將原RDD信息賦值給新圖聚合操作定義一個(gè)sendMessage,一個(gè)mergeMessageMapfunctionReducefunction val nodeWeightMapFuncEx = (ctx:EdgeContext VD, Long, (Long, Long) ) = ctx.sendToDst(ctx.attr, 0

15、L)ctx.sendToSrc(0L, ctx.attr)val nodeWeightReduceFuncEx = (e1: (Long, Long), e2: (Long, Long) = (e1._1 + e2._1, e1._2 + e2._2)val nodeWeights = graph.aggregateMessage(nodeWeightMapFuncEx, nodeWeightReduceFuncEx)Spark實(shí)踐(4)Spark MLlib(1)Spark為機(jī)器學(xué)習(xí)問(wèn)題開(kāi)發(fā)的庫(kù)分類(lèi)、回歸、聚類(lèi)和協(xié)同過(guò)濾等機(jī)器學(xué)習(xí)簡(jiǎn)介學(xué)習(xí)方法的類(lèi)別按輸入數(shù)據(jù)分監(jiān)督學(xué)習(xí)Decision tre

16、es, neural networks, nearest-neighbor algorithms,Bayesian learning, hidden Markov models, SVM分類(lèi)問(wèn)題輸出是非連續(xù)的有限集合回歸問(wèn)題輸出是連續(xù)的實(shí)數(shù)集合無(wú)監(jiān)督學(xué)習(xí)聚類(lèi)問(wèn)題基于元素的相似度計(jì)算規(guī)則半監(jiān)督學(xué)習(xí)強(qiáng)化學(xué)習(xí)Markov Decision Processes, temporal difference learning, Q-learning分類(lèi)問(wèn)題的例子判定一個(gè)給定案件的案由目標(biāo):離婚糾紛、產(chǎn)品質(zhì)量糾紛、債務(wù)糾紛、其它輸入:給定的案件情況描述人案件案由1案由2案由3知識(shí)機(jī)器Learner案件a-案由1案件b-案由2案件c-案由3案件d-案由2案件e-案由1案件f -案由3機(jī)器學(xué)習(xí)算法model:classifier訓(xùn)練集樣例/實(shí)例屬性+標(biāo)簽(類(lèi))案件g-案由1案件h-案由2案件I -案由3案件j -案由2案件k-案由1案件l -案由3測(cè)試集樣例/實(shí)例屬性+標(biāo)簽(類(lèi))案件m-案由1案件n-案由2案件o-案由3案件p-案由2案件q-案由1案件r -案由3驗(yàn)證集Validation setSpark

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶(hù)所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶(hù)上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶(hù)上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶(hù)因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論