




版權(quán)說(shuō)明:本文檔由用戶(hù)提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
1、Spark4.sparkStreaming 和 storm 的區(qū)別1. spark 運(yùn)行的 job 在哪里可以看到Spark 優(yōu)化flume:日志收集系統(tǒng),主要用于系統(tǒng)日志的收集kafka:消息隊(duì)列,進(jìn)行消息的緩存和系統(tǒng)的解耦storm:實(shí)時(shí)計(jì)算框架,進(jìn)行流式的計(jì)算。Spark 應(yīng)用轉(zhuǎn)換流程1、spark 應(yīng)用提交后,經(jīng)歷了一系列的轉(zhuǎn)換,最后成為task 在每個(gè)節(jié)點(diǎn)上執(zhí)行2、RDD 的 Action 算子觸發(fā) Job 的提交,生成 RDD DAG3、由 DAGScheduler 將 RDD DAG 轉(zhuǎn)化為 Stage DAG,每個(gè) Stage 中產(chǎn)生相應(yīng)的 Task 集合4、TaskSched
2、uler 將任務(wù)分發(fā)到 Executor 執(zhí)行5、每個(gè)任務(wù)對(duì)應(yīng)相應(yīng)的一個(gè)數(shù)據(jù)塊,只用用戶(hù)定義的函數(shù)處理數(shù)據(jù)塊Driver 運(yùn)行在Worker 上通過(guò).apache.spark.deploy.C nt 類(lèi)執(zhí)行作業(yè),作業(yè)運(yùn)行命令如下:作業(yè)執(zhí)行流程描述:1、客戶(hù)端提交作業(yè)給Master2、Master 讓一個(gè)Worker 啟動(dòng)Driver,即 SchedulerBackend。Worker 創(chuàng)建一個(gè)DriverRunner 線(xiàn)程,DriverRunner啟動(dòng)SchedulerBackend 進(jìn)程。3、另外Master 還會(huì)讓其余Worker 啟動(dòng)Exeuctor,即 ExecutorBackend。
3、Worker 創(chuàng)建一個(gè)ExecutorRunner線(xiàn)程,ExecutorRunner 會(huì)啟動(dòng)ExecutorBackend 進(jìn)程。4、ExecutorBackend 啟動(dòng)后會(huì)向Driver 的SchedulerBackend 。SchedulerBackend 進(jìn)程中包含DAGScheduler,它會(huì)根據(jù)用戶(hù)程序,生成執(zhí)行計(jì)劃,并調(diào)度執(zhí)行。對(duì)于每個(gè)stage 的task,都會(huì)被存放到TaskScheduler 中, ExecutorBackend 向SchedulerBackend 匯報(bào)的時(shí)候把TaskScheduler 中的task 調(diào)度到ExecutorBackend 執(zhí)行。5、所有sta
4、ge 都完成后作業(yè)結(jié)束。Driver 運(yùn)行在客戶(hù)端# spark 面試問(wèn)題收集# spark 面試問(wèn)題# 1、spark 中的 RDD 是什么,有哪些特性*RDD(Resint Distributed Dataset)叫做分布式數(shù)據(jù)集,是 Spark中最基本的數(shù)據(jù)抽象,它代表一個(gè)不可變、可分區(qū)、里面的元素可并行計(jì)算的集合。*Dataset:就是一個(gè)集合,用于存放數(shù)據(jù)的*Distributed:分布式,可以并行在集群計(jì)算*Resint:表示彈性的作業(yè)執(zhí)行流程描述:1、客戶(hù)端啟動(dòng)后直接運(yùn)行用戶(hù)程序,啟動(dòng)Driver 相關(guān)的工作:DAGScheduler 和BlockManagerMaster 等。
5、2、客戶(hù)端的Driver 向Master。3、Master 還會(huì)讓W(xué)orker 啟動(dòng)Exeuctor。Worker 創(chuàng)建一個(gè)ExecutorRunner 線(xiàn)程,ExecutorRunner 會(huì)啟動(dòng)ExecutorBackend 進(jìn)程。4、ExecutorBackend 啟動(dòng)后會(huì)向Driver 的SchedulerBackend。Driver 的DAGScheduler作業(yè)并生成相應(yīng)的Stage,每個(gè)Stage 包含的Task 通過(guò)TaskScheduler 分配給Executor 執(zhí)行。5、所有stage 都完成后作業(yè)結(jié)束。* 彈性表示* 1、RDD 中的數(shù)據(jù)可以在內(nèi)存或者是磁盤(pán)* 2、RDD
6、 中的分區(qū)是可以改變的*五大特性:*Alist of partitions一個(gè)分區(qū)列表,RDD 中的數(shù)據(jù)都存在一個(gè)分區(qū)列表里面*Afunction for computing each split作用在每一個(gè)分區(qū)中的函數(shù)*Alist of dependencies on other RDDs一個(gè) RDD 依賴(lài)于其他多個(gè) RDD,這個(gè)點(diǎn)很重要,RDD 的容錯(cuò)機(jī)制就是依據(jù)這個(gè)特性而來(lái)的* Optionally, a Partitioner for key-value RDDs (e.g. to saythe RDD is hash-partitioned)t可選的,針對(duì)于 kv 類(lèi)型的 RDD 才
7、具有這個(gè)特性,作用是決定了數(shù)據(jù)的來(lái)源以及數(shù)據(jù)處理后的去向* Optionally, a list of preferred locations to compute each spliton (e.g. block locations for an HDFS file)可選項(xiàng),數(shù)據(jù)本地性,數(shù)據(jù)位置最優(yōu)# 2、概述一下 spark 中的常用算子區(qū)別(map、mapPartitions、foreach、foreachPartition)* map:用于遍歷 RDD,將函數(shù) f 應(yīng)用于每一個(gè)元素,返回新的 RDD(transformation算子)。* foreach:用于遍歷 RDD,將函數(shù) f
8、應(yīng)用于每一個(gè)元素,無(wú)返回值(action 算子)。* mapPartitions:用于遍歷操作 RDD 中的每一個(gè)分區(qū),返回生成一個(gè)新的 RDD(transformation 算子)。* foreachPartition: 用于遍歷操作RDD 中的每一個(gè)分區(qū)。無(wú)返回值(action 算子)。* 總結(jié):一般使用 mapPartitions 或者 foreachPartition 算子比 map 和 foreach更加高效,使用。# 3、談?wù)?spark 中的寬窄依賴(lài)* RDD 和它依賴(lài)的父 RDD(s)的關(guān)系有兩種不同的類(lèi)型,即窄依賴(lài)(narrowdependency)和寬依賴(lài)(wide dep
9、endency)。* 寬依賴(lài):指的是多個(gè)子 RDD 的 Partition 會(huì)依賴(lài)同一個(gè)父 RDD 的 Partition* 窄依賴(lài):指的是每一個(gè)父 RDD 的 Partition 最多被子 RDD 的一個(gè) Partition使用。# 4、spark 中如何劃分 stage* 1.Spark Application 中可以因?yàn)椴煌?Action 觸發(fā)眾多的 job,一個(gè)Application 中可以有很多的 job,每個(gè) job 是由一個(gè)或者多個(gè) Stage的,后面的 Stage 依賴(lài)于前面的 Stage,也就是說(shuō)只有前面依賴(lài)的 Stage 計(jì)算完畢后,后面的 Stage 才會(huì)運(yùn)行。* 2.
10、Stage 劃分的依據(jù)就是寬依賴(lài),何時(shí)產(chǎn)生寬依賴(lài),例如reduceByKey,groupByKey 的算子,會(huì)導(dǎo)致寬依賴(lài)的產(chǎn)生。* 3.由 Action(例如 collect)導(dǎo)致了 SparkContext.runJob 的執(zhí)行,最終導(dǎo)致了 DAGScheduler 中的 submitJob 的執(zhí)行,其是通過(guò)發(fā)送一個(gè) case classJobSubmitted 對(duì)象給 eventProsLoop。eventProsLoop 是 DAGSchedulerEventProsLoop 的具體實(shí)例,而DAGSchedulerEventProsLoop 是 eventLoop 的子類(lèi),具體實(shí)現(xiàn) Ev
11、entLoop 的onReceive 方法,onReceive 方法轉(zhuǎn)過(guò)來(lái)回調(diào)doOnReceive* 4.在 doOnReceive 中通過(guò)模式匹配的方法把執(zhí)行路由到* 5.在 handleJobSubmitted 中首先創(chuàng)建 finalStage,創(chuàng)建 finalStage 時(shí)候會(huì)建立父 Stage 的依賴(lài)鏈條* 總結(jié):以來(lái)是從代碼的邏輯層面上來(lái)展開(kāi)說(shuō)的,可以簡(jiǎn)單點(diǎn)說(shuō):寫(xiě)介紹什么是 RDD 中的寬窄依賴(lài),然后在根據(jù) DAG 有向無(wú)環(huán)圖進(jìn)行劃分,從當(dāng)前 job 的最后一個(gè)算子往前推,遇到寬依賴(lài),那么當(dāng)前在這個(gè)批次中的所有算子操作都劃分成一個(gè) stage,然后繼續(xù)按照這種方式在繼續(xù)往前推,如在
12、遇到寬依賴(lài),又劃分成一個(gè) stage,一直到最前面的一個(gè)算子。最后整個(gè) job 會(huì)被劃分成多個(gè) stage,而stage 之間又存在依賴(lài)關(guān)系,后面的 stage 依賴(lài)于前面的 stage。# 5、spark-submit 的時(shí)候如何引入外部 jar 包* 在通過(guò) spark-submit 提交任務(wù)時(shí),可以通過(guò)添加配置參數(shù)來(lái)指定* -driver-class-path 外部 jar 包* -jars 外部 jar 包# 6、spark 如何防止內(nèi)存溢出* driver 端的內(nèi)存溢出* 可以增大 driver 的內(nèi)存參數(shù):spark.driver.memory (default 1g)* 這個(gè)參數(shù)
13、用來(lái)設(shè)置 Driver 的內(nèi)存。在 Spark 程序中,SparkContext, DAGScheduler 都是運(yùn)行在 Driver 端的。對(duì)應(yīng) rdd 的 Stage 切分也是在 Driver 端運(yùn)行,如果用戶(hù)自己寫(xiě)的程序有過(guò)多的步驟,切分出過(guò)多的 Stage,這部分信息消耗的是 Driver 的內(nèi)存,這個(gè)時(shí)候就需要調(diào)大 Driver 的內(nèi)存。* map 過(guò)程產(chǎn)生大量對(duì)象導(dǎo)致內(nèi)存溢出* 這種溢出的原因是在單個(gè) map 中產(chǎn)生了大量的對(duì)象導(dǎo)致的,例如: rdd.map(x=for(i for(i (k,1).reduceBykey(_+_).map(k=(k._2,k._1).sortByK
14、ey(false).take(10)* 如果發(fā)現(xiàn)多數(shù)數(shù)據(jù)分布都較為平均,而個(gè)別數(shù)據(jù)比其他數(shù)據(jù)大上若干個(gè)數(shù)量級(jí),則說(shuō)明發(fā)生了數(shù)據(jù)傾斜。* 經(jīng)過(guò)分析,傾斜的數(shù)據(jù)主要有以下三種情況:* 1、null(空值)或是一些無(wú)意義的信息()之類(lèi)的,大多是這個(gè)原因引起。* 2、無(wú)效數(shù)據(jù),大量重復(fù)的測(cè)試數(shù)據(jù)或是對(duì)結(jié)果影響不大的有效數(shù)據(jù)。* 3、有效數(shù)據(jù),業(yè)務(wù)導(dǎo)致的正常數(shù)據(jù)分布。* 解決辦法* 第 1,2 種情況,直接對(duì)數(shù)據(jù)進(jìn)行過(guò)濾即可(因?yàn)樵摂?shù)據(jù)對(duì)當(dāng)前業(yè)務(wù)不會(huì)產(chǎn)生影響)。* 第 3 種情況則需要進(jìn)行一些特殊操作,常見(jiàn)的有以下幾種做法* (1)執(zhí)行,將異常的 key 過(guò)濾出來(lái)單獨(dú)處理,最后與正常數(shù)據(jù)的處理結(jié)果進(jìn)行
15、union 操作。* (2) 對(duì) key 先添加隨機(jī)值,進(jìn)行操作后,去掉隨機(jī)值,再進(jìn)行一次操作。* (3) 使用 reduceByKey 代替 groupByKey(reduceByKey 用于對(duì)每個(gè) key 對(duì)應(yīng)的多個(gè) value 進(jìn)行 merge 操作,最重要的是它能夠在本地先進(jìn)行 merge 操作,并且 merge 操作可以通過(guò)函數(shù)自定義.)* (4) 使用 map join。* 案例* 如果使用 reduceByKey 因?yàn)閿?shù)據(jù)傾斜造成運(yùn)行失敗的問(wèn)題。具體操作流程如下:*(1)將原始的 key 轉(zhuǎn)化為 key + 隨機(jī)值(例如 Random.next)*(2)對(duì)數(shù)據(jù)進(jìn)行 reduceB
16、yKey(func)*(3)將 key + 隨機(jī)值 轉(zhuǎn)成 key*(4)再對(duì)數(shù)據(jù)進(jìn)行 reduceByKey(func)* 案例操作流程分析:* 假設(shè)說(shuō)有傾斜的 Key,給所有的 Key 加上一個(gè)隨機(jī)數(shù),然后進(jìn)行 reduceByKey 操作;此時(shí)同一個(gè) Key 會(huì)有不同的隨機(jī)數(shù)前綴,在進(jìn)行reduceByKey 操作的時(shí)候原來(lái)的一個(gè)非常大的傾斜的Key 就分而治之變成若干個(gè)更小的 Key,不過(guò)此時(shí)結(jié)果和原來(lái)不一樣,怎么破?進(jìn)行 map 操作,目的是把隨機(jī)數(shù)前綴去掉,然后再次進(jìn)行 reduceByKey 操作。(當(dāng)然,如果你很無(wú)聊,可以再次做隨機(jī)數(shù)前綴),這樣就可以把原本傾斜的 Key 通過(guò)分
17、而治之方案分散開(kāi)來(lái),最后又進(jìn)行了全局聚合* 注意 1: 如果此時(shí)依舊存在問(wèn)題,建議篩選出傾斜的數(shù)據(jù)單獨(dú)處理。最后將這份數(shù)據(jù)與正常的數(shù)據(jù)進(jìn)行 union 即可。* 注意 2: 單獨(dú)處理異常數(shù)據(jù)時(shí),可以配合使用 MapJoin 解決。* 2、spark 使用不當(dāng)造成的數(shù)據(jù)傾斜* 提高 shuffle 并行度* dataFrame 和 sparkSql 可以設(shè)置spark.sql.shuffle.partitions 參數(shù)控制 shuffle 的并發(fā)度,默認(rèn)為 200。* rdd 操作可以設(shè)置 spark.default.parallelism 控制并發(fā)度,默認(rèn)參數(shù)由不同的 Cluster Mana
18、ger 控制。* 局限性: 只是讓每個(gè) task 執(zhí)行更少的不同的 key。無(wú)法解決個(gè)別 key 特別大的情況造成的傾斜,如果某些 key 的大小非常大,即使一個(gè) task 單獨(dú)執(zhí)行它,也會(huì)受到數(shù)據(jù)傾斜的困擾。* 使用 map join 代替 reduce join* 在小表不是特別大(取決于你的 executor 大小)的情況下使用,可以使程序避免 shuffle 的過(guò)程,自然也就沒(méi)有數(shù)據(jù)傾斜的困擾了.(詳細(xì)見(jiàn)、)* 局限性: 因?yàn)槭窍葘⑿?shù)據(jù)發(fā)送到每個(gè) executor上,所以數(shù)據(jù)量不能太大。# 11、flume 整合 sparkStreaming 問(wèn)題* (1)、如何實(shí)現(xiàn) sparkSt
19、reamingflume 中的數(shù)據(jù)* 可以這樣說(shuō):* 前期經(jīng)過(guò)技術(shù)調(diào)研,查看官網(wǎng)相關(guān)資料,發(fā)現(xiàn) sparkStreaming 整合 flume 有 2 種模式,一種是拉模式,一種是推模式,然后在簡(jiǎn)單的聊聊這 2 種模式的特點(diǎn),以及如何部署實(shí)現(xiàn),需要做哪些事情,最后對(duì)比兩種模式的特點(diǎn),選擇那種模式更好。* 推模式:Flume 將數(shù)據(jù) Push 推給 Spark Streaming* 拉模式:Spark Streaming 從 flume 中 Poll 拉取數(shù)據(jù)* (2)、在實(shí)際開(kāi)發(fā)的時(shí)候是如何保證數(shù)據(jù)不丟失的* 可以這樣說(shuō):* flume 那邊采用的 channel 是將數(shù)據(jù)落地到磁盤(pán)中,保證數(shù)
20、據(jù)源端安全性(可以在補(bǔ)充一下,flume 在這里的 channel 可以設(shè)置為 memory內(nèi)存中,提高數(shù)據(jù)接收處理的效率,但是由于數(shù)據(jù)在內(nèi)存中,安全機(jī)制保證不了,故選擇 channel 為磁盤(pán)。整個(gè)流程運(yùn)行有一點(diǎn)的延遲性)* sparkStreaming 通過(guò)拉模式整合的時(shí)候,使用了 FlumeUtils這樣一個(gè)類(lèi),該類(lèi)是需要依賴(lài)一個(gè)額外的 jar 包(spark-streaming-flume_2.10)* 要想保證數(shù)據(jù)不丟失,數(shù)據(jù)的準(zhǔn)確性,可以在構(gòu)建StreamingConext 的時(shí)候,利用 StreamingContext.getOrCreate(checkpo,creatingFu
21、nc: () = StreamingContext)來(lái)創(chuàng)建一個(gè) StreamingContext,使用StreamingContext.getOrCreate 來(lái)創(chuàng)建 StreamingContext 對(duì)象,傳入的第一個(gè)參數(shù)是 checkpo的存放目錄,第二參數(shù)是生成 StreamingContext 對(duì)象的用戶(hù)自定義函數(shù)。如果 checkpo的存放目錄存在,則從這個(gè)目錄中生成StreamingContext 對(duì)象;如果不存在,才會(huì)調(diào)用第二個(gè)函數(shù)來(lái)生成新的StreamingContext 對(duì)象。在 creatingFunc 函數(shù)中,除了生成一個(gè)新的StreamingContext 操作,還需要
22、完成,然后調(diào)用ssc.checkpo(checkpoDirectory)來(lái)初始化 checkpo功能,最后再返回StreamingContext 對(duì)象。這樣,在 StreamingContext.getOrCreate 之后,就可以直接調(diào)用start()函數(shù)來(lái)啟動(dòng)(或者是從中斷點(diǎn)繼續(xù)運(yùn)行)流式應(yīng)用了。如果有其他在啟動(dòng)或繼續(xù)運(yùn)行都要做的工作,可以在 start()調(diào)用前執(zhí)行。* 流失計(jì)算中使用 checkpo的作用:*保存元數(shù)據(jù),包括流式應(yīng)用的配置、流式?jīng)]之前定義的、未完成所有操作的 batch。元數(shù)據(jù)被到失敗的存儲(chǔ)系統(tǒng)上,如 HDFS。這種 ckeckpo主要針對(duì) driver 失敗后的修復(fù)。
23、*上,如 HDFS。這種 ckeckpo保存流式數(shù)據(jù),也是到失敗的系統(tǒng)主要針對(duì) window operation、有狀態(tài)的操作。無(wú)論是 driver 失敗了,還是 worker 失敗了,這種 checkpo都?jí)蚩焖倩謴?fù),而不需要將很長(zhǎng)的歷史數(shù)據(jù)都重新計(jì)算一遍(以便得到當(dāng)前的狀態(tài))。* 設(shè)置流式數(shù)據(jù) checkpo的周期* 對(duì)于一個(gè)需要做 checkpo的 DStream 結(jié)構(gòu),可以通過(guò)調(diào)用 DStream.checkpo(checkpoerval)來(lái)設(shè)置 ckeckpo的周期,經(jīng)驗(yàn)上一般將這個(gè) checkpo周期設(shè)置成 batch 周期的 5 至 10 倍。* 使用 write ahead l
24、ogs 功能* 這是一個(gè)可選功能,建議加上。這個(gè)功能將使得輸入數(shù)據(jù)寫(xiě)入之前配置的 checkpo目錄。這樣有狀態(tài)的數(shù)據(jù)可以從上一個(gè)checkpo開(kāi)始計(jì)算。開(kāi)啟的方法是把spark.streaming.receiver.writeAheadLogs.enable 這個(gè) property 設(shè)置為 true。另外,由于輸入 RDD 的默認(rèn) StorageLevel 是 MEMORY_AND_DISK_2,即數(shù)據(jù)會(huì)在兩臺(tái) worker 上做 replication。實(shí)際上,Spark Streaming 模式下,任何從網(wǎng)絡(luò)輸入數(shù)據(jù)的 Receiver(如 kafka、flume、socket)都會(huì)在兩
25、臺(tái)機(jī)器上做數(shù)據(jù)備份。如果開(kāi)啟了 write ahead logs 的功能,建議把 StorageLevel 改成MEMORY_AND_DISK_SER。修改的方法是,在創(chuàng)建 RDD 時(shí)由參數(shù)傳入。* 使用以上的 checkpo機(jī)制,確實(shí)可以保證數(shù)據(jù) 0 丟失。但是一個(gè)前提條件是,數(shù)據(jù)發(fā)送端必須要有緩存功能,這樣才能保證在 spark應(yīng)用重啟期間,數(shù)據(jù)發(fā)送端不會(huì)因?yàn)?spark streaming 服務(wù)不可用而把數(shù)據(jù)丟棄。而flume 具備這種特性,同樣 kafka 也具備。* (3)Spark Streaming 的數(shù)據(jù)可靠性* 有了 checkpo機(jī)制、write ahead log 機(jī)制、
26、Receiver 緩存機(jī)器、可靠的 Receiver(即數(shù)據(jù)接收并備份成功后會(huì)發(fā)送 ack),可以保證無(wú)論是 worker失效還是 driver 失效,都是數(shù)據(jù) 0 丟失。原因是:如果沒(méi)有 Receiver 服務(wù)的 worker 失效了,RDD 數(shù)據(jù)可以依賴(lài)血統(tǒng)來(lái)重新計(jì)算;如果 Receiver 所在 worker失敗了,由于 Reciever 是可靠的,并有 write ahead log 機(jī)制,則收到的數(shù)據(jù)可以保證不丟;如果 driver 失敗了,可以從checkpo中恢復(fù)數(shù)據(jù)重新構(gòu)建。# 12、kafka 整合 sparkStreaming 問(wèn)題* (1)、如何實(shí)現(xiàn) sparkStrea
27、mingkafka 中的數(shù)據(jù)* 可以這樣說(shuō):在 kafka0.10 版本之前有二種方式與 sparkStreaming整合,一種是基于 receiver,一種是 direct,然后分別闡述這 2 種方式分別是什么* receiver:是采用了 kafka 高級(jí) api,利用 receiver來(lái)接受 kafka topic 中的數(shù)據(jù),從 kafka 接收來(lái)的數(shù)據(jù)會(huì)在 spark 的executor 中,之后 spark streaming 提交的 job 會(huì)處理這些數(shù)據(jù),kafka 中 topic的偏移量是保存在 zk 中的。* 基本使用: val kafkaStream =KafkaUtils
28、.createStream(streamingContext,ZK quorum, consumer groupartitions to consume), per-topic number of Kafka* 還有幾個(gè)需要注意的點(diǎn):* 在 Receiver 的方式中,Spark 中的partition 和 kafka 中的 partition 并不是相關(guān)的,所以如果加大每個(gè) topic的 partition 數(shù)量,僅僅是增加線(xiàn)程來(lái)處理由單一 Receiver 消費(fèi)的。但是這并沒(méi)有增加 Spark 在處理數(shù)據(jù)上的并行度.* 對(duì)于不同的 Group 和 topic可以使用多個(gè) Receiver
29、創(chuàng)建不同的 Dstream 來(lái)并行接收數(shù)據(jù),之后可以利用 union 來(lái)統(tǒng)一成一個(gè) Dstream。* 在默認(rèn)配置下,這種方式可能會(huì)因?yàn)榈讓佣鴣G失數(shù)據(jù). 因?yàn)?receiver 一直在接收數(shù)據(jù),在其已經(jīng)通知 zookeeper數(shù)據(jù)接收完成但是還沒(méi)有處理的時(shí)候,executor 突然掛掉(或是 driver 掛掉通知executor 關(guān)閉),緩存在其中的數(shù)據(jù)就會(huì)丟失. 如果希望做到高可靠, 讓數(shù)據(jù)零丟失,如果啟用了 Write AheadLogs(spark.streaming.receiver.writeAheadLog.enable=true)該機(jī)制會(huì)同步地將接收到的 Kafka 數(shù)據(jù)寫(xiě)入分
30、布式文件系統(tǒng)(比如 HDFS)上的預(yù)寫(xiě)日志中. 所以, 即使底層節(jié)點(diǎn)出現(xiàn)了失敗, 也可以使用預(yù)寫(xiě)日志中的數(shù)據(jù)進(jìn)行恢復(fù).到文件系統(tǒng)如 HDFS,那么 storage level 需要設(shè)置成StorageLevel.MEMORY_AND_DISK_SER,也就是 KafkaUtils.createStream(., StorageLevel.MEMORY_AND_DISK_SER)* direct:在 spark1.3 之后,引入了 Direct 方式。不同于Receiver 的方式,Direct 方式?jīng)]有 receiver 這一層,其會(huì)周期性的獲取 Kafka中每個(gè) topic 的每個(gè) part
31、ition 中的最新 offsets,之后根據(jù)設(shè)定的maxRatePartition 來(lái)處理每個(gè) batch。(設(shè)置spark.streaming.kafaxRatePartition=10000。限制每秒鐘從 topic 的每個(gè) partition 最多消費(fèi)的消息條數(shù))。* (2) 對(duì)比這 2 中方式的優(yōu)缺點(diǎn):* 采用 receiver 方式:這種方式可以保證數(shù)據(jù)不丟失,但是無(wú)法保證數(shù)據(jù)只被處理一次,WAL 實(shí)現(xiàn)的是east-once 語(yǔ)義(至少被處理一次),如果在寫(xiě)入到外部的數(shù)據(jù)還沒(méi)有將offset 更新到zookeeper 就掛掉,這些數(shù)據(jù)將會(huì)被反復(fù)消費(fèi). 同時(shí),降低了程序的吞吐量。*
32、采用 direct 方式:相比 Receiver 模式而言能夠確保機(jī)制更加健壯.區(qū)別于使用 Receiver 來(lái)接收數(shù)據(jù), Direct 模式會(huì)周期性動(dòng)查詢(xún) Kafka,來(lái)獲得每個(gè) topic+partition 的最新的 offset, 從而定義每個(gè) batch 的 offset的范圍. 當(dāng)處理數(shù)據(jù)的 job 啟動(dòng)時(shí), 就會(huì)使用 Kafka 的簡(jiǎn)單 consumer api 來(lái)獲取 Kafka 指定 offset 范圍的數(shù)據(jù)。* 優(yōu)點(diǎn):* 1、簡(jiǎn)化并行* 如果要多個(gè) partition, 不需要?jiǎng)?chuàng)建多個(gè)輸入 DStream 然后對(duì)它們進(jìn)行 union 操作. Spark 會(huì)創(chuàng)建跟 Kafka
33、 partition一樣多的 RDD partition, 并且會(huì)并行從 Kafka 中partition 和 RDD partition 之間, 有一個(gè)一對(duì)一的數(shù)據(jù). 所以在 Kafka關(guān)系.* 2、高性能* 如果要保證零數(shù)據(jù)丟失, 在基于 receiver的方式中, 需要開(kāi)啟 WAL 機(jī)制. 這種方式其實(shí)效率低下, 因?yàn)閿?shù)據(jù)實(shí)際上被復(fù)制了兩份, Kafka 自己本身就有高可靠的機(jī)制, 會(huì)對(duì)數(shù)據(jù)一份, 而這里又會(huì)一份到 WAL 中. 而基于 direct 的方式, 不依賴(lài) Receiver, 不需要開(kāi)啟 WAL機(jī)制, 只要 Kafka 中作了數(shù)據(jù)的, 那么就可以通過(guò) Kafka 的副本進(jìn)行恢
34、復(fù).* 3、一次且僅一次的事務(wù)機(jī)制* 基于 receiver 的方式, 是使用 Kafka 的高階 API 來(lái)在 ZooKeeper 中保存消費(fèi)過(guò)的 offset 的. 這是消費(fèi) Kafka 數(shù)據(jù)的傳統(tǒng)方式. 這種方式配合著 WAL 機(jī)制可以保證數(shù)據(jù)零丟失的高可靠性, 但是卻無(wú)法保證數(shù)據(jù)被處理一次且僅一次, 可能會(huì)處理兩次. 因?yàn)镾park 和ZooKeeper 之間可能是不同步的. 基于 direct 的方式, 使用kafka 的簡(jiǎn)單 api, SparkStreaming 自己就負(fù)責(zé)追蹤消費(fèi)的 offset, 并保存在 checkpo中. Spark 自己一定是同步的, 因此可以保證數(shù)據(jù)是
35、消費(fèi)一次且僅消費(fèi)一次。不過(guò)需要自己完成將 offset 寫(xiě)入zk 的過(guò)程,在文檔中都有相應(yīng)介紹.*簡(jiǎn)單代碼實(shí)例:*messages.foreachRDD(rdd=val message =rdd.map(_._2)/對(duì)數(shù)據(jù)進(jìn)行一些操作message.map(method)/更新 zk 上的 offset (自己實(shí)現(xiàn))updateZKOffsets(rdd)* sparkStreaming 程序自己消費(fèi)完成后,自己主動(dòng)去更新 zk 上面的偏移量。也可以將 zk 中的偏移量保存在或者 redis 數(shù)據(jù)庫(kù)中,下次重啟的時(shí)候,直接或者 redis中的偏移量,獲取到上次消費(fèi)的偏移量,接著數(shù)據(jù)。# 13、利用 scala 語(yǔ)言實(shí)現(xiàn)排序* (1)冒泡排序:*package cn.sort*/冒泡排序*class BubbleSort *def main(args: ArrayString): Unit = *val list = List(3, 12, 43, 23, 7, 1, 2, 0)*prln(sort(list)*/定義一
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶(hù)所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶(hù)上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶(hù)上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶(hù)因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 二零二五年度臨時(shí)保安服務(wù)合同-活動(dòng)期間安全保衛(wèi)
- 2025年度知識(shí)產(chǎn)權(quán)質(zhì)押合同終止及質(zhì)權(quán)實(shí)現(xiàn)協(xié)議
- 產(chǎn)品發(fā)布營(yíng)銷(xiāo)策略規(guī)劃
- 影視制作發(fā)行合作框架協(xié)議
- 智能能源管理系統(tǒng)建設(shè)投資協(xié)議
- 家具企業(yè)家具設(shè)計(jì)與制造預(yù)案
- 哈他瑜伽介紹課件:哈他瑜伽-身心平衡的藝術(shù)
- 小學(xué)生心理輔導(dǎo)觀后感
- 網(wǎng)絡(luò)購(gòu)物平臺(tái)合作運(yùn)營(yíng)協(xié)議書(shū)
- 娛樂(lè)項(xiàng)目節(jié)目制作授權(quán)協(xié)議
- 第1章 跨境電商概述
- 2024-2030年中國(guó)長(zhǎng)管拖車(chē)行業(yè)市場(chǎng)發(fā)展趨勢(shì)與前景展望戰(zhàn)略分析報(bào)告
- 《高等教育學(xué)》近年考試真題題庫(kù)(含答案)
- 2024福建省廈門(mén)市總工會(huì)擬錄用人員筆試歷年典型考題及考點(diǎn)剖析附答案帶詳解
- 2024風(fēng)力發(fā)電機(jī)組預(yù)應(yīng)力基礎(chǔ)錨栓籠組合件技術(shù)規(guī)范
- 供熱管道施工組織設(shè)計(jì)
- 浙江省中小學(xué)心理健康教育教師上崗資格證書(shū)管理辦法(修訂)
- 2024年青島港灣職業(yè)技術(shù)學(xué)院?jiǎn)握新殬I(yè)適應(yīng)性測(cè)試題庫(kù)審定版
- 2024年時(shí)事政治題(考點(diǎn)梳理)
- 2023全國(guó)乙卷語(yǔ)文真題試卷及答案解析
- JavaWeb程序設(shè)計(jì) 教案 第1章 JavaWeb開(kāi)發(fā)環(huán)境配置-2學(xué)時(shí)
評(píng)論
0/150
提交評(píng)論