完成第二部分spark速成spark2.1.0版本5.33轉(zhuǎn)換操作_第1頁
完成第二部分spark速成spark2.1.0版本5.33轉(zhuǎn)換操作_第2頁
完成第二部分spark速成spark2.1.0版本5.33轉(zhuǎn)換操作_第3頁
完成第二部分spark速成spark2.1.0版本5.33轉(zhuǎn)換操作_第4頁
完成第二部分spark速成spark2.1.0版本5.33轉(zhuǎn)換操作_第5頁
已閱讀5頁,還剩11頁未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡介

1、Spark2.1.0入門:DStream轉(zhuǎn)換操作林子雨老師2017年3月4日482【版權(quán)聲明】博客內(nèi)容由廈門大學(xué)數(shù)據(jù)庫實(shí)驗(yàn)室擁有版權(quán),未經(jīng)允許,請勿轉(zhuǎn)載! HYPERLINK /blog/spark/ t /blog/1390-2/_blank 返回Spark教程首頁DStream轉(zhuǎn)換操作包括無狀態(tài)轉(zhuǎn)換和有狀態(tài)轉(zhuǎn)換。無狀態(tài)轉(zhuǎn)換:每個(gè)批次的處理不依賴于之前批次的數(shù)據(jù)。有狀態(tài)轉(zhuǎn)換:當(dāng)前批次的處理需要使用之前批次的數(shù)據(jù)或者中間結(jié)果。有狀態(tài)轉(zhuǎn)換包括基于滑動窗口的轉(zhuǎn)換和追蹤狀態(tài)變化的轉(zhuǎn)換(updateStateByKey)。DStream無狀態(tài)轉(zhuǎn)換操作下面給出一些無狀態(tài)轉(zhuǎn)換操作的含義:* map(fun

2、c) :對源DStream的每個(gè)元素,采用func函數(shù)進(jìn)行轉(zhuǎn)換,得到一個(gè)新的DStream;* flatMap(func): 與map相似,但是每個(gè)輸入項(xiàng)可用被映射為0個(gè)或者多個(gè)輸出項(xiàng);* filter(func): 返回一個(gè)新的DStream,僅包含源DStream中滿足函數(shù)func的項(xiàng);* repartition(numPartitions): 通過創(chuàng)建更多或者更少的分區(qū)改變DStream的并行程度;* union(otherStream): 返回一個(gè)新的DStream,包含源DStream和其他DStream的元素;* count():統(tǒng)計(jì)源DStream中每個(gè)RDD的元素?cái)?shù)量;* re

3、duce(func):利用函數(shù)func聚集源DStream中每個(gè)RDD的元素,返回一個(gè)包含單元素RDDs的新DStream;* countByValue():應(yīng)用于元素類型為K的DStream上,返回一個(gè)(K,V)鍵值對類型的新DStream,每個(gè)鍵的值是在原DStream的每個(gè)RDD中的出現(xiàn)次數(shù);* reduceByKey(func, numTasks):當(dāng)在一個(gè)由(K,V)鍵值對組成的DStream上執(zhí)行該操作時(shí),返回一個(gè)新的由(K,V)鍵值對組成的DStream,每一個(gè)key的值均由給定的recuce函數(shù)(func)聚集起來;* join(otherStream, numTasks):當(dāng)

4、應(yīng)用于兩個(gè)DStream(一個(gè)包含(K,V)鍵值對,一個(gè)包含(K,W)鍵值對),返回一個(gè)包含(K, (V, W)鍵值對的新DStream;* cogroup(otherStream, numTasks):當(dāng)應(yīng)用于兩個(gè)DStream(一個(gè)包含(K,V)鍵值對,一個(gè)包含(K,W)鍵值對),返回一個(gè)包含(K, SeqV, SeqW)的元組;* transform(func):通過對源DStream的每個(gè)RDD應(yīng)用RDD-to-RDD函數(shù),創(chuàng)建一個(gè)新的DStream。支持在新的DStream中做任何RDD操作。無狀態(tài)轉(zhuǎn)換操作實(shí)例:我們之前“ HYPERLINK /blog/1387-2/ t /blo

5、g/1390-2/_blank 套接字流”部分介紹的詞頻統(tǒng)計(jì),就是采用無狀態(tài)轉(zhuǎn)換,每次統(tǒng)計(jì),都是只統(tǒng)計(jì)當(dāng)前批次到達(dá)的單詞的詞頻,和之前批次無關(guān),不會進(jìn)行累計(jì)。DStream有狀態(tài)轉(zhuǎn)換操作對于DStream有狀態(tài)轉(zhuǎn)換操作而言,當(dāng)前批次的處理需要使用之前批次的數(shù)據(jù)或者中間結(jié)果。有狀態(tài)轉(zhuǎn)換包括基于滑動窗口的轉(zhuǎn)換和追蹤狀態(tài)變化(updateStateByKey)的轉(zhuǎn)換?;瑒哟翱谵D(zhuǎn)換操作滑動窗口轉(zhuǎn)換操作的計(jì)算過程如下圖所示,我們可以事先設(shè)定一個(gè)滑動窗口的長度(也就是窗口的持續(xù)時(shí)間),并且設(shè)定滑動窗口的時(shí)間間隔(每隔多長時(shí)間執(zhí)行一次計(jì)算),然后,就可以讓窗口按照指定時(shí)間間隔在源DStream上滑動,每次窗

6、口停放的位置上,都會有一部分DStream被框入窗口內(nèi),形成一個(gè)小段的DStream,這時(shí),就可以啟動對這個(gè)小段DStream的計(jì)算。圖 滑動窗口的計(jì)算過程下面給給出一些窗口轉(zhuǎn)換操作的含義:* window(windowLength, slideInterval) 基于源DStream產(chǎn)生的窗口化的批數(shù)據(jù),計(jì)算得到一個(gè)新的DStream;* countByWindow(windowLength, slideInterval) 返回流中元素的一個(gè)滑動窗口數(shù);* reduceByWindow(func, windowLength, slideInterval) 返回一個(gè)單元素流。利用函數(shù)func聚

7、集滑動時(shí)間間隔的流的元素創(chuàng)建這個(gè)單元素流。函數(shù)func必須滿足結(jié)合律,從而可以支持并行計(jì)算;* reduceByKeyAndWindow(func, windowLength, slideInterval, numTasks) 應(yīng)用到一個(gè)(K,V)鍵值對組成的DStream上時(shí),會返回一個(gè)由(K,V)鍵值對組成的新的DStream。每一個(gè)key的值均由給定的reduce函數(shù)(func函數(shù))進(jìn)行聚合計(jì)算。注意:在默認(rèn)情況下,這個(gè)算子利用了Spark默認(rèn)的并發(fā)任務(wù)數(shù)去分組。可以通過numTasks參數(shù)的設(shè)置來指定不同的任務(wù)數(shù);* reduceByKeyAndWindow(func, invFunc

8、, windowLength, slideInterval, numTasks) 更加高效的reduceByKeyAndWindow,每個(gè)窗口的reduce值,是基于先前窗口的reduce值進(jìn)行增量計(jì)算得到的;它會對進(jìn)入滑動窗口的新數(shù)據(jù)進(jìn)行reduce操作,并對離開窗口的老數(shù)據(jù)進(jìn)行“逆向reduce”操作。但是,只能用于“可逆reduce函數(shù)”,即那些reduce函數(shù)都有一個(gè)對應(yīng)的“逆向reduce函數(shù)”(以InvFunc參數(shù)傳入);* countByValueAndWindow(windowLength, slideInterval, numTasks) 當(dāng)應(yīng)用到一個(gè)(K,V)鍵值對組成的D

9、Stream上,返回一個(gè)由(K,V)鍵值對組成的新的DStream。每個(gè)key的值都是它們在滑動窗口中出現(xiàn)的頻率。窗口轉(zhuǎn)換操作實(shí)例:在上一節(jié)的“ HYPERLINK /blog/1098-2/ t /blog/1390-2/_blank Apache Kafka作為DStream數(shù)據(jù)源”內(nèi)容中,在我們已經(jīng)使用了窗口轉(zhuǎn)換操作,也就是,在KafkaWordCount.scala代碼中,你可以找到下面這一行:val wordCounts = pair.reduceByKeyAndWindow(_ + _,_ - _,Minutes(2),Seconds(10),2)這行代碼中就是一個(gè)窗口轉(zhuǎn)換操作re

10、duceByKeyAndWindow,其中,Minutes(2)是滑動窗口長度,Seconds(10)是滑動窗口時(shí)間間隔(每隔多長時(shí)間滑動一次窗口)。reduceByKeyAndWindow中就使用了加法和減法這兩個(gè)reduce函數(shù),加法和減法這兩種reduce函數(shù)都是“可逆的reduce函數(shù)”,也就是說,當(dāng)滑動窗口到達(dá)一個(gè)新的位置時(shí),原來之前被窗口框住的部分?jǐn)?shù)據(jù)離開了窗口,又有新的數(shù)據(jù)被窗口框住,但是,這時(shí)計(jì)算窗口內(nèi)單詞的詞頻時(shí),不需要對當(dāng)前窗口內(nèi)的所有單詞全部重新執(zhí)行統(tǒng)計(jì),而是只要把窗口內(nèi)新增進(jìn)來的元素,增量加入到統(tǒng)計(jì)結(jié)果中,把離開窗口的元素從統(tǒng)計(jì)結(jié)果中減去,這樣,就大大提高了統(tǒng)計(jì)的效率。

11、尤其對于窗口長度較大時(shí),這種“逆函數(shù)”帶來的效率的提高是很明顯的。updateStateByKey操作當(dāng)我們需要在跨批次之間維護(hù)狀態(tài)時(shí),就必須使用updateStateByKey操作。下面我們就給出一個(gè)具體實(shí)例。我們還是以前面在“ HYPERLINK /blog/1083-2/ t /blog/1390-2/_blank 套接字流”部分講過的NetworkWordCount為例子來介紹,在之前的套接字流的介紹中,我們統(tǒng)計(jì)單詞詞頻采用的是無狀態(tài)轉(zhuǎn)換操作,也就是說,每個(gè)批次的單詞發(fā)送給NetworkWordCount程序處理時(shí),NetworkWordCount只對本批次內(nèi)的單詞進(jìn)行詞頻統(tǒng)計(jì),不會考

12、慮之前到達(dá)的批次的單詞,所以,不同批次的單詞詞頻都是獨(dú)立統(tǒng)計(jì)的。對于有狀態(tài)轉(zhuǎn)換操作而言,本批次的詞頻統(tǒng)計(jì),會在之前批次的詞頻統(tǒng)計(jì)結(jié)果的基礎(chǔ)上進(jìn)行不斷累加,所以,最終統(tǒng)計(jì)得到的詞頻,是所有批次的單詞的總的詞頻統(tǒng)計(jì)結(jié)果。下面,我們來改造一下在套接字流介紹過的NetworkWordCount程序。請登錄Linux系統(tǒng),打開一個(gè)終端,然后,執(zhí)行下面命令:cd /usr/local/spark/mycode/streaming /這個(gè)streaming目錄是之前已經(jīng)創(chuàng)建好的mkdir statefulcd statefulmkdir -p src/main/scalacd src/main/scalav

13、im NetworkWordCountStateful.scalaShell 命令 HYPERLINK /blog/1390-2/javascript:void(0); o 復(fù)制代碼 HYPERLINK /blog/1390-2/javascript:void(0); o 查看純文本代碼 上面使用vim編輯器新建了一個(gè)NetworkWordCountStateful.scala代碼文件,請?jiān)诶锩孑斎胍韵麓a:package org.apache.spark.examples.streamingimport org.apache.spark._import org.apache.spark.st

14、reaming._import org.apache.spark.storage.StorageLevelobject NetworkWordCountStateful def main(args: ArrayString) /定義狀態(tài)更新函數(shù) val updateFunc = (values: SeqInt, state: OptionInt) = val currentCount = values.foldLeft(0)(_ + _) val previousCount = state.getOrElse(0) Some(currentCount + previousCount) Stre

15、amingExamples.setStreamingLogLevels() /設(shè)置log4j日志級別 val conf = new SparkConf().setMaster(local2).setAppName(NetworkWordCountStateful) val sc = new StreamingContext(conf, Seconds(5) sc.checkpoint(file:/usr/local/spark/mycode/streaming/stateful/) /設(shè)置檢查點(diǎn),檢查點(diǎn)具有容錯機(jī)制 val lines = sc.socketTextStream(localho

16、st, 9999) val words = lines.flatMap(_.split( ) val wordDstream = words.map(x = (x, 1) val stateDstream = wordDstream.updateStateByKeyInt(updateFunc) stateDstream.print() sc.start() sc.awaitTermination() 保存該文件退出vim編輯器。這里要對這段代碼中新增的updataStateByKey稍微解釋一下。Spark Streaming的updateStateByKey可以把DStream中的數(shù)據(jù)按k

17、ey做reduce操作,然后對各個(gè)批次的數(shù)據(jù)進(jìn)行累加。注意,wordDstream.updateStateByKey HYPERLINK /blog/1390-2/updateFunc t /blog/1390-2/_blank Int每次傳遞給updateFunc函數(shù)兩個(gè)參數(shù),其中,第一個(gè)參數(shù)是某個(gè)key(即某個(gè)單詞)的當(dāng)前批次的一系列值的列表(SeqInt形式),updateFunc函數(shù)中 val currentCount = values.foldLeft(0)(_ + _)的作用(請參考之前章節(jié)“ HYPERLINK /blog/1122-2/ t /blog/1390-2/_blan

18、k fold操作”的介紹),就是計(jì)算這個(gè)被傳遞進(jìn)來的與某個(gè)key對應(yīng)的當(dāng)前批次的所有值的總和,也就是當(dāng)前批次某個(gè)單詞的出現(xiàn)次數(shù),保存在變量currentCount中。傳遞給updateFunc函數(shù)的第二個(gè)參數(shù)是某個(gè)key的歷史狀態(tài)信息,也就是某個(gè)單詞歷史批次的詞頻匯總結(jié)果。實(shí)際上,某個(gè)單詞的歷史詞頻應(yīng)該是一個(gè)Int類型,這里為什么要采用OptionInt呢?OptionInt是類型 Int的容器(請參考之前章節(jié)“ HYPERLINK /blog/956-2/ t /blog/1390-2/_blank 模式匹配”了解Option類的使用方法),更確切地說,你可以把它看作是某種集合,這個(gè)特殊的集

19、合要么只包含一個(gè)元素(即單詞的歷史詞頻),要么就什么元素都沒有(這個(gè)單詞歷史上沒有出現(xiàn)過,所以沒有歷史詞頻信息)。之所以采用 OptionInt保存歷史詞頻信息,這是因?yàn)?,歷史詞頻可能不存在,很多時(shí)候,在值不存在時(shí),需要進(jìn)行回退,或者提供一個(gè)默認(rèn)值,Scala 為Option類型提供了getOrElse方法,以應(yīng)對這種情況。 state.getOrElse(0)的含義是,如果該單詞沒有歷史詞頻統(tǒng)計(jì)匯總結(jié)果,那么,就取值為0,如果有歷史詞頻統(tǒng)計(jì)結(jié)果,就取歷史結(jié)果,然后賦值給變量previousCount。最后,當(dāng)前值和歷史值進(jìn)行求和,并包裝在Some中返回。然后,再次使用vim編輯器新建一個(gè)St

20、reamingExamples.scala文件,用于設(shè)置log4j日志級別,代碼如下:package org.apache.spark.examples.streamingimport org.apache.spark.Loggingimport org.apache.log4j.Level, Logger/* Utility functions for Spark Streaming examples. */object StreamingExamples extends Logging /* Set reasonable logging levels for streaming if th

21、e user has not configured log4j. */ def setStreamingLogLevels() val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements if (!log4jInitialized) / We first log something to initialize Sparks default logging, then we override the / logging level. logInfo(Setting log level to WARN fo

22、r streaming example. + To override add a custom perties to the classpath.) Logger.getRootLogger.setLevel(Level.WARN) 退出vim編輯器。下面要對代碼進(jìn)行sbt打包編譯。這里需要一個(gè)simple.sbt文件,使用vim編輯器創(chuàng)建一個(gè):cd /usr/local/spark/mycode/streaming/statefulvim simple.sbtShell 命令 HYPERLINK /blog/1390-2/javascript:void(0); o 復(fù)制代碼 HYPERLIN

23、K /blog/1390-2/javascript:void(0); o 查看純文本代碼 在simple.sbt中輸入以下內(nèi)容:name := Simple Projectversion := 1.0scalaVersion := 2.11.8libraryDependencies += org.apache.spark % spark-streaming % 2.1.0創(chuàng)建好simple.sbt文件后,退出vim編輯器。然后,執(zhí)行下面命令:cd /usr/local/spark/mycode/streaming/statefulfind .Shell 命令 HYPERLINK /blog/1

24、390-2/javascript:void(0); o 復(fù)制代碼 HYPERLINK /blog/1390-2/javascript:void(0); o 查看純文本代碼 屏幕上返回的信息,應(yīng)該是類似下面的文件結(jié)構(gòu):./src./src/main./src/main/scala./src/main/scala/NetworkWordCountStateful.scala./src/main/scala/StreamingExamples.scala./simple.sbt然后,就可以執(zhí)行sbt打包編譯了,命令如下:cd /usr/local/spark/mycode/streaming/sta

25、teful/usr/local/sbt/sbt packageShell 命令 HYPERLINK /blog/1390-2/javascript:void(0); o 復(fù)制代碼 HYPERLINK /blog/1390-2/javascript:void(0); o 查看純文本代碼 (備注:根據(jù)筆者實(shí)際測試,在NetworkWordCountStateful.scala代碼中使用 StreamingExamples.setStreamingLogLevels() ,還不能保證屏幕上輸出我們預(yù)期的結(jié)果,同時(shí)還需要把/usr/local/spark/conf/perties設(shè)置為log4j.rootCategory=WARN, console,這樣才可以讓屏幕上顯示我們預(yù)期的結(jié)果)下面可以再設(shè)置一下log4j格式,請?jiān)诮K端內(nèi)輸入如下命令:cd /usr/local/spark/confvim perties #如果不存在,就從perties.template拷貝一份得到pertiesShell 命令 HYPERLINK /blog/1390-2/javascript:void(0); o 復(fù)制代碼 HYPERLINK /blog/1390-2/

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論