《數(shù)據(jù)實時處理flink》課件-第五章 時間和窗口_第1頁
《數(shù)據(jù)實時處理flink》課件-第五章 時間和窗口_第2頁
《數(shù)據(jù)實時處理flink》課件-第五章 時間和窗口_第3頁
《數(shù)據(jù)實時處理flink》課件-第五章 時間和窗口_第4頁
《數(shù)據(jù)實時處理flink》課件-第五章 時間和窗口_第5頁
已閱讀5頁,還剩46頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權,請進行舉報或認領

文檔簡介

時間和窗口本章將重點介紹Flink的時間和窗口相關知識。通過本章的學習我們將了解不同時間語義下對數(shù)據(jù)流的相關處理。Window是Flink流計算的核心,我們將帶著大家學習窗口的生命周期,不同類型窗口的相關操作等。通過本節(jié)學習您將可以:熟悉Flink的時間語義。熟悉Flink最底層的API:ProcessFunction。掌握窗口以及窗口上的計算。掌握數(shù)據(jù)流上的Join操作。了解如何處理遲到數(shù)據(jù)。Flink的時間語義ProcessFunction系列函數(shù)窗口算子的使用雙流關聯(lián)處理遲到數(shù)據(jù)

三種時間語義事件到達Flink的時間可能是亂序的Event

Time:事件發(fā)生的時間無需擔心亂序到達問題Watermark假設不會有更晚的數(shù)據(jù)需要用緩存存儲中間數(shù)據(jù),增大了延遲Processing

Time:當前節(jié)點的系統(tǒng)時鐘時間計算結果有不確定性不需要設置Watermark延遲較低Ingestion

Time事件到達FlinkSource的時間不需要設置Watermark延遲較低在執(zhí)行環(huán)境層面設置使用哪種時間語義TimeCharacteristic.EventTimeTimeCharacteristic.ProcessingTimeTimeCharacteristic.IngestionTimefromElements()

或fromCollection()

創(chuàng)建的DataStream應該使用Event

Time,并對數(shù)據(jù)流中的每個元素的Event

Time賦值使用帶時序性的Source:Socket、Kafka等或者不帶時序性的Source,使用Event

Time,并生成Watermark:文件等設置時間語義env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);EventTime時間語義使用時間戳為數(shù)據(jù)流中的每個事件的Event

Time賦值生成WatermarkUnix時間戳系統(tǒng)、毫秒精度Watermark是插入到數(shù)據(jù)流中的一種特殊數(shù)據(jù)結構,它包含一個時間戳,并假設后續(xù)不會有小于該時間戳的數(shù)據(jù)Watermark時間戳必須單調(diào)遞增Watermark與事件時間越緊湊,越容易產(chǎn)生延遲數(shù)據(jù);越寬松,等待時間越長、延遲越高Event

Time和WatermarkWatermark需要在并行環(huán)境下向前傳播每個算子子任務維護一個針對該子任務的Event

Time時鐘,時鐘記錄了這個算子子任務的處理速度上游算子的Watermark數(shù)據(jù)不斷向下發(fā)送,算子子任務的Event

Time時鐘也要不斷向前更新每個算子子任務也要維護來自上游多個分區(qū)的Watermark信息:

PartitionWatermark

分布式環(huán)境下Watermark的傳播Flink先判斷新流入的Watermark時間戳是否大于PartitionWatermark列表內(nèi)該分區(qū)的歷史Watermark時間戳,大于則更新該Partition

Watermark時間戳遍歷Partition

Watermark列表,選擇最小的作為該算子子任務Event

Time時鐘,如果更新了Event

Time時鐘,則將更新的Event

Time作為Watermark發(fā)送給下游所有算子子任務分布式環(huán)境下Watermark的傳播抽取時間戳和生成Watermark兩者緊密結合只在Event

Time語義下有效時間越早設置越好設置方法:在Source中設置在Source之后設置抽取時間戳及生成Watermark老的Source接口:實現(xiàn)SourceFunction或RichSourceFunction抽取時間戳:collectWithTimestamp()生成Watermark:emitWatermark()SourceclassMySourceextendsRichSourceFunction[MyType]{@Overridepublicvoidrun(SourceContext<MyType>ctx)throwsException{while(/*condition*/){MyTypenext=getNext();ctx.collectWithTimestamp(next,next.eventTime);if(next.hasWatermarkTime()){

ctx.emitWatermark(newWatermark(next.watermarkTime));}}}}assignTimestampsAndWatermarks()方法:Flink

1.11之后進行了重構WatermarkStrategy抽取時間戳:withTimestampAssigner().withTimestampAssigner((event,timestamp)->event.eventTime)生成Watermark:forGenerator()實現(xiàn)自己的Watermark策略周期性地Periodic逐個式地PunctuatedSource之后DataStream<MyType>stream=...DataStream<MyType>withTimestampsAndWatermarks=stream.assignTimestampsAndWatermarks( WatermarkStrategy

.forGenerator(...)

.withTimestampAssigner(...));周期可以設置默認每200毫秒生成一次env.getConfig.setAutoWatermarkInterval(5000L)實現(xiàn)WatermarkGeneratoronEvent():數(shù)據(jù)流中每個元素到達后調(diào)用該方法onPeriodicEmit():定期發(fā)射WatermarkMyPeriodicGeneratorcurrentMaxTimestamp記錄已抽取的時間戳最大值時間戳最大值減1分鐘作為Watermark發(fā)送出去周期性地生成Watermark//定期生成Watermark

//數(shù)據(jù)流元素Tuple2<String,Long>共兩個字段

//第一個字段為數(shù)據(jù)本身

//第二個字段是時間戳

public

static

class

MyPeriodicGenerator

implements

WatermarkGenerator<Tuple2<String,Long>>{private

final

longmaxOutOfOrderness=60*1000;//1分鐘

private

longcurrentMaxTimestamp;//已抽取的Timestamp最大值

@Overridepublic

void

onEvent(Tuple2<String,Long>event,longeventTimestamp,WatermarkOutputoutput)

{//更新currentMaxTimestamp為當前遇到的最大值

currentMaxTimestamp=Math.max(currentMaxTimestamp,eventTimestamp);}@Overridepublic

void

onPeriodicEmit(WatermarkOutputoutput)

{//Watermark比currentMaxTimestamp最大值慢1分鐘

output.emitWatermark(newWatermark(currentMaxTimestamp-maxOutOfOrderness));}}用forGenerator()方法調(diào)用MyPeriodicGenerator類基于時間戳最大值的場景比較普遍,F(xiàn)link做了進一步封裝BoundedOutOfOrdernessWatermarksforBoundedOutOfOrderness()方法AscendingTimestampsWatermarksEvent

Time時間戳單調(diào)遞增forMonotonousTimestamps()方法周期性地生成Watermark//第二個字段是時間戳

DataStream<Tuple2<String,Long>>watermark=input.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((context->newMyPeriodicGenerator())).withTimestampAssigner((event,recordTimestamp)->event.f1));//第二個字段是時間戳

DataStream<Tuple2<String,Long>>input=env.addSource(newMySource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event,timestamp)->event.f1));使用自己實現(xiàn)的MyPeriodicGenerator使用Flink封裝的forBoundedOutOfOrderness數(shù)據(jù)流元素有特殊標記,標記哪些元素為Watermark根據(jù)元素是否有特殊標記,判斷是否生成Watermark每個元素都生成一個Watermark,增大下游計算的延遲,拖累整個Flink作業(yè)的性能。逐個式地生成Watermark//逐個檢查數(shù)據(jù)流中的元素,根據(jù)元素中的特殊字段,判斷是否要生成Watermark

//數(shù)據(jù)流元素Tuple3<String,Long,Boolean>共三個字段

//第一個字段為數(shù)據(jù)本身

//第二個字段是時間戳

//第三個字段判斷是否為Watermark的標記

public

static

class

MyPunctuatedGenerator

implements

WatermarkGenerator<Tuple3<String,Long,Boolean>>{@Overridepublic

void

onEvent(Tuple3<String,Long,Boolean>event,longeventTimestamp,WatermarkOutputoutput)

{if(event.f2){output.emitWatermark(newWatermark(event.f1));}}@Overridepublic

void

onPeriodicEmit(WatermarkOutputoutput)

{//這里不需要做任何事情,因為我們在onEvent()方法中生成了Watermark

}}Watermark策略:在延遲和準確性之間平衡Watermark策略沒有標準答案我們無法預知流處理有多少遲到數(shù)據(jù)Watermark與事件時間戳貼合較緊,一些數(shù)據(jù)會被當成遲到數(shù)據(jù),影響計算結果的準確性Watarmark設置得較松,更多數(shù)據(jù)會先緩存起來以等待計算,整個應用的延遲增加;增大了內(nèi)存的壓力延遲與準確性Flink的時間語義ProcessFunction系列函數(shù)窗口算子的使用雙流關聯(lián)處理遲到數(shù)據(jù)Flink體系中最底層的API提供了對數(shù)據(jù)流更細粒度的操作權限訪問和更新狀態(tài)獲取時間戳、使用定時器(Timer)主要包括:KeyedProcessFunction、ProcessFunction、CoProcessFunction等ProcessFunction系列函數(shù)Timer就像一個鬧鐘先在Timer中注冊一個未來的時間當這個時間到達,“鬧鐘”響起,程序執(zhí)行回調(diào)函數(shù),回調(diào)函數(shù)執(zhí)行一定的業(yè)務邏輯ProcessFunction兩大重要接口:processElement()方法處理數(shù)據(jù)流中的一條元素,并通過Collector<O>輸出出來。Context是processElement()方法的特色,可以獲取時間戳、訪問TimerService,設置TimeronTimer()是回調(diào)函數(shù),當?shù)搅恕棒[鐘”時間,F(xiàn)link會調(diào)用onTimer()方法,執(zhí)行一些業(yè)務邏輯Timer的使用方法//處理數(shù)據(jù)流中的一條元素

public

abstract

void

processElement(Ivalue,Contextctx,Collector<O>out)//時間到達后的回調(diào)函數(shù)

public

void

onTimer(longtimestamp,OnTimerContextctx,Collector<O>out)

Timer的使用方法在processElement()方法中通過Context注冊一個未來的時間戳t在onTimer()方法中實現(xiàn)一些邏輯,到達t時刻,onTimer()方法被自動調(diào)用。只能在KeyedStream上注冊Timer每個Key下可以注冊多個不同時間戳作為Timer每個Key下某個時間戳下只能注冊一個Timer未來的時間戳t可以是Processing

Time也可以是Event

Time從Context中,我們可以獲取一個TimerService,這是一個訪問時間戳和Timer的接口Context.timerService.registerProcessingTimeTimer()注冊一個Processing

Time的TimerContext.timerService.deleteProcessingTimeTimer()刪除之前注冊的TimerContext.timerService.currentProcessingTime()獲取當前時間戳某支股票未來某段interval時間間隔是否一致連續(xù)上漲如果未來interval間隔內(nèi)一直上漲,發(fā)送一個提示解決思路:如果新數(shù)據(jù)比上次數(shù)據(jù)價格更高且沒有注冊Timer,注冊一個未來interval之后的Timer在interval期間內(nèi),如果價格回落,則把剛才的Timer刪掉在interval期間內(nèi),如果價格一直上升,觸發(fā)onTimer()onTimer()發(fā)送提示Timer案例:股票交易場景將一部分數(shù)據(jù)發(fā)送到另一個流中兩個流數(shù)據(jù)類型可以不一樣通過OutputTag<T>標記另外一個數(shù)據(jù)流將交易量大于100的數(shù)據(jù)流側輸出側輸出OutputTag<StockPrice>highVolumeOutput=

newOutputTag<StockPrice>("high-volume-trade"){};

public

static

class

SideOutputFunction

extends

KeyedProcessFunction<String,StockPrice,String>{@Overridepublic

void

processElement(StockPricestock,Contextcontext,Collector<String>out)

throwsException{if(stock.volume>100){

context.output(highVolumeOutput,stock);}else{

out.collect("normaltickdata");}}}DataStream<StockPrice>inputStream=...SingleOutputStreamOperator<String>mainStream=inputStream.keyBy(stock->stock.symbol)//調(diào)用process函數(shù),包含側輸出邏輯

.process(newSideOutputFunction());DataStream<StockPrice>sideOutputStream=mainStream.getSideOutput(highVolumeOutput);CoProcessFunction或KeyedCoProcessFunctionprocessElement1()方法:對第一個數(shù)據(jù)流的每個元素處理processElement2()方法:對第二個數(shù)據(jù)流的每個元素處理第一個流、第二個流可以共享狀態(tài)第一個流、第二個流、輸出流三者的數(shù)據(jù)類型可以不一樣案例:實現(xiàn)兩個數(shù)據(jù)流上的Join:創(chuàng)建狀態(tài),兩個流都可以訪問狀態(tài),例如狀態(tài)變量aprocessElement1()方法處理第一個數(shù)據(jù)流,更新狀態(tài)a。processElement2()方法處理第二個數(shù)據(jù)流,根據(jù)狀態(tài)a中的數(shù)據(jù),生成相應的輸出。兩個流上使用ProcessFunction股票流包含價格、交易量、時間戳等媒體評價流包含了對各支股票的正負評價兩支數(shù)據(jù)流一起流入KeyedCoProcessFunction主邏輯中先將兩個數(shù)據(jù)流connect(),然后按照股票代號進行keyBy(),進而使用process():案例:股票價格流與媒體評價流做Join/**

*四個泛型:Key,第一個流類型,第二個流類型,輸出。*/

publicstaticclass

JoinStockMediaProcessFunction

extends

KeyedCoProcessFunction<String,StockPrice,Media,StockPrice>{//mediaState

privateValueState<String>mediaState;@Overridepublicvoidopen(Configurationparameters)throwsException{//從RuntimeContext中獲取狀態(tài)

mediaState=getRuntimeContext().getState(newValueStateDescriptor<String>("mediaStatusState",Types.STRING));}@OverridepublicvoidprocessElement1(StockPricestock,Contextcontext,Collector<StockPrice>collector)throwsException{StringmediaStatus=mediaState.value();if(null!=mediaStatus){stock.mediaStatus=mediaStatus;collector.collect(stock);}}@OverridepublicvoidprocessElement2(Mediamedia,Contextcontext,Collector<StockPrice>collector)throwsException{//第二個流更新mediaState

mediaState.update(media.status);}}//讀入股票數(shù)據(jù)流

DataStream<StockPrice>stockStream=...//讀入媒體評價數(shù)據(jù)流

DataStream<Media>mediaStream=...DataStream<StockPrice>joinStream=stockStream.connect(mediaStream).keyBy("symbol","symbol")//調(diào)用process函數(shù)

.process(newJoinStockMediaProcessFunction());Flink的時間語義ProcessFunction系列函數(shù)窗口算子的使用雙流關聯(lián)處理遲到數(shù)據(jù)根據(jù)是否keyBy()分為Keyed

Window和Non-Keyed

WindowkeyBy()windowAll()下游算子并行度為1窗口程序兩個必須操作使用窗口分配器(WindowAssigner)將數(shù)據(jù)流中的元素分配到對應的窗口當滿足窗口觸發(fā)條件后,對窗口內(nèi)的數(shù)據(jù)使用窗口處理函數(shù)(WindowFunction)進行處理,常用的WindowFunction有reduce()、aggregate()、process()其他的trigger()、evictor()則是窗口的觸發(fā)和銷毀過程中的附加選項,主要面向需要更多自定義的高級編程者,如果不設置則會使用默認的配置。窗口程序的骨架結構//KeyedWindow

stream.keyBy(<KeySelector>)

//按照一個Key進行分組

.window(<WindowAssigner>)

//將數(shù)據(jù)流中的元素分配到相應的窗口中

[.trigger(<Trigger>)]//指定觸發(fā)器Trigger(可選)

[.evictor(<Evictor>)]//指定清除器Evictor(可選)

.reduce/aggregate/process()//窗口處理函數(shù)WindowFunction

//Non-KeyedWindow

stream.windowAll(WindowAssigner)//不分組,將數(shù)據(jù)流中的所有元素分配到相應的窗口中

[.trigger(<Trigger>)]//指定觸發(fā)器Trigger(可選)

[.evictor(<Evictor>)]//指定清除器Evictor(可選)

.reduce/aggregate/process()//窗口處理函數(shù)WindowFunction窗口分配器WindowAssigner將元素分配給不同的時間窗口時間窗口上進行Window

Function計算案例:設置10分鐘的時間窗口確定窗口的長度0:00

0:10、0:10

0:20數(shù)據(jù)流元素流入,根據(jù)元素的時間,分配到不同的窗口當窗口滿足了觸發(fā)條件,觸發(fā)相應的Window

Function計算窗口的生命周期窗口之間不重疊,窗口長度(Size)是固定的可以設置偏移量Offset內(nèi)置的窗口劃分方法–滾動窗口DataStream<T>input=...//基于Event

Time的時間窗口

input.keyBy(<KeySelector>).window(TumblingEventTimeWindows.of(Time.seconds(5))).<windowfunction>(...)//在小時級滾動窗口上設置15分鐘的Offset偏移

input.keyBy(<KeySelector>).window(TumblingEventTimeWindows.of(Time.hours(1),Time.minutes(15))).<windowfunction>(...)以一個步長(Slide)不斷向前滑動,窗口的長度(Size)固定Slide小于窗口的Size時,相鄰窗口會重疊,一個元素會被分配到多個窗口Slide大于Size,有些元素可能被丟掉

內(nèi)置的窗口劃分方法–滑動窗口DataStream<T>input=...//基于EventTime的滑動窗口

input.keyBy(<KeySelector>).window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5))).<windowfunction>(...)兩個窗口之間有一個間隙,被稱為SessionGap當一個窗口在大于SessionGap的時間內(nèi)沒有接收到新數(shù)據(jù)時,窗口將關閉窗口的長度可變、窗口的開始和結束時間不確定可以設置定長的SessionGap,也可以使用SessionWindowTimeGapExtractor動態(tài)地確定SessionGap的長度內(nèi)置的窗口劃分方法–會話窗口DataStream<T>input=...//基于EventTime定長SessionGap的會話窗口

input.keyBy(<KeySelector>).window(EventTimeSessionWindows.withGap(Time.minutes(10))).<windowfunction>(...)//基于EventTime變長SessionGap的會話窗口

input.keyBy(<KeySelector>).window(EventTimeSessionWindows .withDynamicGap((element)->{//返回SessionGap的長度

})).<windowfunction>(...)數(shù)據(jù)流元素經(jīng)過WindowAssigner后,被分配給不同的窗口使用窗口函數(shù),在每個窗口上對窗口內(nèi)的元素進行處理窗口函數(shù)分為兩類:增量計算,如reduce()和aggregate()全量計算,如process()增量計算:窗口保存一份中間數(shù)據(jù),每流入一個新元素,新元素與中間數(shù)據(jù)兩兩合一,生成新的中間數(shù)據(jù),再保存到窗口中。全量計算:窗口先緩存所有元素,等到觸發(fā)條件后對窗口內(nèi)的全量元素執(zhí)行計算。窗口函數(shù)ReduceFunction使用reduce()需要實現(xiàn)ReduceFunctionReduceFunction接受兩個相同類型的輸入,生成一個輸出。兩兩合一地進行匯總操作,生成一個同類型的新元素需要維護一個狀態(tài)數(shù)據(jù),狀態(tài)數(shù)據(jù)的數(shù)據(jù)類型和輸入、輸出的數(shù)據(jù)類型是一致的優(yōu)點:狀態(tài)數(shù)據(jù)小,ReduceFunction好實現(xiàn)缺點:功能受限//讀入股票數(shù)據(jù)流

DataStream<StockPrice>stockStream=...senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)//reduce的返回類型必須和輸入類型StockPrice一致

DataStream<StockPrice>sum=stockStream.keyBy(s->s.symbol).timeWindow(Time.seconds(10)).reduce((s1,s2)->StockPrice.of(s1.symbol,s2.price,s2.ts,s1.volume+s2.volume));AggregateFunction使用aggregate()需要實現(xiàn)AggregateFunction實現(xiàn)起來稍復雜:輸入類型IN、輸出類型OUT、中間狀態(tài)數(shù)據(jù)ACC三者不相同,可以自定義ACC的數(shù)據(jù)結構需要實現(xiàn)多個虛方法:createAccumulator()、add()、getResult()public

interface

AggregateFunction<IN,ACC,OUT>extends

Function,Serializable{//在一次新的aggregate發(fā)起時,創(chuàng)建一個新的Accumulator,Accumulator是我們所說的中間狀態(tài)數(shù)據(jù),簡稱ACC

//這個函數(shù)一般在初始化時調(diào)用

ACCcreateAccumulator();//當一個新元素流入時,將新元素與狀態(tài)數(shù)據(jù)ACC合并,返回狀態(tài)數(shù)據(jù)ACC

ACCadd(INvalue,ACCaccumulator);//將兩個ACC合并

ACCmerge(ACCa,ACCb);//將中間數(shù)據(jù)轉成結果數(shù)據(jù)

OUTgetResult(ACCaccumulator);}AggregateFunction源碼AggregateFunction計算一個窗口內(nèi)某支股票的平均值ACC中要保存總和(sum)、個數(shù)(count)以及股票代號(symbol)createAccumulator():創(chuàng)建新的ACC,初始化ACC數(shù)據(jù)add():新數(shù)據(jù)到達,更新ACC中的sum和countgetResult():將ACC轉換為最終結果merge():窗口融合時,多個窗口里的ACC合并,生成新的ACCAggregateFunction計算流程

AggregateFunction/**

*接收三個泛型:*IN:StockPrice

*ACC:(String,Double,Int)-(symbol,sum,count)

*OUT:(String,Double)-(symbol,average)

*/

publicstaticclass

AverageAggregate

implements

AggregateFunction<StockPrice,Tuple3<String,Double,Integer>,Tuple2<String,Double>>{@OverridepublicTuple3<String,Double,Integer>createAccumulator(){returnTuple3.of("",0d,0);}@OverridepublicTuple3<String,Double,Integer>add(StockPriceitem,Tuple3<String,Double,Integer>accumulator){doubleprice=accumulator.f1+item.price;intcount=accumulator.f2+1;returnTuple3.of(item.symbol,price,count);}@OverridepublicTuple2<String,Double>getResult(Tuple3<String,Double,Integer>accumulator){returnTuple2.of(accumulator.f0,accumulator.f1/accumulator.f2);}@OverridepublicTuple3<String,Double,Integer>merge(Tuple3<String,Double,Integer>a,Tuple3<String,Double,Integer>b){returnTuple3.of(a.f0,a.f1+b.f1,a.f2+b.f2);}}DataStream<StockPrice>stockStream=...DataStream<Tuple2<String,Double>>average=stockStream.keyBy(s->s.symbol).timeWindow(Time.seconds(10)).aggregate(newAverageAggregate());主程序實現(xiàn)一個AggregateFunctionProcessWindowFunctionProcessWindowFunction要對窗口內(nèi)的全量數(shù)據(jù)都緩存Flink將某個Key下某個窗口的所有元素都緩存在Iterable<IN>中,對其進行處理后,用Collector<OUT>收集輸出Context獲取窗口內(nèi)更多的信息,包括時間、狀態(tài)、遲到數(shù)據(jù)發(fā)送位置等/**

*函數(shù)接收四個泛型*IN輸入類型*OUT輸出類型*KEYkeyBy中按照Key分組,Key的類型*W窗口的類型*/

public

abstract

class

ProcessWindowFunction<IN,OUT,KEY,W

extends

Window>extends

AbstractRichFunction{/**

*對一個窗口內(nèi)的元素進行處理,窗口內(nèi)的元素緩存在Iterable<IN>,進行處理后輸出到Collector<OUT>中*我們可以輸出一到多個結果*/

public

abstract

void

process(KEYkey,Contextcontext,Iterable<IN>elements,Collector<OUT>out)

throwsException;/**

*當窗口執(zhí)行完畢被清理時,刪除各類狀態(tài)數(shù)據(jù)。*/

public

void

clear(Contextcontext)

throwsException{}…}ProcessWindowFunction源碼ProcessWindowFunction與增量計算相結合想訪問窗口中Context等元數(shù)據(jù),又想使用增量計算,不緩存所有數(shù)據(jù),可以將ProcessWindowFunction與增量計算函數(shù)reduce()或aggregate()相結合Flink先進行增量計算,窗口結束前,將增量計算結果發(fā)送給ProcessWindowFunction再處理案例:計算時間窗口下股票的最大值、最小值以及窗口結束時間戳窗口的最大值、最小值可以由reduce()增量計算得到窗口結束時間戳需要從Context中獲得觸發(fā)器Trigger每個窗口都有一個Trigger,Trigger決定了窗口何時啟動Window

Function執(zhí)行計算、何時清理窗口中的數(shù)據(jù)例如:Processing

Time下的時間窗口帶有一個默認的Trigger,當?shù)竭_這個窗口的結束時間,觸發(fā)相應的計算其他窗口觸發(fā)的特例:窗口中遇到某些特定的元素、元素總數(shù)達到一定數(shù)量或窗口中元素按照某個特定模式順序到達針對這些特例,可以自定義Trigger觸發(fā)器TriggerTrigger返回結果:CONTINUE:什么都不做。FIRE:啟動計算并將結果發(fā)送給下游,不清理窗口數(shù)據(jù)。PURGE:清理窗口數(shù)據(jù)但不執(zhí)行計算。FIRE_AND_PURGE:啟動計算,發(fā)送結果然后清理窗口數(shù)據(jù)。Trigger本質(zhì)上是一種定時器Timer,注冊一個合適的時間,到達這個時間,根據(jù)業(yè)務邏輯決定發(fā)送上面四個結果中的一個。清除器EvictorEvictor用來清除數(shù)據(jù)增量計算沒必要使用EvictorevictBefore()和evictAfter()分別在WindowFunction之前和之后被調(diào)用,方法里可以自定義一些業(yè)務邏輯,清除窗口中的數(shù)據(jù)Flink的時間語義ProcessFunction系列函數(shù)窗口算子的使用雙流關聯(lián)處理遲到數(shù)據(jù)雙流關聯(lián)將兩個數(shù)據(jù)流的數(shù)據(jù)關聯(lián)(Join)流處理的Join是在時間窗口上進行兩個流的Join目前,F(xiàn)link支持兩種:窗口連接Window

Join時間間隔連接Interval

Join窗口連接Window

Join同一個窗口上兩個流按照某個Key進行Join窗口劃分可以使用滾動窗口、滑動窗口和會話窗口一個窗口包含來自兩個數(shù)據(jù)流中的元素,兩個流之間以Inner

Join語義關聯(lián),形成數(shù)據(jù)對窗口結束時間,F(xiàn)link使用JoinFunction來對窗口中的數(shù)據(jù)進行處理input1.join(input2)

.where(<KeySelector>) <-input1使用哪個字段作為Key.equalTo(<KeySelector>) <-input2使用哪個字段作為Key.window(<WindowAssigner>) <-指定WindowAssigner[.trigger(<Trigger>)] <-指定Trigger(可選)[.evictor(<Evictor>)] <-指定Evictor(可選).apply(<JoinFunction>) <-指定JoinFunction

窗口連接Window

Joinpublic

static

class

MyJoinFunction

implements

JoinFunction<Tuple2<String,Integer>,Tuple2<String,Integer>,String>{@OverridepublicStringjoin(Tup

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

最新文檔

評論

0/150

提交評論