大數(shù)據(jù)就業(yè)特訓(xùn)營-直播spark編程模型_第1頁
大數(shù)據(jù)就業(yè)特訓(xùn)營-直播spark編程模型_第2頁
大數(shù)據(jù)就業(yè)特訓(xùn)營-直播spark編程模型_第3頁
大數(shù)據(jù)就業(yè)特訓(xùn)營-直播spark編程模型_第4頁
大數(shù)據(jù)就業(yè)特訓(xùn)營-直播spark編程模型_第5頁
免費預(yù)覽已結(jié)束,剩余20頁可下載查看

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

概要依賴管理基本套路輸入源轉(zhuǎn)換操作輸出操作持久化操作依賴管理依賴<dependency><groupId>

.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId><version>1.6.2</version></dependency>Source相關(guān)依賴部分source相關(guān)依賴現(xiàn)在已經(jīng)單獨打包,需要單獨引入基本套路//1、參數(shù)處理if

(args.length

<

2)

{System.err.println("Usage:

NetworkWordCount

<hostname>

<port>")System.exit(1)}//2、初始化StreamingContextval

sparkConf

=

new

SparkConf().setAppName("NetworkWordCount")val

ssc

=

new

StreamingContext(sparkConf,

Seconds(1))//3、從source獲取數(shù)據(jù)創(chuàng)建DStreamval

lines

=

ssc.socketTextStream(args(0),args(1).toInt,

StorageLevel.MEMORY_AND_DISK_SER)//4、對DStream進行val

words

=

lines.flatMap(_.split("

"))val

wordCounts=

words.map(x

=>

(x,

1)).reduceByKey(_+

_)//5、處理計算結(jié)果

wordCounts.print()//6、啟動Spark

Streamingssc.start()ssc.awaitTermination()輸入源Dstream輸入源---input

DStreamSpark內(nèi)置了兩類Source:Source分類舉例說明Basic

sourcesfile

systems,

socketconnections,

and

AkkaactorsStreamingContext

直接就可以創(chuàng)建,無需引入額外的依賴Advanced

sourcesKafka,

Flume,Kinesis,,

etc需要引入相關(guān)依賴,并且需要通過相關(guān)的工具類來創(chuàng)建val

lines

=

ssc.socketTextStream(args(0),args(1).toInt,

StorageLevel.MEMORY_AND_DISK_SER)import

.apache.spark.streaming.kafka._val

kafkaStream

=

KafkaUtils.createStream(streamingContext,

[ZK

quorum],

[consumer

group

id],

[per-topic

number

of

Kafka

partitions

to

consume])Dstream輸入源---Receiverinput

Dstream都會關(guān)聯(lián)一個Receiver(除了FileInputDStream)Receiver以任務(wù)的形式運行在應(yīng)用的執(zhí)行器進程中,從輸入源收集數(shù)據(jù)并保存為RDD。Receiver收集到輸入數(shù)據(jù)后會把數(shù)據(jù)

到另一個執(zhí)行器進程來保障容錯性(默認行為)Receiver會消耗額外的cpu資源,所以要注意分配

的cpu

cores(receiver是一個單獨的task,會消耗cpu)local模式下不要“l(fā)ocal”or

“l(fā)ocal[1]”(需要指定多個,只有一個核會卡?。┓植际竭\行時,分配的cores >

receivers的數(shù)量StreamingContext

會周期性地運行Spark

作業(yè)來處理這些數(shù)據(jù)(每接受一批次數(shù)據(jù),就會提交作業(yè)運行處理),把數(shù)據(jù)與之前時間區(qū)間中的RDD進行整合(如果是時間窗口,需要與其它RDD做運算整合)內(nèi)置的input

Dstream:Basic

Sources內(nèi)置input

Dstream–

/apache/spark/tree/v1.6.2/external(高級)文件流val

logData

=

ssc.textFileStream(logDirectory)Spark

支持從任意Hadoop

兼容的文件系統(tǒng)中

數(shù)據(jù),

Spark

Streaming

也就支持從任意Hadoop

兼容的文件系統(tǒng)

中的文件創(chuàng)建數(shù)據(jù)流(InputFormat參數(shù)化)ssc.fileStream[LongWritable,

IntWritable,SequenceFileInputFormat[LongWritable,

IntWritable]](inputDirectory).map

{case

(x,

y)

=>

(x.get(),

y.get())}文件必須原子化創(chuàng)建(比如把文件移入Spark

,而不是一條條往已有文件寫數(shù)據(jù))Akka

actor流(spark

底層使用akka通信)內(nèi)置的input

Dstream:Advanced

SourcesApache

Kafkadef

main(args:

Array[String])

{if

(args.length

<

4)

{System.err.println("Usage:KafkaWordCount

<zkQuorum><group>

<topics>

<numThreads>")System.exit(1)}#使用Array接受args參數(shù)val

Array(zkQuorum,

group,

topics,

numThreads)

=

argsval

sparkConf

=

new

SparkConf().setAppName("KafkaWordCount")val

ssc

=

new

StreamingContext(sparkConf,

Seconds(2))#指定hdfs

,作用:容錯ssc.checkpoint("checkpoint")val

topicMap

=

topics.split(",").map((_,

numThreads.toInt)).toMapval

lines

=

KafkaUtils.createStream(ssc,

zkQuorum,

group,

topicMap).map(_._2)val

words

=

lines.flatMap(_.split("

"))val

wordCounts

=

words.map(x

=>

(x,

1L)).reduceByKeyAndWindow(_

+

_,_

-

_,Minutes(10),

Seconds(2),

2)wordCounts.print()ssc.start()ssc.awaitTermination()}/apache/spark/examples/st

/apache/spark/tree/v1.6.2/examples/src/main/scala/

reamingDstream輸入源:multiple

input

DStreammultiple

input

streams(same

type

and

same

slideduration)//相同的類型,相同的滑動窗口ssc.union(Seq(stream1,stream2,…))

//合并多個streamstream.union(otherStream)//兩個stream進行合并Dstream輸入源:Custom

ReceiverCustom

input

Dstream–

.apache.spark.streaming.receiver.Receiver(只需要擴展Receiver)–無狀態(tài)轉(zhuǎn)換操作和Sparkcore的語義?一致無狀態(tài)轉(zhuǎn)化操作就是把簡單的RDD

轉(zhuǎn)化操作應(yīng)用到每個批次上,也就是轉(zhuǎn)化DStream中的每一個RDD(對Dstream的操作會

到每個批次的RDD上)無狀態(tài)轉(zhuǎn)換操作不會跨多個batch的RDD去執(zhí)行(每個批次的RDD結(jié)果不能累加)有狀態(tài)轉(zhuǎn)換操作1-updateStateByKey有時 需要在DStream

中跨所有批次狀態(tài)(例如用戶的會話)。針對這種情況,updateStateByKey()

為 提供了對一個狀態(tài)變量的 ,用于鍵值對形式的Dstream使用updateStateByKey需要完成兩步工作:定義狀態(tài):可以是任意數(shù)據(jù)類型定義狀態(tài)更新函數(shù)-updateFuncupdate(events,

oldState)events:是在當(dāng)前批次中收到的事件的列表(可能為空)。–oldState:是一個可選的狀態(tài)對象,存放在Option

內(nèi);如果一個鍵沒有之前的狀態(tài),這個值可以空缺。newState:由函數(shù)返回,也以O(shè)ption

形式存在;

可以返回一個空的Option

來表示想要刪除該狀態(tài)。注意:有狀態(tài)轉(zhuǎn)化操作需要在你的StreamingContext

中打開檢查點機制來確保容錯性–

ssc.checkpoint("hdfs://...")有狀態(tài)轉(zhuǎn)換操作2-window基于窗口的操作會在一個比StreamingContext

的批次間隔更長的時間范圍內(nèi),通過整合多個批次的結(jié)果,計算出整個窗口的結(jié)果所有基于窗口的操作都需要兩個參數(shù),分別為windowDuration以及slideDuration,兩者都必須是StreamContext

的批次間隔的整數(shù)倍valaccessLogsWindow

=

accessLogsDStream.window(Seconds(30),

Seconds(10))val

windowCounts

=

accessLogsWindow.count()batchDuration(每個批次的長度)val

ssc

=

new

StreamingContext(sparkConf,

Seconds(10))windowDuration(每次移動,窗口框住的長度(幾個批次))長控制每次計算最近的多少個批次的數(shù)據(jù)(windowDuration/batchDuration)slideDuration(每次移動的距離(

幾個批次))默認值與batchDuration相等(默認滑動一個batch)控制多長時間計算一次有狀態(tài)轉(zhuǎn)換操作2-window操作代碼片段val

ssc

=

new

StreamingContext(sparkConf,

Seconds(10))…val

accessLogsWindow=

accessLogsDStream.window(Seconds(30),

Seconds(20))val

windowCounts

=

accessLogsWindow.count()..窗口時長為3個批次,滑動步長為2個批次;每隔2個批次就對前3

個批次的數(shù)據(jù)進行一次計算有狀態(tài)轉(zhuǎn)換操作2-window操作—普通規(guī)約與增量規(guī)約增量規(guī)約只考慮新進入窗口的數(shù)據(jù)和離開窗口的數(shù)據(jù),讓Spark增量計算歸約結(jié)果。這種特殊形式需要提供歸約函數(shù)的一個逆函數(shù),比如+對應(yīng)的逆函數(shù)為-有狀態(tài)轉(zhuǎn)換操作2-window操作—理解增量規(guī)約DStream輸出常見輸出操作print每個批次中抓取DStream

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論