第6章 Spark Streaming-實時計算框架_第1頁
第6章 Spark Streaming-實時計算框架_第2頁
第6章 Spark Streaming-實時計算框架_第3頁
第6章 Spark Streaming-實時計算框架_第4頁
第6章 Spark Streaming-實時計算框架_第5頁
已閱讀5頁,還剩47頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權,請進行舉報或認領

文檔簡介

SparkStreaming

——實時計算框架書籍是人類進步的階梯,數(shù)字時代的來臨,也催生出“書”的新形式,即電子書。同時,眾多售書的電商平臺也應運而生。電商平臺想要在激烈的競爭中脫穎而出,需要更著重于改善用戶體驗,并增加用戶的黏性,把更多更好的書推薦給讀者,擴大他們的知識視野。用戶無法找到適宜的書籍時往往會相信大眾的選擇,選擇購買熱度較高的書籍?;谶@種情況,電商平臺可以根據(jù)現(xiàn)有書籍的評分、銷量、用戶的評分次數(shù)等信息構建書籍熱度,將一些熱度較高的書推薦給用戶,進而改善用戶體驗,增加用戶黏性,激發(fā)用戶的購買欲。任務背景書籍熱度的計算可以根據(jù)下式進行,其中,u表示用戶的平均評分,x表示用戶的評分次數(shù),y表示書籍的平均評分,z表示書籍被評分的次數(shù)。目前已采集了某電商網(wǎng)站上用戶對書籍的評分數(shù)據(jù)文件BookRating.txt,數(shù)據(jù)字段說明如下表。其中Rating字段中評分范圍為1~5分。任務背景字段名稱說明UserID用戶IDBookID書籍IDRating用戶對書籍的評分實時計算書籍熱度后,可以將熱度最高的10本圖書的評分數(shù)據(jù)保存在Hive數(shù)據(jù)庫中,因此需要在Hive數(shù)據(jù)庫中設計一個表,用于保存熱度最高的10本圖書的評分數(shù)據(jù)。Spark會將DataFrame寫入Hive并根據(jù)DataFrame自動創(chuàng)建表。為模擬實時數(shù)據(jù)的流式計算,本章將使用SparkStreaming框架實現(xiàn)書籍評分實時計算分析。本章任務如下。首先介紹SparkStreaming基本概念及運行原理;再詳細介紹SparkStreaming框架的DStream編程模型及其基礎操作;最后結合書籍評分數(shù)據(jù)實例,使用SparkStreaming框架實現(xiàn)書籍熱度的實時計算。任務背景1掌握DStream基礎操作目錄初識SparkStreaming2實現(xiàn)書籍熱度實時計算3使用SparkStreaming實現(xiàn)書籍熱度實時計算,首先需要對SparkStreaming基本概念及運行原理有大致的了解。本節(jié)的任務如下,了解SparkStreaming基本概念及運行原理;學習SparkStreaming程序的簡單編寫及運行。任務描述SparkStreaming是Spark的子框架,是Spark生態(tài)圈中用于處理流式數(shù)據(jù)的分布式流式處理框架,具有可伸縮、高吞吐量、容錯能力強等特點。同時,SparkStreaming能夠和SparkSQL、SparkMLlib、SparkGraphX進行無縫集成,可以從Kafka、Flume、HDFS、Kinesis等數(shù)據(jù)源中獲取數(shù)據(jù),而且不僅可以通過調用map()、reduce()、join()等方法處理數(shù)據(jù),也可以使用機器學習算法、圖算法處理數(shù)據(jù)。如右圖,經(jīng)SparkStreaming處理后的最終結果可以保存在文件系統(tǒng)(如HDFS)、數(shù)據(jù)庫(如MySQL)中或使用儀表面板進行實時展示。了解SparkStreaming基本概念SparkStreaming的運行原理圖了解SparkStreaming運行原理使用SparkStreaming一般需要進行如下的操作。創(chuàng)建StreamingContext對象。創(chuàng)建InputDStream。操作Dstream。啟動SparkStreaming。初步使用SparkStreaming單詞實時計數(shù)從一臺服務器的8888端口上接受一行或者多行文本內(nèi)容,并對接收到的內(nèi)容以空格分割計算每個單詞出現(xiàn)的次數(shù)。初步使用SparkStreaming單詞實時計數(shù)初步使用SparkStreaming使用ssc.textFileStream()方法監(jiān)聽HDFS上的目錄/user/root/sparkStreaming/temp,一旦有新文件加入到/user/root/sparkStreaming/temp目錄下,SparkStreaming計算出該時間內(nèi)的單詞統(tǒng)計數(shù)。初步使用SparkStreamingSparkStreaming監(jiān)聽HDFS目錄示例運行結果初步使用SparkStreaming1掌握DStream基礎操作目錄初識SparkStreaming2實現(xiàn)書籍熱度實時計算3DStream是SparkStreaming中一個非常重要的概念。SparkStreaming讀取數(shù)據(jù)時會得到DStream編程模型,且DStream提供了一系列操作方法。本節(jié)的任務如下。了解DStream的基本概念。學習DStream的轉換操作、窗口操作以及輸出操作。任務描述DStream是SparkStreaming對內(nèi)部實時數(shù)據(jù)流的抽象描述,可將DStream理解為持續(xù)性的數(shù)據(jù)流。可以通過外部數(shù)據(jù)源獲取DStream;也可以通過DStream現(xiàn)有的高級操作(如轉換操作)獲得DStream。DStream代表著一系列的持續(xù)的RDD,DStream中的每個RDD都是按一小段時間分割開的RDD,如下圖。了解DStream編程模型對DStream的任何操作都會轉化成對底層RDDs的操作。以單詞計數(shù)為例,獲取文本數(shù)據(jù)形成文本的輸入數(shù)據(jù)流linesDStream,使用flatMap()方法進行扁平化操作并進行分割,得到每一個單詞,形成單詞的文本數(shù)據(jù)流wordsDStream。對DStream進行操作的方法根據(jù)操作的類型可以分成3類,即轉換操作、窗口操作和輸出操作。了解DStream編程模型使用DStream轉換操作DStream轉換操作常用的方法及說明方法描述map(func)對源DStream的每個元素應用func函數(shù)并返回一個新的DStreamflatMap(func)類似map操作,不同的是每個元素可以被映射成0個或者多個輸出元素filter(func)對源DStream中的每一個元素應用func函數(shù)進行計算,如果func函數(shù)返回結果為true,則保留該元素,否則丟棄該元素,返回一個新的DStreamunion(otherStream)合并兩個DStream,生成一個包含兩個DStream中所有元素的新的DStreamcount()統(tǒng)計DStream中每個RDD包含的元素的個數(shù),得到一個只有一個元素的RDD構成的DStreamreduce(func)對源DStream中的每個元素應用func函數(shù)進行聚合操作,返回一個內(nèi)部所包含的RDD只有一個元素的新DStream使用DStream轉換操作DStream轉換操作常用的方法及說明方法描述countByKey()計算DStream中每個RDD內(nèi)的元素出現(xiàn)的頻次,并返回新的DStream[(K,Long)],其中K是RDD中元素的類型,Long是元素出現(xiàn)的頻次reduceByKey(func,[numTasks])以一個鍵值RDD為目標,K為鍵,V為值。當一個(K,V)鍵值對的DStream被調用時,返回(K,V)鍵值對的新DStream,其中每個鍵的值都使用聚合函數(shù)func匯總。配置numTasks可以設置不同的并行任務數(shù)join(otherStream,[numTasks])當調用的是(K,V1)和(K,V2)鍵值對的兩個DStream時,返回元素為(K,(V1,V2))鍵值對的一個新DStreamcogroup(otherStream,[numTasks])當被調用的兩個DStream分別含有(K,V1)和(K,V2)鍵值對時,返回一個元素為(K,Seq[V1],Seq[V2])的新的DStreamtransform(func)通過對源DStream的每個RDD應用func函數(shù)返回一個新的DStream,用于在DStream上進行RDD的任意操作大部分操作(如map,flatMap,filter等)與RDD的轉換操作類似。transform操作極大地豐富了DStream上能夠進行的操作內(nèi)容。使用transform操作后,除了可以使用DStream提供的一些轉換方法之外,還能夠直接調用任意RDD上的操作方法。使用DStream轉換操作使用transform將一行語句分割成單詞使用DStream轉換操作在slave18888端口上輸入“IamlearningSparkStreamingnow”語句。運行結果如下圖,該語句在5s內(nèi)被分割成單詞。使用DStream轉換操作窗口函數(shù),就是在DStream流上,以一個可配置的長度為窗口,以一個可配置的速率向前移動窗口,根據(jù)窗口函數(shù)的具體內(nèi)容,對窗口內(nèi)的數(shù)據(jù)執(zhí)行計算操作,每次掉落在窗口內(nèi)的RDD的數(shù)據(jù)會被聚合起來執(zhí)行計算操作,然后生成的RDD會作為WindowDStream的一個RDD。下圖表述的是滑動窗口長度為3秒,這三秒內(nèi)的3個RDD會被聚合起來進行處理,然后過了兩秒鐘,又會對最近三秒內(nèi)的數(shù)據(jù)執(zhí)行滑動窗口計算。所以每個滑動窗口操作,都必須指定兩個參數(shù),窗口長度以及滑動間隔,而且這兩個參數(shù)值都必須是batch(批處理時間)間隔的整數(shù)倍。使用DStream窗口操作常用的窗口轉換操作方法如下表。這些操作都需要兩個參數(shù),windowLength(窗口長度)和slideInterval(時間間隔)。使用DStream窗口操作方法描述window(windowLength,slideInterval)返回一個基于源DStream的窗口批次計算后得到的新DStreamcountByWindow(windowLength,slideInterval)返回基于滑動窗口的DStream中的元素的數(shù)量reduceByWindow(func,windowLength,slideInterval)基于滑動窗口對源DStream中的元素進行聚合操作,得到一個新的DStreamreduceByKeyAndWindow(func,windowLength,slideInterval,[numTasks])基于滑動窗口對元素為(K,V)鍵值對的DStream中的值,按K使用func函數(shù)進行聚合操作,得到一個新的DStream(續(xù)表)使用DStream窗口操作方法描述reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval,[numTasks])一個更高效的reduceByKeyAndWindow()的實現(xiàn)版本,其中每個窗口的統(tǒng)計量是使用前一個窗口的新數(shù)據(jù)和“反向減少”離開窗口的舊數(shù)據(jù)來實現(xiàn)的。例如,計算t+4秒這個時刻過去5秒窗口的WordCount,可以將t+3秒時刻過去5秒的統(tǒng)計量加上[t+3秒,t+4秒]的統(tǒng)計量,再減去[t?2秒,t?1秒]的統(tǒng)計量,這種方法可以復用中間3秒的統(tǒng)計量,提高統(tǒng)計的效率countByValueAndWindow(windowLength,slideInterval,[numTasks])基于滑動窗口計算源DStream中每個RDD內(nèi)每個元素出現(xiàn)的頻次并返回DStream[(K,Long)],其中K是RDD中元素的類型,Long是元素頻次。與countByValue一樣,reduce任務的數(shù)量可以通過一個可選參數(shù)進行配置以window()為例設置窗口長度為3s,滑動時間間隔為1s,截取源DStream中的元素形成新的DStream。使用DStream窗口操作在slave1上啟動監(jiān)聽,基本上每秒輸入一個字母,然后取出當前時刻3秒這個長度中的所有元素,打印出來如右圖,從圖中可以看到,到第4秒時已經(jīng)不到a了,再下一秒就看不到b了,這說明此時a和b已經(jīng)不在當前窗口中。使用DStream窗口操作reduceByKeyAndWindow(func,windowLength,slideInterval,[numTasks])操作類似reduceByKey操作,只不過兩者操作的數(shù)據(jù)源不同,reduceByKeyAndWindow的數(shù)據(jù)源是基于該DStream的窗口長度中的所有數(shù)據(jù)。將當前長度為3的時間窗口中的所有數(shù)據(jù)元素根據(jù)key進行合并,統(tǒng)計當前3秒中內(nèi)不同單詞出現(xiàn)的次數(shù)。使用DStream窗口操作統(tǒng)計當前3秒內(nèi)不同單詞出現(xiàn)的次數(shù)使用DStream窗口操作統(tǒng)計當前3秒內(nèi)不同單詞出現(xiàn)的次數(shù),運行結果如下圖計數(shù)。使用DStream窗口操作使用DStream輸出操作DStream輸出操作常用的方法及說明方法描述print()在Driver中輸出DStream中數(shù)據(jù)的前10個元素saveAsTextFiles(prefix,[suffix])將DStream中的內(nèi)容以文本的形式保存為文本文件,其中每次批處理間隔內(nèi)產(chǎn)生的文件再單獨保存為文件夾,文件夾以prefix_TIME_IN_MS[.suffix]的方式命名saveAsObjectFiles(prefix,[suffix])將DStream中的內(nèi)容按對象序列化,并且以SequenceFile的格式保存。其中每次批處理間隔內(nèi)產(chǎn)生的文件以prefix_TIME_IN_MS[.suffix]的方式命名saveAsHadoopFiles(prefix,[suffix])將DStream中的內(nèi)容以文本的形式保存為Hadoop文件,其中每次批處理間隔內(nèi)產(chǎn)生的文件以prefix_TIME_IN_MS[.suffix]的方式命名foreachRDD(func)基本的輸出操作,將func函數(shù)應用于DStream中的RDD上,輸出數(shù)據(jù)至外部系統(tǒng),如保存RDD到文件或網(wǎng)絡數(shù)據(jù)庫等saveAsTextFiles、saveAsObjectFiles和saveAsHadoopFiles操作可以將DStream中的內(nèi)容保存為文本文件。每個batch的數(shù)據(jù)單獨保存為一個文夾,其中prefix為文件夾名前綴,文件夾名前綴參數(shù)必須傳入,[suffix]為文件夾名后綴,文件夾名后綴參數(shù)可選,最終文件夾名稱的完整形式為prefix-TIME_IN_MS[.suffix]。如果前綴中包含文件完整路徑,則該text文件夾會建在指定路徑下。saveAsTextFiles以文本的形式保存DStream中的內(nèi)容,可以保存在任何文件系統(tǒng)。saveAsObjectFiles是以序列化的格式保存。saveAsHadoopFiles是以文本的形式保存在HDFS上。使用DStream輸出操作將nc窗口中輸出的內(nèi)容保存在HDFS的/user/root/saveAsTextFiles文件夾下,設置每秒生成一個文件夾。使用DStream輸出操作保存結果如下圖使用DStream輸出操作foreachRDD是DStream提供的一個功能強大的方法,它可以將數(shù)據(jù)發(fā)送到外部系統(tǒng),在使用foreachRDD的過程中需避免以下錯誤。通常將數(shù)據(jù)寫入到外部系統(tǒng)需要創(chuàng)建一個連接對象(如TCP連接到遠程服務器),并用它來發(fā)送數(shù)據(jù)到遠程系統(tǒng)。在創(chuàng)建連接對象時應避免在Sparkdriver端創(chuàng)建連接對象,代碼如下所示,這種做法需要連接對象進行序列化并從Driver端發(fā)送到Worker上,但是連接對象很少在不同機器間進行這種操作。使用DStream輸出操作foreachRDD是DStream提供的一個功能強大的方法,它可以將數(shù)據(jù)發(fā)送到外部系統(tǒng),在使用foreachRDD的過程中需避免在SparkDriver端創(chuàng)建連接對象。針對以上所說的錯誤,正確的解決方法是在Worker上創(chuàng)建連接對象,如代碼所示,但是這種做法又會引發(fā)另外一種錯誤,即為每一個記錄創(chuàng)建一個連接對象。通常,創(chuàng)建一個連接對象會有時間和資源的開銷,因此,為每個記錄創(chuàng)建和銷毀連接對象會導致非常高的開銷,減少系統(tǒng)的整體吞吐量。使用DStream輸出操作foreachRDD的正確用法:使用rdd.foreachPartition方法創(chuàng)建一個單獨的連接對象,然后使用該連接對象輸出所有RDD分區(qū)中的數(shù)據(jù)到外部系統(tǒng)。這不僅可以緩解創(chuàng)建多條記錄連接的開銷,還可以通過在多個RDDs/batches上重用連接對象進行優(yōu)化。使用DStream輸出操作以網(wǎng)站熱詞排名為例,介紹如何正確使用foreachPartition將處理結果寫到MySQL數(shù)據(jù)庫中。首先在MySQL數(shù)據(jù)庫中創(chuàng)建數(shù)據(jù)庫和表用以接收處理后的數(shù)據(jù);新建的表searchKeyWord表有三個字段,分別為insert_date(插入數(shù)據(jù)的日期),keyword(熱詞),search_count(在設置的時間內(nèi)出現(xiàn)的次數(shù))。使用DStream輸出操作網(wǎng)站熱詞排名在IntelliJIDEA中編寫Spark代碼,設置窗口長度為60秒,窗口滑動時間間隔為10秒,計算10秒內(nèi)每個單詞出現(xiàn)的次數(shù),然后根據(jù)出現(xiàn)的次數(shù)對單詞進行排序;雖然DStream沒有提供sort的方法,但是可以使用transform函數(shù),用RDD的sortByKey實現(xiàn)。接著需要使用foreachPartition創(chuàng)建MySQL數(shù)據(jù)庫連接對象;然后使用該連接對象輸出數(shù)據(jù)到searchKeyWord表中。使用DStream輸出操作使用DStream輸出操作網(wǎng)站熱詞排名--代碼實現(xiàn)運行程序;在slave1啟動監(jiān)聽8888端口并輸入數(shù)據(jù);查看searchKeyWord表中的數(shù)據(jù);結果如下圖。使用DStream輸出操作1掌握DStream基礎操作目錄初識SparkStreaming2實現(xiàn)書籍熱度實時計算3掌握了SparkStreaming的DStream編程模型的基礎操作后,即可使用SparkStreaming框架解決實際的實時數(shù)據(jù)流處理問題。本節(jié)的任務如下。使用SparkStreaming實時計算書籍熱度;根據(jù)書籍熱度進行降序排序,獲取熱度最高的10本圖書;將最后的結果寫入Hive中。任務描述將用戶對書籍的評分數(shù)據(jù)文件BookRating.txt保存至本地目錄下,設置每隔60秒隨機從BookRating.txt文件中挑選100條記錄并添加至新日志文件中,新生成的日志文件存放在“F:\\StreamingData”路徑下。執(zhí)行代碼,產(chǎn)生文件,每個文件都有100條記錄,如右圖。獲取輸入數(shù)據(jù)源創(chuàng)建rating.scala,實例化StreamingContext對象并監(jiān)控“F:\\StreamingData”路徑,實時抽取產(chǎn)生的新文件的數(shù)據(jù)并轉化為數(shù)據(jù)流,設置批處理時間間隔為60秒。在獲取到數(shù)據(jù)流后,通過split()方法按制表符進行切分,并輸出數(shù)據(jù)流進行測試。運行結果如右圖。獲取輸入數(shù)據(jù)源根據(jù)書籍熱度的計算公式,需要計算出用戶的評分次數(shù)及用戶的平均評分。首先需要將獲取輸入源的代碼中切分后的數(shù)據(jù)流由DStream形式轉換為DataFrame形式,再使用SparkSQLAPI進行后續(xù)的數(shù)據(jù)處理。通過for

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論