chapter7-廈門林子雨大數(shù)據(jù)處理技術(shù)spark第7章streaming2017年春季學(xué)期_第1頁
chapter7-廈門林子雨大數(shù)據(jù)處理技術(shù)spark第7章streaming2017年春季學(xué)期_第2頁
chapter7-廈門林子雨大數(shù)據(jù)處理技術(shù)spark第7章streaming2017年春季學(xué)期_第3頁
chapter7-廈門林子雨大數(shù)據(jù)處理技術(shù)spark第7章streaming2017年春季學(xué)期_第4頁
chapter7-廈門林子雨大數(shù)據(jù)處理技術(shù)spark第7章streaming2017年春季學(xué)期_第5頁
已閱讀5頁,還剩102頁未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡介

1、廈門大學(xué)計(jì)算機(jī)科學(xué)系2017年版廈門大學(xué)課程http:/t/7659/溫馨提示:編輯幻燈片母版,可以修改每頁P(yáng)PT的廈大?;蘸偷撞课淖值?章 Spark Streaming(PPT版本號(hào):2017年春季學(xué)期)林子雨廈門大學(xué)計(jì)算機(jī)科學(xué)系掃一掃班級(jí)主頁: z主頁: HYPERLINK http:/w/ http:/w/linziyu提綱流計(jì)算概述Spark StreamingDStream操作概述輸入源轉(zhuǎn)換操作輸出操作免費(fèi):http:/blog/spark/z7.1流計(jì)算概述7.1.1 靜態(tài)數(shù)據(jù)和流數(shù)據(jù)7.1.2 批量計(jì)算和實(shí)時(shí)計(jì)算7.1.3 流計(jì)算概念7.1.4 流計(jì)算與Hadoop7.1.5

2、流計(jì)算框架7.1.6 流計(jì)算處理流程z7.1.1 靜態(tài)數(shù)據(jù)和流數(shù)據(jù)很多企業(yè)為了支持決策分析而構(gòu)建的數(shù)據(jù)倉庫系統(tǒng),其中存放的大量歷史數(shù)據(jù)就是靜態(tài)數(shù)據(jù)。技術(shù) 可以利用數(shù)據(jù)挖掘和OLAP(On-Line ytical Pro sing)分析工具從靜態(tài)數(shù)據(jù)中找到對企業(yè)有價(jià)值的信息z7.1.1 靜態(tài)數(shù)據(jù)和流數(shù)據(jù)近年來,在Web應(yīng)用、網(wǎng)絡(luò)、傳感監(jiān)測等領(lǐng)域,興起了一種新的數(shù)據(jù)密集型應(yīng)用流數(shù)據(jù),即數(shù)據(jù)以大量、快速、時(shí)變的流形式持續(xù)到達(dá)實(shí)例:PM2.5檢測、電子商務(wù)用戶點(diǎn)擊流流數(shù)據(jù)具有如下特征:數(shù)據(jù)快速持續(xù)到達(dá),潛在大小也許是無窮無盡的數(shù)據(jù)來源眾多,格式復(fù)雜數(shù)據(jù)量大,但是不十分關(guān)注,一旦經(jīng)過處理,要么被丟棄,要

3、么被歸檔注重?cái)?shù)據(jù)的整體價(jià)值,不過分關(guān)注個(gè)別數(shù)據(jù)數(shù)據(jù)順序顛倒,或者不完整,系統(tǒng)無法控制將要處理的新到達(dá)的數(shù)據(jù)元素的順序z7.1.2 批量計(jì)算和實(shí)時(shí)計(jì)算對靜態(tài)數(shù)據(jù)和流數(shù)據(jù)的處理,對應(yīng)著兩種截然不同的計(jì)算模式:批量計(jì)算和實(shí)時(shí)計(jì)算批量計(jì)算:充裕時(shí)間處理靜態(tài)數(shù)據(jù),如Hadoop流數(shù)據(jù)不適合采用批量計(jì)算,因?yàn)榱鲾?shù)據(jù)不適合用傳統(tǒng)的關(guān)系模型建模流數(shù)據(jù)必須采用實(shí)時(shí)計(jì)算,響應(yīng)時(shí)間為秒級(jí)數(shù)據(jù)量少時(shí),不是問題,但是,在大數(shù)據(jù)時(shí)代,數(shù)據(jù)格式復(fù)雜、來源眾多、數(shù)據(jù)量巨大,對實(shí)時(shí)計(jì)算提出了很大 的。因此,針對流數(shù)據(jù)的實(shí)時(shí)計(jì) 算流計(jì)算,應(yīng)運(yùn)而生圖 數(shù)據(jù)的兩種處理模型z7.1.3 流計(jì)算概念流計(jì)算:實(shí)時(shí)獲取來自不同數(shù)據(jù)源的海量數(shù)

4、據(jù),經(jīng)過實(shí)時(shí)分析處理,獲得有價(jià)值的信息實(shí)時(shí)分析處理結(jié)果反饋圖 流計(jì)算示意圖z7.1.3 流計(jì)算概念流計(jì)算秉承一個(gè)基本理念,即數(shù)據(jù)的價(jià)值隨著時(shí)間的流逝而降低,如用戶點(diǎn)擊流。因此,當(dāng)事件出現(xiàn)時(shí)就應(yīng)該立即進(jìn)行處理,而不是緩存起來進(jìn)行批量處理。為了及時(shí)處理流數(shù)據(jù),就需要一個(gè)低延遲、可擴(kuò)展、高可靠的處理引擎對于一個(gè)流計(jì)算系統(tǒng)來說,它應(yīng)達(dá)到如下需求:高性能:處理大數(shù)據(jù)的基本要求,如每秒處理幾十萬條數(shù)據(jù)海量式:支持TB級(jí)甚至是PB級(jí)的數(shù)據(jù)規(guī)模實(shí)時(shí)性:保證較低的延遲時(shí)間,達(dá)到秒級(jí)別,甚至是毫秒級(jí)別分布式:支持大數(shù)據(jù)的基本架構(gòu),必須能夠平滑擴(kuò)展易用性:能夠快速進(jìn)行開發(fā)和部署可靠性:能可靠地處理流數(shù)據(jù)z7.1.4

5、 流計(jì)算與HadoopHadoop設(shè)計(jì)的初衷是面向大規(guī)模數(shù)據(jù)的批量處理,每臺(tái)機(jī)器并行運(yùn)行MapReduce任務(wù),最后對結(jié)果進(jìn)行匯總輸出MapReduce是專門面向靜態(tài)數(shù)據(jù)的批量處理的,各種實(shí)現(xiàn)機(jī)制都為批處理做了高度優(yōu)化,不適合用于處理持續(xù)到達(dá)的動(dòng)態(tài)數(shù)據(jù)可能會(huì)想到一種“變通”的方案來降低批處理的時(shí)間延遲將基于MapReduce的批量處理轉(zhuǎn)為小批量處理,將輸入數(shù)據(jù)切成小的片段,每隔一個(gè)周期就啟動(dòng)一次MapReduce作業(yè)。但這種方式也無法有效處理流數(shù)據(jù)切分成小片段,可以降低延遲,但是也增加了附加開銷,還要處理片段之間依賴關(guān)系需要改造MapReduce以支持流式處理結(jié)論:魚和熊掌不可兼得,Hadoo

6、p擅長批處理,不適合流計(jì)算z7.1.5 流計(jì)算框架當(dāng)前業(yè)界誕生了許多專門的流數(shù)據(jù)實(shí)時(shí)計(jì)算系統(tǒng)來滿足各自需求目前有三類常見的流計(jì)算框架和:商業(yè)級(jí)的流計(jì)算、開源流計(jì)算框架、公司為支持自身業(yè)務(wù)開發(fā)的流計(jì)算框架商業(yè)級(jí):IBM InfoSphere Streams和IBM StreamBase較為常見的是開源流計(jì)算框架,代表如下:Storm:免費(fèi)、開源的分布式實(shí)時(shí)計(jì)算系統(tǒng),可簡單、高效、可靠地處理大量的流數(shù)據(jù)Yahoo! S4(Simple Scalable Streaming System):開源流計(jì)算,是通用的、分布式的、可擴(kuò)展的、分區(qū)容錯(cuò)的、可插拔的流式系統(tǒng)公司為支持自身業(yè)務(wù)開發(fā)的流計(jì)算框架:Pu

7、maDstream()流數(shù)據(jù)處理(淘寶)z7.1.6 流計(jì)算處理流程概述數(shù)據(jù)實(shí)時(shí)數(shù)據(jù)實(shí)時(shí)計(jì)算實(shí)時(shí)查詢服務(wù)z 概述傳統(tǒng)的數(shù)據(jù)處理流程,需要先數(shù)據(jù)并在關(guān)系數(shù)據(jù)庫等數(shù)據(jù)管理系統(tǒng)中,之后由用戶通過查詢操作和數(shù)據(jù)管理系統(tǒng)進(jìn)行交互用戶查詢傳統(tǒng)的數(shù)據(jù)處理流程示意圖傳統(tǒng)的數(shù)據(jù)處理流程隱含了兩個(gè)前提:的數(shù)據(jù)是舊的。的靜態(tài)數(shù)據(jù)是過去某一時(shí)刻的快照,這些數(shù)據(jù)在查詢時(shí)可能已不具備時(shí)效性了需要用戶主動(dòng)發(fā)出查詢來獲取結(jié)果z數(shù)據(jù)管理系統(tǒng)查詢結(jié)果 概述流計(jì)算的處理流程一般包含三個(gè)階段:數(shù)據(jù)實(shí)時(shí)、數(shù)據(jù)實(shí)時(shí)計(jì)算、實(shí)時(shí)查詢服務(wù)數(shù)據(jù)實(shí)時(shí)用戶查詢流計(jì)算處理流程示意圖z實(shí)時(shí)查詢服務(wù)查詢結(jié)果數(shù)據(jù)實(shí)時(shí)計(jì)算數(shù)據(jù)實(shí)時(shí)數(shù)據(jù)實(shí)時(shí)階段通常多個(gè)數(shù)據(jù)源的

8、海量數(shù)據(jù),需要保證實(shí)時(shí)性、低延遲與穩(wěn)定可靠以日志數(shù)據(jù)為例,由于分布式集群的廣泛應(yīng)用,數(shù)據(jù)分散的機(jī)器上,因此需要實(shí)時(shí)匯總來自不同機(jī)器上的日志數(shù)據(jù)在不同目前有許多互聯(lián)網(wǎng)公司發(fā)布的開源分布式日志系統(tǒng)均可滿足每秒數(shù)百M(fèi)B的和傳輸需求,如:的ScribeLinkedIn的Kafka淘寶的Time Tunnel基于Hadoop的Chukwa和Flume數(shù)據(jù)實(shí)時(shí)用戶查詢z實(shí)時(shí)查詢服務(wù)查詢結(jié)果數(shù)據(jù)實(shí)時(shí)計(jì)算 數(shù)據(jù)實(shí)時(shí)計(jì)算數(shù)據(jù)實(shí)時(shí)計(jì)算階段對并反饋實(shí)時(shí)結(jié)果的數(shù)據(jù)進(jìn)行實(shí)時(shí)的分析和計(jì)算,經(jīng)流處理系統(tǒng)處理后的數(shù)據(jù),可視情況進(jìn)行,以便之后再進(jìn)行分析計(jì)算。在時(shí)效性要求較高的場景中,處理之后的數(shù)據(jù)也可以直接丟棄流處理系統(tǒng)實(shí)時(shí)計(jì)

9、算數(shù)據(jù)流入數(shù)據(jù)流出計(jì)算結(jié)果數(shù)據(jù)實(shí)時(shí)數(shù)據(jù)實(shí)時(shí)計(jì)算流程數(shù)據(jù)實(shí)時(shí)計(jì)算用戶查詢z實(shí)時(shí)查詢服務(wù)查詢結(jié)果 實(shí)時(shí)查詢服務(wù)實(shí)時(shí)查詢服務(wù):經(jīng)由流計(jì)算框架得出的結(jié)果可供用戶進(jìn)行實(shí)時(shí)查詢、展示或傳統(tǒng)的數(shù)據(jù)處理流程,用戶需要主動(dòng)發(fā)出查詢才能獲得想要的結(jié)果。而在流處理流程中,實(shí)時(shí)查詢服務(wù)可以不斷更新結(jié)果,并將用戶所需的結(jié)果實(shí)時(shí)推送給用戶雖然通過對傳統(tǒng)的數(shù)據(jù)處理系統(tǒng)進(jìn)行定時(shí)查詢,也可以實(shí)現(xiàn)不斷地更新結(jié)果和結(jié)果推送,但通過這樣的方式獲取的結(jié)果,仍然是根據(jù)過去某一時(shí)刻的數(shù)據(jù)得到的結(jié)果,與實(shí)時(shí)結(jié)果有著本質(zhì)的區(qū)別數(shù)據(jù)實(shí)時(shí)數(shù)據(jù)實(shí)時(shí)計(jì)算用戶查詢查詢結(jié)果實(shí)時(shí)查詢服務(wù)z 實(shí)時(shí)查詢服務(wù)可見,流處理系統(tǒng)與傳統(tǒng)的數(shù)據(jù)處理系統(tǒng)有如下不同:流處理

10、系統(tǒng)處理的是實(shí)時(shí)的數(shù)據(jù),而傳統(tǒng)的數(shù)據(jù)處理系統(tǒng)處理的是預(yù)先好的靜態(tài)數(shù)據(jù)用戶通過流處理系統(tǒng)獲取的是實(shí)時(shí)結(jié)果,而通過傳統(tǒng)的數(shù)據(jù)處理系統(tǒng),獲取的是過去某一時(shí)刻的結(jié)果流處理系統(tǒng)無需用戶主動(dòng)發(fā)出查詢,實(shí)時(shí)查詢服務(wù)可以主動(dòng)將實(shí)時(shí)結(jié)果推送給用戶z7.2 Spark StreamingSpark Streaming設(shè)計(jì)Spark Streaming與Storm的對比z7.2.1 Spark Streaming設(shè)計(jì)Spark Streaming可整合多種輸入數(shù)據(jù)源,如Kafka、 Flume、HDFS,甚至是普通的TCP套接處理后的數(shù)據(jù)可至文件系統(tǒng)、數(shù)據(jù)庫,或顯示在里KafkaHDFSFlumeSparkStre

11、amingDatabasesHDFSDashboardsTCP socket圖 Spark Streaming支持的輸入、輸出數(shù)據(jù)源z7.2.1 Spark Streaming設(shè)計(jì)Spark Streaming的基本原理是將實(shí)時(shí)輸入數(shù)據(jù)流以時(shí)間片(秒級(jí))為 進(jìn)行拆分,然后經(jīng)Spark引擎以類似批處理的方式處理每個(gè)時(shí)間片數(shù)據(jù)input databatches ofbatches ofstreaminput datapro sed dataSparkSparkStreamingEngine圖 Spark Streaming執(zhí)行流程z7.2.1 Spark Streaming設(shè)計(jì)Spark Stre

12、aming最主要的抽象是DStream(Discretized Stream,離散化數(shù)據(jù)流),表示連續(xù)不斷的數(shù)據(jù)流。在實(shí)現(xiàn)上,Spark Streaming的輸入數(shù)據(jù)按照時(shí)間片(如1秒)分成一段一段,每一段數(shù)據(jù)轉(zhuǎn)換為Spark中的RDD,這些分段就是Dstream,并且對DStream的操作都最終轉(zhuǎn)變?yōu)閷ο鄳?yīng)的RDD的操作RDD time 1RDD time 2RDD time 3RDD time 4Lineslines fromlines fromlines fromlines fromDStreamtime 0 to 1time 1 to 2time 2 to 3time 3 to 4fl

13、atMapoperationwordswords fromDStreamtime 3 to 4RDD result 1RDD result 2RDD result 3RDD result 4圖 DStream操作示意圖zwords fromwords from time 2 to 3time 1 to 2words from time 0 to 17.2.2 Spark Streaming與Storm的對比Spark Streaming和Storm最大的區(qū)別在于,Spark Streaming無法實(shí)現(xiàn)毫秒級(jí)的流計(jì)算,而Storm可以實(shí)現(xiàn)毫秒級(jí)響應(yīng)Spark Streaming構(gòu)建在Spark上

14、,一方面是因?yàn)?Spark的低延遲執(zhí)行引擎(100ms+)可以用于實(shí)時(shí)計(jì)算,另一方面,相比于Storm,RDD數(shù)據(jù)集更容易做高效的容錯(cuò)處理Spark Streaming采用的小批量處理的方式使得它可 以同時(shí)兼容批量和實(shí)時(shí)數(shù)據(jù)處理的邏輯和算法,因此,方便了一些需要?dú)v史數(shù)據(jù)和實(shí)時(shí)數(shù)據(jù)聯(lián)合分析的特定 應(yīng)用場合z7.3 DStream操作概述Spark Streaming工作原理Spark Streaming程序基本步驟創(chuàng)建StreamingContext對象z7.3 DStream操作概述圖 Spark運(yùn)行架構(gòu)在Spark中,一個(gè)應(yīng)用(Application)由一個(gè)任務(wù)控制節(jié)點(diǎn)(Driver)和若干

15、個(gè)作業(yè)(Job) ,一個(gè)作業(yè)由多個(gè)階段(Stage) ,一個(gè)階段由多個(gè)任務(wù)(Task)組成當(dāng)執(zhí)行一個(gè)應(yīng)用時(shí),任務(wù)控制節(jié)點(diǎn)會(huì)向集群管理器(Cluster Manager)申請資源,啟動(dòng)Executor,并向Executor發(fā)送應(yīng)用程序代碼和文件,然后在Executor上執(zhí)行taskzWorker NodeExecutorCacheTaskTaskCluster ManagerDrivrogramSparkContextWorker NodeExecutorCacheTaskTask7.3.1 Spark Streaming工作原理在Spark Streaming中,會(huì)有一個(gè)組件Receiver,

16、作為一個(gè)長期運(yùn)行的task跑在一個(gè)Executor上每個(gè)Receiver都會(huì)負(fù)責(zé)一個(gè)input DStream(比如從文件中 數(shù)據(jù)的文件流,比如套接字流,或者從Kafka中讀取的一個(gè)輸入流等等)Spark Streaming通過input DStream與外部數(shù)據(jù)源進(jìn)行連接,相關(guān)數(shù)據(jù)z7.3.2 Spark Streaming程序基本步驟編寫Spark Streaming程序的基本步驟是: 1.通過創(chuàng)建輸入DStream來定義輸入源2.通過對DStream應(yīng)用轉(zhuǎn)換操作和輸出操作來定義流計(jì)算 3.用streamingContext.start()來開始接收數(shù)據(jù)和處理流程通過streamingCo

17、ntext.awaitTermination()方法來等待處理結(jié)束(手動(dòng)結(jié)束或因?yàn)殄e(cuò)誤而結(jié)束)可以通過streamingContext.stop()來手動(dòng)結(jié)束流計(jì)算進(jìn)程z7.3.3 創(chuàng)建StreamingContext對象如果要運(yùn)行一個(gè)Spark Streaming程序,就需要首先生成一個(gè) StreamingContext對象,它是Spark Streaming程序的主可以從一個(gè)SparkConf對象創(chuàng)建一個(gè)StreamingContext對象登錄Linux系統(tǒng)后,啟動(dòng)spark-s。進(jìn)入spark-s以后,就已經(jīng)獲得了一個(gè)默認(rèn)的SparkConext,也就是sc。因此,可以采用如下方式來創(chuàng)建

18、StreamingContext對象:scala import.apache.spark.streaming._scala val ssc = new StreamingContext(sc, Seconds(1)z7.3.3 創(chuàng)建StreamingContext對象如果是編寫一個(gè)獨(dú)立的Spark Streaming程序,而不是在 spark-s中運(yùn)行,則需要通過如下方式創(chuàng)建 StreamingContext對象:import.apache.spark._import.apache.spark.streaming._ val conf = newSparkConf().setAppName(T

19、estDStream).setMaster(local2)val ssc = new StreamingContext(conf, Seconds(1)z7.4 輸入源基本輸入源文件流套接字流RDD隊(duì)列流高級(jí)數(shù)據(jù)源Apache KafkaApache Flumez7.4.1 基本輸入源文件流套接字流RDD隊(duì)列流z 文件流在logfile中新建兩個(gè)日志文件log1.txt和log2.txt,隨便輸入內(nèi)容。比如,在log1.txt中輸入以下內(nèi)容:I love Hadoop I love Spark Spark is fastz$ cd /usr/local/spark/mycode$ mkdir

20、streaming$ cd streaming$ mkdir logfile$ cd logfile 文件流(1)進(jìn)入spark-s創(chuàng)建文件流。請另外打開一個(gè)終端窗口,啟動(dòng)進(jìn)入spark-sscala import.apache.spark.streaming._scala val ssc = new StreamingContext(sc, Seconds(20) scala val lines =ssc.textFileStream(file:/usr/local/spark/mycode/streaming/logfile) scala val words = lines.flatMap

21、(_.split( )scala val wordCounts = words.map(x = (x, 1).reduceByKey(_ +_) scala wordCounts.pr() scala ssc.start()scala ssc.awaitTermination()z 文件流上面在spark-s中執(zhí)行的程序,一旦你輸入ssc.start()以后,程序就開始自動(dòng)進(jìn)入循環(huán)狀態(tài),屏幕上會(huì)顯示一堆的信息,如下:在/urs/local/spark/mycode/streaming/logfile目錄下再新建一個(gè)log3.txt文件,就可以在窗口中顯示詞頻統(tǒng)計(jì)結(jié)果z/這里省略若干屏幕信息Ti

22、me: 1479431100000 ms/這里省略若干屏幕信息Time: 1479431120000 ms/這里省略若干屏幕信息Time: 1479431140000 ms 文件流(2)采用獨(dú)立應(yīng)用程序的方式實(shí)現(xiàn)上述文件夾的功能z$ cd /usr/local/spark/mycode$ mkdir streaming$ cd streaming$ mkdir -p src/main/scala$ cd src/main/scala$ vim TestStreaming.scala 文件流用vim編輯器新建一個(gè)TestStreaming.scala代碼文件,請?jiān)诶锩孑斎胍韵麓a:import.

23、apache.spark._import.apache.spark.streaming._object WordCountStreaming def main(args: ArrayString) val sparkConf = newSparkConf().setAppName(WordCountStreaming).setMaster(local2)/設(shè)置為本地運(yùn)行模式,2個(gè)線程,一個(gè),另一個(gè)處理數(shù)據(jù)val ssc = new StreamingContext(sparkConf, Seconds(2)/ 時(shí)間間隔為2秒val lines = ssc.textFileStream(file

24、:/usr/local/spark/mycode/streaming/logfile) /這里采用本地文件,當(dāng)然你也可以采用HDFS文件val words = lines.flatMap(_.split( )val wordCounts = words.map(x = (x, 1).reduceByKey(_ + _) wordCounts.pr()ssc.start()ssc.awaitTermination()z 文件流在simple.sbt文件中輸入以下代碼:name := Simple Project ver:= 1.0scalaVer:= 2.11.8libraryDependenc

25、ies += .apache.spark % spark-streaming_2.11 % 2.1.0執(zhí)行sbt打包編譯令如下:z$ cd /usr/local/spark/mycode/streaming$ /usr/local/sbt/sbt package$ cd /usr/local/spark/mycode/streaming$ vim simple.sbt 文件流打包成功以后,就可以輸入以下命令啟動(dòng)這個(gè)程序: WordCountStreaming/usr/local/spark/mycode/streaming/scala- 2.11/simple-project_2.11-1.0

26、.jar執(zhí)行上面命令后,就進(jìn)入了狀態(tài)(把運(yùn)行這個(gè)程序的窗口稱為窗口)切換到另外一個(gè)S窗口,在 /usr/local/spark/mycode/streaming/logfile目錄下再新建一個(gè)log5.txt文件,文件里面隨便輸入一些單詞,保存好文件退出vim編輯器再次切換回“窗口”,等待20秒以后,按鍵盤Ctrl+C或者Ctrl+D停止程序,就可以看到窗口的屏幕上會(huì)打印出單詞統(tǒng)計(jì)信息z$ cd /usr/local/spark/mycode/streaming$ /usr/local/spark/bin/spark-submit -class 套接字流Spark Streaming可以通過S

27、ocket端口并接收數(shù)據(jù),然后進(jìn)行相應(yīng)處理請?jiān)贜etworkWordCount.scala文件中輸入如下內(nèi)容:package.apache.spark.exles.streamingimport.apache.spark._import.apache.spark.streaming._import.apache.spark.storage.StorageLevel剩余代碼在下一頁z$ cd /usr/local/spark/mycode$ mkdir streaming #如果已經(jīng)存在該目錄,則不用創(chuàng)建$ mkdir -p /src/main/scala #如果已經(jīng)存在該目錄,則不用創(chuàng)建$ c

28、d /usr/local/spark/mycode/streaming/src/main/scala$ vim NetworkWordCount.scalaobject NetworkWordCount def main(args: ArrayString) if (args.length 2) System.err.prln(Usage: NetworkWordCount ) System.exit(1)StreamingExles.setStreamingLogLevels()val sparkConf = new SparkConf().setAppName(NetworkWordCou

29、nt).setMaster(local2)val ssc = new StreamingContext(sparkConf, Seconds(1) val lines = ssc.socketTextStream(args(0), args(1).to,StorageLevel.MEMORY_AND_DISK_SER)val words = lines.flatMap(_.split( )val wordCounts = words.map(x = (x, 1).reduceByKey(_ + _) wordCounts.pr()ssc.start()ssc.awaitTermination(

30、)z 套接字流在相同目錄下再新建另外一個(gè)代碼文件StreamingExles.scala,文件內(nèi)容如下: package.apache.spark.exles.streamingimport.apache.spark.ernal.Loggingimport.apache.log4j.Level, Logger/* Utility functions for Spark Streaming exles. */ object StreamingExles extends Logging /* Set reasonable logging levels for streaming if the us

31、er has not configured log4j. */ def setStreamingLogLevels() val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElementsif (!log4jInitialized) / Welog something to initialize Sparks default logging, then we override the/ logging level.logInfo(Setting log level to WARN for streaming ex

32、le. + To override add a custom perties to the classpath.) Logger.getRootLogger.setLevel(Level.WARN)z 套接字流name := Simple Project ver:= 1.0scalaVer:= 2.11.8libraryDependencies += .apache.spark % spark-streaming_2.11 % 2.1.0$ cd /usr/local/spark/mycode/streaming$ /usr/local/spark/bin/spark-submit -clas

33、s .apache.spark.exles.streaming.NetworkWordCount/usr/local/spark/mycode/streaming/scala-2.11/simple-project_2.11- 1.0.jar localhost 9999z$ cd /usr/local/spark/mycode/streaming$ /usr/local/sbt/sbt package$ cd /usr/local/spark/mycode/streaming/$ vim simple.sbt 套接字流新打開一個(gè)窗口作為nc窗口,啟動(dòng)nc程序:可以在nc窗口中隨意輸入一些單詞

34、,窗口就會(huì)自動(dòng)獲得單詞數(shù)據(jù)流信息,在窗口每隔1秒就會(huì)打印出詞頻統(tǒng)計(jì)信息,大概會(huì)在屏幕上出現(xiàn)類似如下的結(jié)果:zTime: 1479431100000 ms (o,1)(world,1)Time: 1479431120000 ms(hadoop,1)Time: 1479431140000 ms(spark,1)$ nc -lk 9999 套接字流下面 再前進(jìn)一步,把數(shù)據(jù) 的產(chǎn)生方式修改一下,不要使用nc程序,而是采用自己編寫的程序產(chǎn)生Socket數(shù)據(jù)源 $ vim DataSourocket.scala package.apache.spark.exles.streamingImport java

35、.io.PrWriter Import .ServerSocket Import scala.io.Source剩余代碼見下一頁z$ cd /usr/local/spark/mycode/streaming/src/main/scala 套接字流object DataSourocket def index(length:) = val rdm = new java.util.Random rdm.next(length)def main(args: ArrayString) if (args.length != 3) System.err.prln(Usage: )System.exit(1)

36、val fileName = args(0)val lines = Source.fromFile(fileName).getLines.toListval rowCount = lines.length剩余代碼見下一頁z 套接字流val listener = new ServerSocket(args(1).to)while (true) val socket = listener.accept() new Thread() override def run = prln(Got cnt connected from: + socket.getInetAddress) val out = n

37、ew PrWriter(socket.getOutputStream(), true) while (true) Thread.sleep(args(2).toLong)val content = lines(index(rowCount)prln(content) out.write(content + n) out.flush()socket.close().start()z 套接字流執(zhí)行sbt打包編譯:DataSourocket程序需要把一個(gè)文本文件作為輸入?yún)?shù),所以,在啟動(dòng)這個(gè)程序之前,需要首先創(chuàng)建一個(gè)文本文件word.txt并隨便輸入幾行內(nèi)容:/usr/local/spark/myc

38、ode/streaming/word.txt啟動(dòng)DataSourocket程序:這個(gè)窗口會(huì)不斷打印出一些隨機(jī)到的文本信息,這些信息也是Socket數(shù)據(jù)源,會(huì)被程序捕捉到z$ /usr/local/spark/bin/spark-submit -class.apache.spark.exles.streaming.DataSourocket/usr/local/spark/mycode/streaming/scala-2.11/simple-project_2.11-1.0.jar /usr/local/spark/mycode/streaming/word.txt 9999 1000$ cd

39、/usr/local/spark/mycode/streaming$ /usr/local/sbt/sbt package 套接字流在另外一個(gè)窗口啟動(dòng)程序:$ /usr/local/spark/bin/spark-submit -class.apache.spark.exles.streaming.NetworkWordCount/usr/local/spark/mycode/streaming/scala-2.11/simple-project_2.11- 1.0.jar localhost 9999啟動(dòng)成功后,你就會(huì)看到,屏幕上不斷打印出詞頻統(tǒng)計(jì)信息z RDD隊(duì)列流在調(diào)試Spark Str

40、eaming應(yīng)用程序的時(shí)候, 可以使用 streamingContext.queueStream(queueOfRDD)創(chuàng)建基于 RDD隊(duì)列的DStream新建一個(gè)TestRDDQueueStream.scala代碼文件,功能是:每隔1秒創(chuàng)建一個(gè)RDD,Streaming每隔2秒就對數(shù)據(jù)進(jìn)行處理package.apache.spark.exles.streamingimport.apache.spark.SparkConf import.apache.spark.rdd.RDDimport.apache.spark.streaming.StreamingContext._ import.apa

41、che.spark.streaming.Seconds, StreamingContext剩余代碼在下一頁z RDD隊(duì)列流object QueueStream def main(args: ArrayString) val sparkConf = newSparkConf().setAppName(TestRDDQueue).setMaster(local2)val ssc = new StreamingContext(sparkConf, Seconds(2) val rddQueue =newscala.collection.mutable.SynchronizedQueueRDD() v

42、al queueStream = ssc.queueStream(rddQueue)val mappedStream = queueStream.map(r = (r % 10, 1) val reducedStream = mappedStream.reduceByKey(_ + _) reducedStrer()ssc.start()剩余代碼見下一頁z RDD隊(duì)列流for (i 100s ms 1 hour圖 Kafka作為樞紐z.1 Kafka介紹BrokerKafka集群包含一個(gè)或多個(gè)服務(wù)器,這種服務(wù)器被稱為brokerTopic每條發(fā)布到Kafka集群的消息都有一個(gè)類別,這個(gè)類別被稱

43、為Topic。(物理上不同Topic的消息分開,邏輯上一個(gè)Topic的消息雖然保存于一個(gè)或多個(gè)broker上,但用戶只需指定消息的Topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)PartitionPartition是物理上的概念,每個(gè)Topic包含一個(gè)或多個(gè)Partition.Producer負(fù)責(zé)發(fā)布消息到Kafka brokerConsumer消息消費(fèi)者,向Kafka broker消息的客戶端。Consumer Group每個(gè)Consumer屬于一個(gè)特定的Consumer Group(可為每個(gè)Consumer指定group name,若不指定group name則屬于默認(rèn)的group)z

44、.1 Kafka介紹z.2 Kafka的安裝和準(zhǔn)備工作關(guān)于Kafka的安裝方法,請參考廈門大學(xué)數(shù)據(jù)庫建設(shè)的中國高校大數(shù)據(jù)課程公共的技術(shù)博客文章Kafka的安裝和簡單實(shí)例測試http:/blog/1096-2/這里假設(shè)已經(jīng)成功安裝Kafka到“/usr/local/kafka”目錄下中國高校大數(shù)據(jù)教學(xué)知名品牌每年量超過100萬次z.2 Kafka的安裝和準(zhǔn)備工作說明:本課程的安裝文件為Kafka_2.11-.tgz,前面的2.11就是該Kafka所支持的Scala版本號(hào),后面的 是Kafka自身的版本號(hào)打開一個(gè)終端,輸入下面命令啟動(dòng)Zookeeper服務(wù):$ cd /usr/local/kafk

45、a$ ./bin/zookeeper-server-start.sh config/zookeeproperties千萬不要關(guān)閉這個(gè)終端窗口,一旦關(guān)閉,Zookeeper服務(wù)就停止了z.2 Kafka的安裝和準(zhǔn)備工作打開第二個(gè)終端,然后輸入下面命令啟動(dòng)Kafka服務(wù): $ bin/kafka-server-start.sh config/servroperties千萬不要關(guān)閉這個(gè)終端窗口,一旦關(guān)閉,Kafka服務(wù)就停止了z$ cd /usr/local/kafka.3 Spark準(zhǔn)備工作Kafka和Flume等高級(jí)輸入源,需要依賴獨(dú)立的庫(jar文件)在spark-s中執(zhí)行下面import語句

46、進(jìn)試:scala import.apache.spark.streaming.kafka._:25: error: object kafka is not a member ofpackage.apache.spark.streamingimport.apache.spark.streaming.kafka._對于Spark2.1.0版本,如果要使用Kafka,則需要spark-streaming-kafka-0-8_2.11相關(guān)jar包,地址:http:/artifact/.apache.spark/spark-streaming-kafka-0-8_2.11/2.1.0z.3 Spark準(zhǔn)

47、備工作就把jar文件到Spark目錄的jars目錄下$ cd /usr/local/spark/jars$ mkdir kafka$ cd $ cd$ cp ./spark-streaming-kafka-0-8_2.11-2.1.0.jar/usr/local/spark/jars/kafka繼續(xù)把Kafka安裝目錄的libs目錄下的所有jar文件到 “/usr/local/spark/jars/kafka目錄下,請?jiān)诮K端中執(zhí)行下面命令:z$ cd /usr/local/kafka/libs$ cp ./* /usr/local/spark/jars/kafka.4 編寫Spark程序使用K

48、afka數(shù)據(jù)源編寫KafkaWordProducer程序。執(zhí)行命令創(chuàng)建代碼目錄:在KafkaWordProducer.scala中輸入以下代碼:package.apache.spark.exles.streaming import java.util.HashMapimport.apache.kafka.cducer.KafkaProducer, ProducerConfig,ProducerRecordimport.apache.spark.SparkConf import.apache.spark.streaming._import.apache.spark.streaming.kafka

49、._剩余代碼見下一頁z$ cd /usr/local/spark/mycode$ mkdir kafka$ cd kafka$ mkdir -p src/main/scala$ cd src/main/scala$ vim KafkaWordProducer.scala.4 編寫Spark程序使用Kafka數(shù)據(jù)源object KafkaWordProducer def main(args: ArrayString) if (args.length 4) System.err.prln(Usage: KafkaWordCountProducer + ) System.exit(1)val Arr

50、ay(brokers, topic, messagesPerSec, wordsPerMessage) = args/ Zookeeper connection propertiesval props = new HashMapString, Object() props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,ormon.serialization.StringSerializer) props.put(Produc

51、erConfig.KEY_SERIALIZER_CLASS_CONFIG, ormon.serialization.StringSerializer)val producer = new KafkaProducerString, String(props)剩余代碼見下一頁z.4 編寫Spark程序使用Kafka數(shù)據(jù)源/ Send some messages while(true) (1 to messagesPerSec.to).foreach messageNum =val str = (1 to wordsPerMessage.to).map(x =scala.util.Random.ne

52、xt(10).toString).mkString( )pr(str)prln()val message = new ProducerRecordString, String(topic, null, str)producer.send(message)Thread.sleep(1000)z.4 編寫Spark程序使用Kafka數(shù)據(jù)源繼續(xù)在當(dāng)前目錄下創(chuàng)建KafkaWordCount.scala代碼文件,它會(huì)把KafkaWordProducer發(fā)送過來的單詞進(jìn)行詞頻統(tǒng) 計(jì),代碼內(nèi)容如下:package.apache.spark.exles.streaming import.apache.spar

53、k._import.apache.spark.SparkConf import.apache.spark.streaming._import.apache.spark.streaming.kafka._import.apache.spark.streaming.StreamingContext._import.apache.spark.streaming.kafka.KafkaUtils剩余代碼見下一頁z.4 編寫Spark程序使用Kafka數(shù)據(jù)源object KafkaWordCountdef main(args:ArrayString)StreamingExles.setStreaming

54、LogLevels()val sc = new SparkConf().setAppName(KafkaWordCount).setMaster(local2) val ssc = new StreamingContext(sc,Seconds(10)ssc.checkpo(file:/usr/local/spark/mycode/kafka/checkpo) /設(shè)置檢查點(diǎn),如果存放在HDFS上面,則寫成類似ssc.checkpo(/user/hadoop/checkpo)這種形式,但是,要啟動(dòng)hadoopval zkQuorum = localhost:2181 /Zookeeper服務(wù)器地

55、址val group = 1 /topic所在的group,可以設(shè)置為自己想要的名稱,比如不用1,而是val group = test-consumer-group剩余代碼見下一頁z.4 編寫Spark程序使用Kafka數(shù)據(jù)源val topics = wordsender /topics的名稱 val numThreads = 1 /每個(gè)topic的分區(qū)數(shù)val topicMap =topics.split(,).map(_,numThreads.to).toMapval lineMap = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap

56、)val lines = lineMap.map(_._2)val words = lines.flatMap(_.split( ) val pair = words.map(x = (x,1)val wordCounts = pair.reduceByKeyAndWindow(_ + _,_ -_,Minutes(2),Seconds(10),2) /這行代碼的含義在下一節(jié)的窗口轉(zhuǎn)換操作中會(huì)有介紹wordCounts.pr ssc.start ssc.awaitTerminationz.4 編寫Spark程序使用Kafka數(shù)據(jù)源繼續(xù)在當(dāng)前目錄下創(chuàng)建StreamingExles.scala代碼

57、文件,用于設(shè)置log4j:package.apache.spark.exles.streamingimport.apache.spark.Loggingimport.apache.log4j.Level, Logger/* Utility functions for Spark Streaming exles. */ object StreamingExles extends Logging /* Set reasonable logging levels for streaming if the user has not configured log4j.*/def setStreaming

58、LogLevels() val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements if (!log4jInitialized) / Welog something to initialize Sparks default logging, then we override the/ logging level.logInfo(Setting log level to WARN for streaming exle. + To override add a custom perties to the c

59、lasspath.) Logger.getRootLogger.setLevel(Level.WARN)z.4 編寫Spark程序使用Kafka數(shù)據(jù)源創(chuàng)建simple.sbt文件:在simple.sbt中輸入以下代碼:name := Simple Project ver:= 1.0scalaVer:= 2.11.8libraryDependencies += .apache.spark % spark-core % 2.1.0 libraryDependencies += .apache.spark % spark-streaming_2.11 % 2.1.0 libraryDependenc

60、ies += .apache.spark % spark-streaming-kafka-0-8_2.11 % 2.1.0進(jìn)行打包編譯:z$ cd /usr/local/spark/mycode/kafka/$ /usr/local/sbt/sbt package$ cd /usr/local/spark/mycode/kafka/$ vim simple.sbt.4 編寫Spark程序使用Kafka數(shù)據(jù)源打開一個(gè)終端,執(zhí)行如下命令,運(yùn)行 “KafkaWordProducer程序,生成一些單詞(是一堆整數(shù)形式的單詞):$ cd /usr/local/spark$ /usr/local/spar

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評(píng)論

0/150

提交評(píng)論