Week 16 Flink Quick Start: IT Resources Attachment 162 (Study Preview)

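… once a specific point in time has passed, the Window must be triggered to compute, whether the data arrives in order or out of order.

How Watermarks are generated

There are two ways to generate Watermarks:

With Periodic Watermarks: a Watermark is injected into the stream automatically at a fixed interval. The interval is set with ExecutionConfig.setAutoWatermarkInterval; Flink's default is 200 ms.

With Punctuated Watermarks: the Watermark is emitted based on the data itself, for example whenever a particular marker record arrives, rather than on a timer.

Example: use a socket to simulate the data source. Each record has the format 0001,1790820682000, where 1790820682000 is the time the record was produced, i.e. its EventTime. The Window prints information that lets us verify exactly when it is triggered.

Scala code (assuming Flink 1.12 or later, where event time is the default; the socket host and port, parallelism 1 and the 3 s window size are taken from the console runs below):

```scala
import java.text.SimpleDateFormat
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer
import scala.util.Sorting

/**
 * Watermark + EventTime
 */
object WatermarkOpScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Parallelism 1, so that a single Watermark drives every Window in this demo
    env.setParallelism(1)

    // Socket source; host and port match the nc command used in the runs below
    val text = env.socketTextStream("bigdata04", 9000)

    import org.apache.flink.api.scala._
    // Turn each line "0001,1790820682000" into a tuple2 (key, EventTime in ms)
    val tupStream = text.map(line => {
      val arr = line.split(",")
      (arr(0), arr(1).toLong)
    })

    val waterMarkStream = tupStream.assignTimestampsAndWatermarks(
      WatermarkStrategy
        .forBoundedOutOfOrderness[(String, Long)](Duration.ofSeconds(10)) // maximum allowed out-of-orderness
        .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
          val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          var currentMaxTimestamp = 0L

          override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = {
            val timestamp = element._2
            currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp)
            // Compute the current Watermark (for printing only)
            val currentWatermark = currentMaxTimestamp - 10000L
            println("key:" + element._1 +
              ",eventtime:[" + timestamp + "|" + sdf.format(timestamp) + "]" +
              ",currentMaxTimestamp:[" + currentMaxTimestamp + "|" + sdf.format(currentMaxTimestamp) + "]" +
              ",watermark:[" + currentWatermark + "|" + sdf.format(currentWatermark) + "]")
            timestamp
          }
        })
    )

    waterMarkStream.keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      .apply(new WindowFunction[Tuple2[String, Long], String, Tuple, TimeWindow] {
        override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
          val keyStr = key.toString
          // Collect the EventTimes of all records in this Window into arrBuff
          val arrBuff = ArrayBuffer[Long]()
          input.foreach(tup => arrBuff.append(tup._2))
          // Convert arrBuff to an array and sort it
          val arr = arrBuff.toArray
          Sorting.quickSort(arr)
          val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          // key, record count, earliest and latest EventTime, window start, window end
          val result = keyStr + "," + arr.length + "," + sdf.format(arr.head) + "," + sdf.format(arr.last) +
            "," + sdf.format(window.getStart) + "," + sdf.format(window.getEnd)
          out.collect(result)
        }
      }).print()

    env.execute("WatermarkOpScala")
  }
}
```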
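The two generation modes described above (periodic and punctuated) can be contrasted with a small, dependency-free Scala sketch. It only imitates the shape of Flink's WatermarkGenerator interface (onEvent and onPeriodicEmit); the trait SimpleWatermarkGenerator, the isMarker flag and the buffer used as output are hypothetical stand-ins, not Flink API. The periodic generator uses the same "currentMaxTimestamp minus bound" formula as the demo job (Flink's built-in bounded generator subtracts an extra 1 ms).

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical imitation of the two Watermark generation modes (NOT Flink code).
// Emitted Watermarks are simply appended to a buffer.
trait SimpleWatermarkGenerator {
  // Called for every record; isMarker is a hypothetical flag for "special" records.
  def onEvent(timestamp: Long, isMarker: Boolean, out: ArrayBuffer[Long]): Unit
  // Called by a timer every autoWatermarkInterval (200 ms by default in Flink).
  def onPeriodicEmit(out: ArrayBuffer[Long]): Unit
}

// Periodic: only tracks the largest EventTime; a Watermark is emitted when the timer fires.
class PeriodicBoundedGenerator(maxOutOfOrdernessMs: Long) extends SimpleWatermarkGenerator {
  // Start low enough that the subtraction in onPeriodicEmit cannot overflow.
  private var currentMaxTimestamp: Long = Long.MinValue + maxOutOfOrdernessMs

  def onEvent(timestamp: Long, isMarker: Boolean, out: ArrayBuffer[Long]): Unit = {
    currentMaxTimestamp = math.max(currentMaxTimestamp, timestamp)
  }

  def onPeriodicEmit(out: ArrayBuffer[Long]): Unit = {
    out += currentMaxTimestamp - maxOutOfOrdernessMs
  }
}

// Punctuated: the data decides; a Watermark is emitted as soon as a marker record arrives.
class PunctuatedGenerator extends SimpleWatermarkGenerator {
  def onEvent(timestamp: Long, isMarker: Boolean, out: ArrayBuffer[Long]): Unit = {
    if (isMarker) out += timestamp
  }

  def onPeriodicEmit(out: ArrayBuffer[Long]): Unit = () // the timer is ignored
}
```

Feeding a record to the periodic generator emits nothing until the timer calls onPeriodicEmit, whereas the punctuated generator emits immediately when a marker record arrives and ignores the timer.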
Start the socket on bigdata04 and enter the records one at a time:

```
[root@bigdata04 soft]# nc -l 9000
```

For every record the job prints currentMaxTimestamp and the Watermark, which is currentMaxTimestamp minus the 10 s bound:

Record at 10:11:22: watermark:[1790820672000|2026-10-01 10:11:12], no Window output.
Record at 10:11:26: watermark:[1790820676000|2026-10-01 10:11:16], no Window output.
Record at 10:11:32: watermark:[1790820682000|2026-10-01 10:11:22], no Window output.
Record at 10:11:33: watermark:[1790820683000|2026-10-01 10:11:23], no Window output.

Measured in event time, the earliest record is already 11 s old, yet the Window has still not started computing. So when exactly is a Window triggered?

Record at 10:11:34: watermark:[1790820684000|2026-10-01 10:11:24]. Now the Window fires and prints:

```
(0001),1,2026-10-01 10:11:22,2026-10-01 10:11:22,2026-10-01 10:11:21,2026-10-01 10:11:24
```

The fields are: key, number of records, earliest EventTime, latest EventTime, window start and window end.

Looking at a single record, one might expect a Window to fire as soon as the Watermark time is >= the record's EventTime. That is not enough: the record at 10:11:32 already pushed the Watermark to 10:11:22, the EventTime of the first record, and nothing fired. What finally decides is the window_end_time of the Window that the record's EventTime belongs to. Now the Watermark (10:11:24) has reached the window_end_time of the earliest record's Window [10:11:21, 10:11:24), so that Window is triggered.

Record at 10:11:36: watermark:[1790820686000|2026-10-01 10:11:26], no Window output.
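The firing rule for the first Windows can be checked with plain arithmetic. This is a minimal sketch, assuming the demo's 3 s tumbling windows, 10 s bound and UTC+8 (Asia/Shanghai) display time, which the printed times imply; WindowMath and its methods are hypothetical helpers, not part of Flink. flinkFires also shows Flink's exact form (its bounded Watermark is max - bound - 1, and a Window fires once the Watermark reaches window.maxTimestamp(), i.e. end - 1 ms), which gives the same answer as the simplified rule "Watermark >= window_end_time".

```scala
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

// Hypothetical helpers reproducing the demo's arithmetic (3 s windows, 10 s bound, UTC+8).
object WindowMath {
  val WindowSizeMs = 3000L
  val MaxOutOfOrdernessMs = 10000L

  private val fmt =
    DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.of("Asia/Shanghai"))

  // Render epoch millis the way the job's SimpleDateFormat prints them on a UTC+8 machine.
  def show(ts: Long): String = fmt.format(Instant.ofEpochMilli(ts))

  // Start of the tumbling window containing ts (offset 0); same formula as
  // Flink's TimeWindow.getWindowStartWithOffset.
  def windowStart(ts: Long): Long = ts - (ts + WindowSizeMs) % WindowSizeMs
  def windowEnd(ts: Long): Long = windowStart(ts) + WindowSizeMs

  // Watermark as printed by the demo job: currentMaxTimestamp - 10 s.
  def watermark(currentMaxTimestamp: Long): Long = currentMaxTimestamp - MaxOutOfOrdernessMs

  // Condition 1 from the text: Watermark time >= window_end_time of the record's Window.
  def endReached(currentMaxTimestamp: Long, ts: Long): Boolean =
    watermark(currentMaxTimestamp) >= windowEnd(ts)

  // Flink's exact check: window.maxTimestamp() (= end - 1) <= max - bound - 1.
  def flinkFires(currentMaxTimestamp: Long, ts: Long): Boolean =
    windowEnd(ts) - 1 <= currentMaxTimestamp - MaxOutOfOrdernessMs - 1
}
```

For the first record (1790820682000, 10:11:22) the Window is [10:11:21, 10:11:24); the record at 10:11:33 leaves the Watermark at 10:11:23, so nothing fires, and the record at 10:11:34 moves it to 10:11:24, which fires the Window.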

After the 10:11:36 record, the Watermark time (10:11:26) equals the EventTime of the second record, but it has not reached the end of the Window that record belongs to, [10:11:24, 10:11:27), so no Window is triggered.

Record at 10:11:37: watermark:[1790820687000|2026-10-01 10:11:27]. The Window fires:

```
(0001),1,2026-10-01 10:11:26,2026-10-01 10:11:26,2026-10-01 10:11:24,2026-10-01 10:11:27
```

A Window is therefore triggered only when both of these conditions hold:
1: Watermark time >= window_end_time
2: at least one record exists in [window_start_time, window_end_time) (note that the interval is left-closed and right-open).

Out-of-order data

Next, let's look at how the Watermark + EventTime mechanism handles out-of-order data. Enter two more records:

```
[root@hadoop100 soft]# nc -l 9000
```

Record at 10:11:39: watermark:[1790820689000|2026-10-01 10:11:29]
Record at 10:11:31: watermark:[1790820689000|2026-10-01 10:11:29]

The out-of-order record at 10:11:31 leaves currentMaxTimestamp at 10:11:39, so the Watermark stays at 10:11:29. Condition 1 (Watermark time >= window_end_time) is not met: Watermark (10:11:29) < window_end_time (10:11:33) of the Window [10:11:30, 10:11:33). If we now enter a record at 10:11:43, the Watermark becomes 10:11:33 and the Window is sure to fire. Let's try it.

After the 10:11:43 record, the Window [10:11:30, 10:11:33) fires and outputs 2 records, 10:11:31 and 10:11:32; the 10:11:33 record is not included because the interval is right-open. This shows that Flink can use Watermarks to handle out-of-order data within a certain range. But how does Flink handle data that arrives too late (a "late element")?

1: Late Element

We enter a record that is far out of order (EventTime < Watermark time) to test this. First enter 2 lines:

```
[root@hadoop100 soft]# nc -l 9000
```

Record at 10:11:30: watermark:[1790820680000|2026-10-01 10:11:20]
Record at 10:11:43: watermark:[1790820693000|2026-10-01 10:11:33], and the Window fires:

```
(0001),1,2026-10-01 10:11:30,2026-10-01 10:11:30,2026-10-01 10:11:30,2026-10-01 10:11:33
```

The Watermark is now 10:11:33. Then enter 3 more records whose EventTime falls in [10:11:30, 10:11:33): for each one the job only prints watermark:[1790820693000|2026-10-01 10:11:33], and the Window is not triggered again. By default, Flink drops records that arrive after their Window has fired.

2: allowedLateness

Flink provides the allowedLateness method to give late data an allowed lateness: data that arrives within this lateness can still trigger the window. For this test the job sets allowedLateness(Time.seconds(2)) on the windowed stream, after .window(...).

Enter 2 lines as before: the record at 10:11:30 (watermark 10:11:20) and the record at 10:11:43 (watermark 10:11:33). The Window fires:

```
(0001),1,2026-10-01 10:11:30,2026-10-01 10:11:30,2026-10-01 10:11:30,2026-10-01 10:11:33
```

The Watermark is now 10:11:33. To verify the effect, enter 3 records with EventTime < Watermark (10:11:30, 10:11:31, 10:11:32). Each of them triggers the Window again:

```
(0001),2,2026-10-01 10:11:30,2026-10-01 10:11:30,2026-10-01 10:11:30,2026-10-01 10:11:33
(0001),3,2026-10-01 10:11:30,2026-10-01 10:11:31,2026-10-01 10:11:30,2026-10-01 10:11:33
(0001),4,2026-10-01 10:11:30,2026-10-01 10:11:32,2026-10-01 10:11:30,2026-10-01 10:11:33
```

Next enter a record at 10:11:44, which moves the Watermark to [1790820694000|2026-10-01 10:11:34]. Entering 3 more late records for the same Window triggers it again:

```
(0001),5,2026-10-01 10:11:30,2026-10-01 10:11:32,2026-10-01 10:11:30,2026-10-01 10:11:33
(0001),6,2026-10-01 10:11:30,2026-10-01 10:11:32,2026-10-01 10:11:30,2026-10-01 10:11:33
(0001),7,2026-10-01 10:11:30,2026-10-01 10:11:32,2026-10-01 10:11:30,2026-10-01 10:11:33
```

Then enter a record at 10:11:45, which moves the Watermark to [1790820695000|2026-10-01 10:11:35]. Now enter 3 more records with EventTime < Watermark: for each one the job only prints watermark:[1790820695000|2026-10-01 10:11:35], and the Window is not triggered.

To summarize: when the Watermark equals 10:11:33, the window_end_time, the Window fires. After it has fired, we keep entering data that belongs to the Window [10:11:30, 10:11:33). At Watermark 10:11:34 this data can still trigger the Window; at Watermark 10:11:35 it can no longer do so. Because we set allowedLateness(Time.seconds(2)), data up to 2 s late can still trigger the Window: at Watermark 10:11:34 it fires, at 10:11:35 it does not.

A second (or later) firing therefore happens when late data for this Window arrives while Watermark < window_end_time + allowedLateness. At Watermark 10:11:34, the EventTimes 10:11:30, 10:11:31 and 10:11:32 all belong to a Window whose window_end_time is 10:11:33, and 10:11:34 < 10:11:33 + 2 s is true. At Watermark 10:11:35, for the same EventTimes and the same window_end_time of 10:11:33, 10:11:35 < 10:11:33 + 2 s is false: the data is too old and no longer triggers the Window.

3: sideOutputLateData

sideOutputLateData collects late data instead of dropping it: the job passes an OutputTag to .sideOutputLateData(outputTag) on the windowed stream, and the collected records can be read with getSideOutput(outputTag) on the Window result.

Enter 2 lines: the record at 10:11:30 (watermark 10:11:20) and the record at 10:11:43 (watermark 10:11:33). The Window fires:

```
(0001),1,2026-10-01 10:11:30,2026-10-01 10:11:30,2026-10-01 10:11:30,2026-10-01 10:11:33
```

The Window has fired and the Watermark is 10:11:33. Now enter 3 late records: for each one the job only prints watermark:[1790820693000|2026-10-01 10:11:33], and the Window does not fire. All of these records have been saved to outputTag by sideOutputLateData.

```
[root@hadoop100 soft]# nc -l 9000
…10:11:22],watermark:…
```
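The default drop, allowedLateness and sideOutputLateData behavior described above can be replayed with a small single-key simulation. It is a hypothetical model (LateWindowSim is not a Flink class) written under simplifying assumptions: parallelism 1, one key, a Watermark advanced right after each record instead of on Flink's 200 ms timer, and Flink's bounded Watermark of currentMaxTimestamp - bound - 1. Within those assumptions it follows the rules in the text: a record is late once the Watermark has reached window_end_time + allowedLateness, and a non-late record for an already fired Window fires it again.

```scala
import scala.collection.mutable

// Hypothetical single-key, parallelism-1 model of an event-time tumbling window
// with allowedLateness and an optional late-data side output (NOT Flink code).
final class LateWindowSim(windowSizeMs: Long, maxOutOfOrdernessMs: Long,
                          allowedLatenessMs: Long, sideOutputLate: Boolean) {
  private var currentMax: Long = Long.MinValue + maxOutOfOrdernessMs + 1
  private var watermark: Long = Long.MinValue
  private val contents = mutable.Map.empty[Long, mutable.ArrayBuffer[Long]] // window start -> records
  private val pending = mutable.Set.empty[Long] // windows still waiting for their on-time firing

  val fired = mutable.ArrayBuffer.empty[(Long, Int)] // (window start, record count) per firing
  val lateOutput = mutable.ArrayBuffer.empty[Long]   // what sideOutputLateData would collect
  var dropped = 0                                     // late records discarded (default behavior)

  private def start(ts: Long): Long = ts - (ts + windowSizeMs) % windowSizeMs
  private def maxTs(s: Long): Long = s + windowSizeMs - 1 // window_end_time - 1 ms

  def process(ts: Long): Unit = {
    val s = start(ts)
    if (maxTs(s) + allowedLatenessMs <= watermark) {
      // Too late even with allowedLateness: side output or drop.
      if (sideOutputLate) lateOutput += ts
      else dropped += 1
    } else {
      val buf = contents.getOrElseUpdate(s, mutable.ArrayBuffer.empty[Long])
      buf += ts
      if (maxTs(s) <= watermark) fired += ((s, buf.size)) // late firing within allowedLateness
      else pending += s                                     // wait for the Watermark
    }
    advanceWatermark(ts)
  }

  private def advanceWatermark(ts: Long): Unit = {
    currentMax = math.max(currentMax, ts)
    val wm = currentMax - maxOutOfOrdernessMs - 1
    if (wm > watermark) {
      watermark = wm
      // On-time firing: the Watermark has reached window_end_time - 1 ms.
      for (s <- pending.toList.sorted if maxTs(s) <= watermark) {
        fired += ((s, contents(s).size))
        pending -= s
      }
      // Window state is discarded once window_end_time + allowedLateness has been reached.
      for (s <- contents.keys.toList if maxTs(s) + allowedLatenessMs <= watermark) contents -= s
    }
  }
}
```

Replaying the allowedLateness inputs (10:11:30, 10:11:43, three late records, 10:11:44, three late records, 10:11:45, three late records) with allowedLatenessMs = 2000 yields firings with record counts 1 through 7 for the Window [10:11:30, 10:11:33) and then three discarded records, in line with the console output above. With allowedLatenessMs = 0 the late records are dropped, or collected in lateOutput when sideOutputLate is true.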
