Apache Flink-實時計算正當時_第1頁
Apache Flink-實時計算正當時_第2頁
Apache Flink-實時計算正當時_第3頁
Apache Flink-實時計算正當時_第4頁
Apache Flink-實時計算正當時_第5頁
已閱讀5頁,還剩178頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

第2期

Phase2ApacheFlink

2

。

12實時計算正當時

版ApacheFUnk技術與實戰(zhàn)精解

I目錄

■技術篇

ApacheFlink1.13.0正式發(fā)布,流處理應用更加簡單高效!1

深入解讀FlinkSQL1.1319

ApacheFlink1.13,面向流批一體的運行時與DatastreamAPI優(yōu)化41

ApacheFlink1.13,StateBackend優(yōu)化及生產實踐分享59

Flink+Iceberg全場景實時數倉的建設實踐72

■實踐篇

知乎的Flink數據集成平臺建設實踐94

騰訊游戲實時計算應用平臺建設實踐107

FUnkSQLCDC實踐以及一致性分析128

ApacheFlink在bilibili的多元化探索與實踐149

ApacheFlink1.13.0正式發(fā)布,流處理應用更加簡單高效!

ApacheFlink1.13.0正式發(fā)布,

流處理應用更加簡單高效!

來源IFlink中文社區(qū)

翻譯|高贊

Review|朱翥、馬國維

ApacheFlink1.13發(fā)布了!Flink1.13包括了超過200名貢獻者所提交的1000多項修

復和優(yōu)化。

這一版本中,Flink的一個主要目標取得了重要進展,即讓流處理應用的使用像普通

應用一樣簡單和自然。Flink1.13新引入的被動擴縮容使得流作業(yè)的擴縮容和其它應

用一樣簡單,用戶僅需要修改并發(fā)度即可。

這個版本還包括一系列重要改動使用戶可以更好理解流作業(yè)的性能。當流作業(yè)的性

能不及預期的時候,這些改動可以使用戶可以更好的分析原因。這些改動包括用于

識別瓶頸節(jié)點的負載和反壓可視化、分析算子熱點代碼的CPU火焰圖和分析State

Backend狀態(tài)的State訪問性能指標。

除了這些特性外,Flink社區(qū)還添加了大量的其它優(yōu)化,本文后續(xù)會討論其中的一些。

我們希望用戶可以享受新的版本和特性帶來的便利,在本文最后,我們還會介紹升級

Flink版本需要注意的一些變化。

我們鼓勵用戶下載試用新版Flink[1]并且通過郵件列表[2]和JIRA[3]來反饋遇到的問

題。

一、重要特性

被動擴縮容

Flink項目的一個初始目標,就是希望流處理應用可以像普通應用一樣簡單和自然,

被動擴縮容是Flink針對這一目標上的最新進展。

當考慮資源管理和部分的時候,Flink有兩種可能的模式。用戶可以將Flink應用部署

到k8s、yarn等資源管理系統(tǒng)之上,并且由Flink主動的來管理資源并按需分配和釋

放資源。這一模式對于經常改變資源需求的作業(yè)和應用非常有用,比如批作業(yè)和實時

SQL查詢。在這種模式下,Flink所啟動的Worker數量是由應用設置的并發(fā)度決定的。

在FLink中我們將這一模式叫做主動擴縮容。

對于長時間運行的流處理應用,一種更適合的模型是用戶只需要將作業(yè)像其它的長

期運行的服務一樣啟動起來,而不需要考慮是部署在k8s、yarn還是其它的資源管

理平臺上,并且不需要考慮需要申請的資源的數量。相反,它的規(guī)模是由所分配的

worker數量來決定的。當worker數量發(fā)生變化時,Rink自動的改動應用的并發(fā)度。

在Flink中我們將這一模式叫做被動擴縮容。

Flink的Application部署模式[4]開啟了使Flink作業(yè)更接近普通應用(即啟動Flink

作業(yè)不需要執(zhí)行兩個獨立的步驟來啟動集群和提交應用)的努力,而被動擴縮容完成

了這一目標:用戶不再需要使用額外的工具(如腳本、K8s算子)來讓Worker的數

量與應用并發(fā)度設置保持一致。

用戶現在可以將自動擴縮容的工具應用到Flink應用之上,就像普通的應用程序一樣,

只要用戶了解擴縮容的代價:有狀態(tài)的流應用在擴縮容的時候需要將狀態(tài)重新分發(fā)。

如果想要嘗試被動擴縮容,用戶可以增力口scheduler-mode:reactive這一酉d置項,然

后啟動一個應用集群(Standalone[5]或者K8s⑹)。更多細節(jié)見被動擴縮容的文檔[7]o

分析應用的性能

對所有應用程序來說,能夠簡單的分析和理解應用的性能是非常關鍵的功能。這一

功能對Flink更加重要,因為Flink應用一般是數據密集的(即需要處理大量的數據)

并且需要在(近)實時的延遲內給出結果。

3

當Flink應用處理的速度跟不上數據輸入的速度時,或者當一個應用占用的資源超過

預期,下文介紹的這些工具可以幫你分析原因。

■瓶頸檢測與反壓監(jiān)控

Flink性能分析首先要解決的問題經常是:哪個算子是瓶頸?

為了回答這一問題,Flink引入了描述作業(yè)繁忙(即在處理數據)與反壓(由于下游

算子不能及時處理結果而無法繼續(xù)輸出)程度的指標。應用中可能的瓶頸是那些繁忙

并且上游被反壓的算子。

Flink1.13優(yōu)化了反壓檢測的邏輯(使用基于任務Mailbox計時,而不在再于堆棧采

樣),并且重新實現了作業(yè)圖的UI展示:Flink現在在UI上通過顏色和數值來展示繁

忙和反壓的程度。

■WebUI中的CPU火焰圖

Flink關于性能另一個經常需要回答的問題:瓶頸算子中的哪部分計算邏輯消耗巨大?

針對這一問題,一個有效的可視化工具是火焰圖。它可以幫助回答以下問題:

?哪個方法調用現在在占用CPU?

?不同方法占用CPU的比例如何?

?一個方法被調用的棧是什么樣子的?

火焰圖是通過重復采樣線程的堆棧來構建的。在火焰圖中,每個方法調用被表示為一

個矩形,矩形的長度與這個方法出現在采樣中的次數成正比?;鹧鎴D在UI上的一個

例子如下圖所示。

termarfcsAccumulatorsBackpressureMet/icsHameGraph

On-CPUMixedMeasurement:14sago

DynamicRuleEvaluationFuncti

on->(AlertsDesenalizanon->■■■

Sink:AlertsJSONSink.RulesDljdd:l£6

eseriaNzation->Sink:Unname

6

Parallelism:1

Backpressureci(max)0%

Busy(max):45%

LowWatermark:16145301506$

火焰圖的文檔[8]包括啟用這一功能的更多細節(jié)和指令。

■State訪問延遲指標

另一個可能的性能瓶頸是statebackend,尤其是當作業(yè)的state超過內存容量而必

須使用RocksDBstatebackend[9]時。

這里并不是想說RocksDB性能不夠好(我們非常喜歡RocksDB!),但是它需要滿

足一些條件才能達到最好的性能。例如,用戶可能很容易遇到非故意的在云上由于

使用了錯誤的磁盤資源類型而不能滿足RockDB的10性能需求[10]的問題。

基于CPU火焰圖,新的StateBackend的延遲指標可以幫助用戶更好的判斷性能不

符合預期是否是由StateBackend導致的。例如,如果用戶發(fā)現RocksDB的單次訪

問需要幾毫秒的時間,那么就需要查看內存和I/O的配置。這些指標可以通過設置

state.backend.rocksdb.latency-track-enabled這一選項來啟用。這些指標是通過采

樣的方式來監(jiān)控性能的,所以它們對RocksDBStateBackend的性能影響是微不足道

的。

5

通過Savepoint來切換StateBackend

用戶現在可以在從一個Savepoint重啟時切換一個Flink應用的StateBackend。這

使得Flink應用不再被限制只能使用應用首次運行時選擇的StateBackend。

基于這一功能,用戶現在可以首先使用一個HashMapStateBackend(純內存的

StateBackend),如果后續(xù)狀態(tài)變得過大的話,就切換到RocksDBStateBackend中。

在實現層,Flink現在統(tǒng)一了所有StateBackend的Savepoint格式來實現這一功能。

K8s部署時使用用戶指定的Pod模式

原生kubernetes部署口1](Flink主動要求K8s來啟動Pod)中,現在可以使用自定

義的Pod模板。

使用這些模板,用戶可以使用一種更符合K8s的方式來設置JM和TM的Pod,這種

方式比FlinkK8s集成內置的配置項更加靈活。

生產可用的UnalignedCheckpoint

UnalignedCheckpoint目前已達到了生產可用的狀態(tài),我們鼓勵用戶在存在反壓的

情況下試用這一功能。

具體來說,ApacheFlink1.13中引入的這些功能使UnalignedCheckpoint更容易使

用:

?用戶現在使用UnalignedCheckpoint時也可以擴縮容應用。如果用戶需要因為性

能原因不能使用Savepoint而必須使用Retainedcheckpoint時,這一功能會非常

方便。

?對于沒有反壓的應用,啟用UnalignedCheckpoint現在代價更小。Unaligned

Checkpoint現在可以通過超時來自動觸發(fā),即一個應用默認會使用Aligned

Checkpoint(不存儲傳輸中的數據),而只在對齊超過一定時間范圍時自動切換

至UUnalignedCheckpoint(存儲傳輸中的數據)。

關于如何啟用UnalignedCheckpoint可以參考相關文檔口2]。

機器學習遷移到單獨的倉庫

為了加速Flink機器學習的進展(流批統(tǒng)一的機器學習),現在Flink機器學習開啟了

新的倉庫。我們采用類似于StatefulFunction項目的管理方式,通過使

用一個單獨的倉庫從而簡化代碼合并的流程并且可以進行單獨的版本發(fā)布,從而提高

開發(fā)的效率。

用戶可以關注FMk在機器學習方面的進展,比如與Alink口4](Flink常用機器學習算

法套件)的互操作以及與的集成

FlinkTensorflow[15]o

二、SQL/TableAPI進展

與之前的版本類似,SQL和TableAPI仍然在所有開發(fā)中占用很大的比例。

通過Table-valued函數來定義時間窗口

在流式SQL查詢中,一個最經常使用的是定義時間窗口。ApacheFlink1.13中引入

了一種新的定義窗口的方式:通過Table-valued函數。這一方式不僅有更強的表達

能力(允許用戶定義新的窗口類型),并且與SQL標準更加一致。

ApacheFlink1.13在新的語法中支持TUMBLE和HOP窗口,在后續(xù)版本中也會支持

SESSION窗口。我們通過以下兩個例子來展示這一方法的表達能力:

?例1:一個新引入的CUMULATE窗口函數,它可以支持按特定步長擴展的窗口,

直到達到最大窗口大?。?/p>

SELECTwindow_time,window_start,window_end,SUM(price)AStotal_price

FROMTABLE(CUMULATE(TABLEBid,DESCRIPTOR(bidtime),INTERVAL'2'MINUTES,INTERVAL'10'

MINUTES))

GROUPBYwindow_start?window_end?window_time;

?例2:用戶在table-valued窗口函數中可以訪問窗口的起始和終止時間,從而使

用戶可以實現新的功能。例如,除了常規(guī)的基于窗口的聚合和Join之外,用戶現

在也可以實現基于窗口的Top-K聚合:

SELECTwindow』me,…

FROM(

SELECT"ROW_NUMBER()OVER(PARTITIONBYwindow_start,window_endORDERBYtotal_price

DESC)

asrank

FROMt

)WHERErank<=100;

提高DatastreamAPI與TableAPI/SQL的互操作能力

這一版本極大的簡化了DatastreamAPI與TableAPI混合的程序。

TableAPI是一種非常方便的應用開發(fā)接口,因為這僅支持表達式的程序編寫并提供

了大量的內置函數。但是有時候用戶也需要切換回Datastream,例如當用戶存在表

達能力、靈活性或者State訪問的需求時。

Flink新引入的StreamTableEnvironment.toDataStreaiTi()/.fromDataStream()可以

將一個DatastreamAPI聲明的Source或者Sink當作Table的Source或者Sink來使

用。主要的優(yōu)化包括:

?Datastream與TableAPI類型系統(tǒng)的自動轉換。

?EventTime配置的無縫集成,Watermark行為的高度一致性。

?Row類型(即TableAPI中數據的表示)有了極大的增強,包括toString()/

hashCodeQ和equals。方法的優(yōu)化,按名稱訪問字段值的支持與稀疏表示的支持。

Tabletable=tableEnv.fromDataStream(

datastream,

Schema.newBuilder()

n

.columnByMetadata(rowtime","TIMESTAMPtB)")

n

.watermark("rowtime","SOURCE_WATERMARK())

.buildQ);

DataStream<Row>datastream=tableEnv.toDataStream(table)

.keyBy(r->r.getField("user"))

.window(...);

SQLClient:初始化腳本和語句集合(StatementSets)

SQLClient是一種直接運行和部署SQL流或批作業(yè)的簡便方式,用戶不需要編寫代碼

就可以從命令行調用SQL,或者作為Cl/CD流程的一部分。

這個版本極大的提高了SQLClient的功能。現在基于所有通過Java編程(即通過編

程的方式調用TableEnvironment來發(fā)起查詢)可以支持的語法,現在SQLClient和

SQL腳本都可以支持。這意味著SQL用戶不再需要添加膠水代碼來部署他們的SQL

作業(yè)。

■配置簡化和代碼共享

Flink后續(xù)將不再支持通過Yami的方式來配置SQLClient(注:目前還在支持,但是

已經被標記為廢棄)。作為替代,SQLClient現在支持使用一個初始化腳本在主SQL

腳本執(zhí)行前來配置環(huán)境。

這些初始化腳本通常可以在不同團隊/部署之間共享。它可以用來加載常用的

catalog,應用通用的配置或者定義標準的視圖。

,/sql-client.sh-iinitl.sqlinit2.sql-fsqljob.sql

■更多的配置項

通過增加配置項,優(yōu)化SET/RESET命令,用戶可以更方便的在SQLClient和SQL腳

本內部來控制執(zhí)行的流程。

■通過語句集合來支持多查詢

多查詢允許用戶在一個Flink作業(yè)中執(zhí)行多個SQL查詢(或者語句)。這對于長期運

行的流式SQL查詢非常有用。

語句集可以用來將一組查詢合并為一組同時執(zhí)行。

以下是一個可以通過SQLClient來執(zhí)行的SQL腳本的例子。它初始化和配置了執(zhí)行

多查詢的環(huán)境。這一腳本包括了所有的查詢和所有的環(huán)境初始化和配置的工作,從而

使它可以作為一個自包含的部署組件。

-setupacatalog

CREATECAIALOGhive_catalogWITH('type'='hive');

USECATALOGhive_catalog;

—orusetemporaryobjects

CREATETEMPORARYTABLEclicks(

userjdBIGINT

pagejdBIGINT

viewtimeTIMESTAMP

10

)WITH(

'connector'='kafka'

'topic'='clicks*

'properties.bootstrap.servers'=:;

'format'='avro*

);

—settheexecutionmodeforjobs

SETexecution.runtime-mode=streaming;

-setthesync/asyncmodeforINSERTINTOs

SETtabLe.dmI-sync=faIse;

-setthejob'sparallelism

SETparallism.default=10;

—setthejobname

SET=my_flinkjob;

—restorestatefromthespecificsavepointpath

SETexecution.savepoint.path=/tmp/flink-savepoints/savepoint-bbOdab;

BEGINSTATEMENTSET;

INSERTINTOpageview_pv_sink

SELECTpagejd,count(l)FROMclicksGROUPBYpagejd;

INSERTINTOpageview_uv__sink

SELECTpagejd,count(distinctuserjd)FROMclicksGROUPBYpagejd;

END;

Hive查詢語法兼容性

用戶現在在Flink上也可以使用HiveSQL語法。除了HiveDDL方言之外,Flink現在

也支持常用的HiveDML和DQL方言。

為了使用HiveSQL方言,需要設置table.sql-dialect為hive并且加載HiveModule0

后者非常重要,因為必須要加載Hive的內置函數后才能正確實現對Hive語法和語義

的兼容性。例子如下:

CREATECATALOGmyhiveWITH('type'='hive');"setupHiveCatalog

USECATALOGmyhive;

LOADMODULEhive;-setupHiveModule

USEMODULEShive,core;

SETtable.sql-dialect;hive;-enableHivedialect

SELECTkeyvalueFROMsrcCLUSTERBYkey;—runsomeHivequeries

需要注意的是,Hive方言中不再支持Flink語法的DML和DQL語句。如果要使用

Flink語法,需要切換回default的方言配置。

優(yōu)化的SQL時間函數

在數據處理中時間處理是一個重要的任務。但是與此同時,處理不同的時區(qū)、日期

和時間是一個日益復雜[16]的任務。

在Flink1.13中,我們投入了大量的精力來簡化時間函數的使用。我們調整了時間

相關函數的返回類型使其更加精確,例如PROCTIMEQ,CURRENT_TIMESTAMP()和

N0W()o

其次,用戶現在還可以基于一個TIMESTAMP_LTZ類型的列來定義EventTime屬性,

從而可以優(yōu)雅的在窗口處理中支持夏令時。

用戶可以參考ReleaseNote來查看該部分的完整變更。

12

三、PyFlink核心優(yōu)化

這個版本對PyFlink的改進主要是基于Python的DatastreamAPI和TableAPI還有

Java/seaLa版本的對應功能更加一致。

PythonDatastreamAPI中的有狀態(tài)算子

在ApacheFlink1.13中,Python程序員可以享受到Flink狀態(tài)處理API的所有能力。

在ApacheFlink1.12版本重構過的PythonDatastreamAPI現在已經擁有完整的狀

態(tài)訪問能力,從而使用戶可以將數據的信息記錄到state中并且在后續(xù)訪問。

帶狀態(tài)的處理能力是許多依賴跨記錄狀態(tài)共享(例如WindowOperator)的復雜數

據處理場景的基礎。

以下例子展示了一個自定義的計算窗口的實現:

classCountWindowAverage(FlatMapFunction):

def_init_(self?window_size):

self.window__size=window_size

defopen(selfzruntime_context:Runtimecontext):

descriptor=ValueStateDescriptor("average''Types.TUPLE([Types.LONG(),Types.LONG()]))

self.sum=runtime_context.get_state(descriptor)

defflat_map(self,value):

current_sum=self.sum.valueQ

ifcurrent_sumisNone:

current_sum=(0z0)

#updatethecount

current_sum=(current_sum[0]+1,current_sum[1]+value[1])

#ifthecountreacheswindow_sizexemittheaverageandclearthestate

ifcurrent_sum[0]>=self.window_size:

13

self.sum.clearQ

yieldvalue[O]/current_sum[l]//current_sum[O]

else:

self.sum.update(current_sum)

ds=...#type:Datastream

ds.key_by(lambdarow:row[0])\

.flat_map(CountWindowAverage(5))

PyFlinkDatastreamAPI中的用戶自定義窗口

ApacheFlink1.13中PyFlinkDatastream接口增加了對用戶自定義窗口的支持,現在

用戶可以使用標準窗口之外的窗口定義。

由于窗口是處理無限數據流的核心機制(通過將流切分為多個有限的『桶』),這

一功能極大的提高的API的表達能力。

PyFlinkTableAPI中基于行的操作

PythonTableAPI現在支持基于行的操作,例如用戶對行數據的自定義函數。這一功

能使得用戶可以使用非內置的數據處理函數。

一個使用map()操作的PythonTableAPI示例如下:

@udf(result_type=DataTypes.ROW(

[DataTypes.FIELDf'cl:DataTypes.BIGINT()).

DataTypes.FIELD("c2",DataTypes.STRING())]))

defincrement_column(r:Row)->Row:

returnRow(r[0]+1zr[1])

table=...#type:Table

mapped_result=table.map(increment_column)

14

除了map(),這一API還支持flat_map(),aggregate。,flat_aggregate()和其它基

于行的操作。這使PythonTableAPI的功能與JavaTableAPI的功能更加接近。

PyFlinkDatastreamAPI支持Batch執(zhí)行模式

對于有限流,PyFlinkDatastreamAPI現在已經支持Flink1.12DatastreamAPI中引

入的Batch執(zhí)行模式。

通過復用數據有限性來跳過Statebackend和Checkpoint的處理,Batch執(zhí)行模式

可以簡化運維,并且提高有限流處理的性能。

四、其它優(yōu)化

基于Hugo的Flink文檔

Flink文檔從Jekyll遷移到了Hugo。如果您發(fā)現有問題,請務必通知我們,我們非常

期待用戶對新的界面的感受。

WebUI支持歷史異常

FlinkWebUI現在可以展示導致作業(yè)失敗的n次歷史異常,從而提升在一個異常導致

多個后續(xù)異常的場景下的調試體驗。用戶可以在異常歷史中找到根異常。

優(yōu)化失敗Checkpoint的異常和失敗原因的匯報

Flink現在提供了失敗或被取消的Checkpoint的統(tǒng)計,從而使用戶可以更簡單的判斷

Checkpoint失敗的原因,而不需要去查看日志。

Flink之前的版本只有在Checkpoint成功的時候才會匯報指標(例如持久化數據的大

小、觸發(fā)時間等)。

提供『恰好一次』一致性的JDBCSink

15

從ApacheFlink1.13開始,通過使用事務提交數據,JDBCSink可以對支持XA事務

的數據庫提供力恰好一次」的一致性支持。這一特性要求目標數據庫必須有(或鏈接

到)一個XA事務處理器。

PyFlinkTableAPI在Group窗口上支持用戶自定義的聚合函數

PyFlinkTableAPI現在對Group窗口同時支持基于Python的用戶自定義聚合函數

(User-definedAggregateFunctions,UDAFs)以及PandasUDAFs。這些函數對許

多數據分析或機器學習訓練的程序非常重要。

在Flink1.13之前,這些函數僅能在無限的Group-by聚合場景下使用。Flink1.13優(yōu)

化了這一限制。

Batch執(zhí)行模式下Sort-mergeShuffle優(yōu)化

ApacheFlink1.13優(yōu)化了針對批處理程序的Sort-mergeBlockingShuffle的性能和

內存占用情況。這一Shuffle模式是在Flink1.12的FLIP-148[17]中引入的。

這一優(yōu)化避免了大規(guī)模作業(yè)下不斷出現OutOfMemoryError:DirectMemory的問題,

并且通過I/。調度和broadcast優(yōu)化提高了性能(尤其是在機械硬盤上)。

HBase連接器支持異步維表查詢和查詢緩存

HBaseLookupTableSource現在可以支持異步查詢模式和查詢緩存。這極大的提高

了使用這一Source的Table/SQL維表Join的性能,并且在一些典型情況下可以減

少對HBase的I/O請求數量。

在之前的版本中,HBaseLookupSource僅支持同步通信,從而導致作業(yè)吞吐以及資

源利用率降低。

16

升級Flink1.13需要注意的改動:

?FLINK-21709口8]-老的Table&SQLAPI計劃器已經被標記為廢棄,并且將在Flink

1.14中被刪除。Blink計劃器在若干版本之前已經被設置為默認計劃器,并且將成

為未來版本中的唯一計劃器。這意味著BatchTableEnvironmentWDataSetAPI互

操作后續(xù)也將不再支持。用戶需要切換到統(tǒng)一的TableEnvironment來編寫流或者

批的作業(yè)。

?FUNK-22352口9]-Flink社區(qū)決定廢棄對Apachemesos的支持,未來有可能會進

一步刪除這部分功能。用戶最好能夠切換到其它的資源管理系統(tǒng)上。

?FLINK-21935[20]-state.backend.async這一配置已經被禁用了,因為現在Flink

總是會異步的來保存快照(即之前的配置默認值),并且現在沒有實現可以支持

同步的快照保存操作。

?FLINK-17012[21]-Task的RUNNING狀態(tài)被細分為兩步:INITIALIZING和

RUNNINGOTask的INITIALIZING階段包括加載state和在啟用unaligned

checkpoint時恢復In-flight數據的過程。通過顯式區(qū)分這兩種狀態(tài),監(jiān)控系統(tǒng)可

以更好的區(qū)分任務是否已經在實際工作。

?FUNK-21698[22]-NUMERIC和TIMESTAMP類型之間的直接轉換存在問題,

現在已經被禁用,例如CAST(numericASTIMESTAMP(3))。用戶應該使用T0_

TIMESTAMP(FROM_UNIXTIME(numeric))來代替。

?FUNK-22133[23]-新的Source接口有一個小的不兼容的修改,即

SplitEnumerator.snapshotStateQ方法現在多接受一個checkpointid參數來表示正

在進行的snapshot操作所屬的checkpoint的id。

?FLINK-19463[24]-由于老的Statebackend接口承載了過多的語義并且容易引起困

惑,這一接口被標記為廢棄。這是一個純API層的改動,而并不會影響應用運行時。

對于如何升級現有作業(yè),請參考作業(yè)遷移指引[25]。

五、其它資源

17

二進制和代碼可以從ApacheFlink官網的下載頁面[26]獲得,最新的PyFUnk發(fā)布可

以從PyPI[27]獲得。

如果想要升級到ApacheFlink1.13,請參考發(fā)布說明[28]。這一版本與之前1.x的版

本在標記為@Public的接口上是兼容的。

用戶也可以查看新版本修改列表[29]與更新后的文檔[30]來獲得修改和新功能的詳

細列表。

■原文鏈接:/news/2021/05/03/release-l.lB.O.html

參考鏈接:

[1]/downloads.html

[2]https://flink.apache.Org/community.html#mailing-lists

[3]/jira/projects/FLINK/summary

[4]/projects/flink/flink-docs-release-l.13/docs/concepts/flink-

architecture/#flink-application-execution

[5]/projects/flink/flink-docs-release-L13/docs/deployment/resource-

providers/standalone/overview/#application-mode

[6]/projects/flink/flink-docs-release-l.13/docs/deployment/resource-

providers/standalone/kubernetes/#deploy-application-cluster

[7]https://ci.apache.Org/projects/flink/flink-docs-release-l.13/docs/deployment/elastic_

scaling/#reactive-mode

[8]/projects/flink/flink-docs-release-l.13/docs/ops/debugging/flame_graphs

[9]https://ci.apache.0rg/pr0jects/flink/flink-d0cs-release-l.l3/docs/ops/state/state_

backends/#the-embeddedrocksdbstatebackend

[10]/blog/the-impact-of-disks-on-rocksdb-state-backend-in-flink-a-

case-study

[11]/projects/flink/flink-docs-release-1J3/docs/deployment/resource-

providers/native_kubernetes/

18

[12]https://ci.apache.org/projects/flink/flink-docs-release-l-l3/docs/ops/state/

checkpoints/#unaligned-checkpoints

[13]h注ps:〃/apache/flink-ml

[14]/alibaba/Alink

[15]/alibaba/flink-ai-extended

[16]/1883/

[17]/confluence/display/FLINK/FLIP-148%3A+lntroduce+Sort-Merge+Based

+Blocking+Shuffle+to+Flink

[18]/jira/browse/FLINK-21709

[19]h注ps:〃issues.apacheQ「g/ji「a/b「owse/FLINK-22352

[20]/jira/browse/FLINK-21935

[21]/jira/browse/FLINK-17012

[22]/jira/browse/FLINK-21698

[23]/jira/browse/FLINK-22133

[24]/jira/browse/FLINK-19463

[25]https://ci.apach/projects/flink/flink-docs-release-1.l3/docs/ops/state/state_

backends/#migrating-from-legacy-backends

[26]/downloads.html

[27]/project/apache-flink/

[28]/projects/flink/flink-docs-release-l.13/release-notes/flink-l.13

[29]/jira/secure/ReleaseNote.jspa?projectld=12315522&version=12349287

[30]/projects/flink/flink-docs-release-l.13/

深入解讀FlinkSQL1.1319

深入解讀RinkSQL1.13

來源IFlink中文社區(qū)

作者|徐榜江@阿里

摘要:ApacheFlink社區(qū)在5月份發(fā)布了1.13版本,帶來了很多新的變化。本文整

理自徐榜江(雪盡)5月22日在北京的FlinkMeetup分享的《深入解讀FlinkSQL

1.13?,內容包括:

?FlinkSQL1.13概覽

?核心feature解讀

o重要改進解讀

QFlinkSQL1.14未來規(guī)劃J

?總結

20

一、FlinkSQL1.13概覽

Flink1.13

Flink1.13issues

Others,92,9%

>解決Issues:1000+

>貢獻者:200+

>Table/SQL:400+

BTable/SQL■Runtime?API-Connectors

L■Checkpoint■StatebackendBOthers

FlinkSQL1.13是一個社區(qū)大版本,解決的issue在1000個以上,通過上圖我們可以

看到,解決的問題大部分是關于Table/SQL模塊,一共400多個issue占了總體的

37%左右。這些issue主要圍繞了5個FLIP展開,在本文中我們也會根據這5個方

面進行介紹,它們分別是:

21

下面我們對這些FLIP進行詳細解讀。

二、核心feature解讀

1.FLIP-145:支持WindowTVF

社區(qū)的小伙伴應該了解,在騰訊、阿里巴巴、字節(jié)跳動等公司的內部分支已經開發(fā)了

這個功能的基礎版本。這次Flink社區(qū)也在Flink1.13推出了TVF的相關支持和優(yōu)化。

下面將從WindowTVF語法、近實時累計計算場景、Window性能優(yōu)化、多維數據分析,

來解讀這個新功能。

FLIP-145:支持WindowTVF

WindowTVF語法

■1.1WindowTVF語法

在1.13版本前,window的實現是通過一個特殊的SqlGroupedWindowFunction:

SELECT

TUMBLE_S1ART(bidtimeJNTERVAL'10'MINUTE),

TUMBLE_END(bidtimeJNTERVAL'10*MINUTE),

TUMBLE_ROWTIME(bidtimeJNTERVAL'10'MINUTE),

SUM(price)

FROMMyTable

GROUPBYTUMBLE(bidtimeJNTERVAL'10'MINUTE)

22

在1.13版本中,我們對它進行了Table-ValuedFunction的語法標準化:

SELECTWINDOW_start,WINDOW_end,WINDOW_time,SUM(price)

FROMTable(TUMBLE(TablemyTable,DESCRIPTOR(biztime),INTERVAL'10'MINUTE))

GROUPBYWINDOW_start,WINDOW_end

通過對比兩種語法,我們可以發(fā)現:TVF語法更加靈活,不需要必須跟在GROUPBY

關鍵字后面,同時WindowTVF基于關系代數,使得其更加標準。在只需要劃分窗口

場景時,可以只用TVF,無需用GROUPBY做聚合,這使得TVF擴展性和表達能力更強,

支持自定義TVF(例如實現TOP-N的TVF)。

FUnkSQL>

$EL£CT-FROMTABLE(

fnmiElTABlfNyTable.DESCRIFTOR(bldtlM),IMTfRVAL'IfNINUTf5>);

Ibidtlac|price|Iten|window.start|window_end|

WindowTVF語法

>完整的關系代數表達

>輸入是一個關系,輸出也是一個關系

>每個關系對應成一個數據集

上圖中的示例就是利用TVF做的滾動窗口的劃分,只需要把數據劃分到窗口,無需

聚合;如果后續(xù)需要聚合,再進行GROPBY即可。同時,對于熟悉批SQL的用戶

來說,這種操作是非常自然的,我們不再需要像113版本之前那樣必須要用特殊的

SQLGroupedWindowFunction將窗口劃分和聚合綁定在一起。

目前WindowTVF支持tumblewindow,hopwindow,新增了cumulatewindow;

sessionwindow預計在1.14版本也會支持。

23

■1.2CumulateWindow

CumulateWindow

Cumulatewindow就是累計窗口,簡單來說,以上圖里面時間軸上的一個區(qū)間為窗

口步長。

?第一個window統(tǒng)計的是一個區(qū)間的數據;

?第二個window統(tǒng)計的是第一區(qū)間和第二個區(qū)間的數據;

?第三個window統(tǒng)計的是第一區(qū)間,第二個區(qū)間和第三個區(qū)間的數據。

累積計算在業(yè)務場景中非常常見,如累積UV場景。在UV大盤曲線中:我們每隔10

分鐘統(tǒng)計一次當天累積用戶UV。

24

近實時累計計算

INSERTINTOcuauUtIve.uv

SELECTdate.str,MAX(tine_$tr),COUWT(DISTINCTuser.id)uv

FROM(

UV大盤曲線SELECT

DATE_FO?MAT(ts,?yyyy-**-<W')asdate.str,

SUBSTR(DATE_FORMAT(tS,||astWe.Str,

user-id

>結果更精確FROMuser_behavlor)

GROUPBYdate_str;

>追數據時曲線不會跳變

INSERTDfTOCUWlatlVC.Uv

SELECTwindow.end,COIMT(D1ST1NCTuser.ld)asuv

FMMTABLE(

CUMULATE(TABLEuser_t>en?vlor,OESCRIPT(M(tS>.INTERVAL’16'MINUTES,INTERVAL

GMUPBYwlndow.start,wlndow-efld;

在1.13版本之前,當需要做這種計算時,我們一般的SQL寫法如下:

INSERTINTOcumulative_UV

SELECTdate_stcMAX(time_str)zCOUNT(DISTINCTuserjd)asUV

FROM(

SELECT

DAFEJORMAr^yyyy-MM-dd')asdate_strz

SUBSTR(DATE_FORMAF(ts;HH:mm)L4)||'0,astime_stc

userjd

FROMuser_behavior

)

GROUPBYdate_str

先將每條記錄所屬的時間窗口字段拼接好,然后再對所有記錄按照拼接好的時間窗口

字段,通過GROUPBY做聚合,從而達到近似累積計算的效果。

?1.13版本前的寫法有很多缺點,首先這個聚合操作是每條記錄都會計算一次。其次,

在追逆數據的時候,消費堆積的數據時,UV大盤的曲線就會跳變。

25

?在1.13版本支持了TVF寫法,基于cumulatewindow,我們可以修改為下面的寫法,

將每條數據按照EventTime精確地分到每個Window里面,每個窗口的計算通過

watermark觸發(fā),即使在追數據場景中也不會跳變。

INSERTINTOcumulative_UV

SELECTWINDOW_end,COUNT(DISTINCTuserjd)asUV

FROMTable(

CUMULATE(Tableuser_behaviocDESCRIPTOR(ts),INTERVAL'1O'MINUTES,INTERVAL'1'DAY))

)

GROUPBYWINDOW_start,WlNDOW_end

UV大盤曲線效果如下圖所示:

近實時累計計算

UV大盤曲線

每10分鐘統(tǒng)計一次當天的累

計用戶UV

■1.3Window性能優(yōu)化

ApacheFlink1.13社區(qū)開發(fā)者們對WindowTVF進行了一系列的性能優(yōu)化,包括:

?內存優(yōu)化:通過內存預分配,緩存window的數據,通過windowwatermark觸

發(fā)計算,通過申請一些內存buffer避免高頻的訪問state;

26

?切片優(yōu)化:將window切片,盡可能復用已計算結果,inhopwindow,cumulate

window。計算過的分片數據無需再次計算,只需對切片的計算結果進行復用;

?算子優(yōu)化:window算子支持local-global優(yōu)化;同時支持count(distinct)自動解

熱點優(yōu)化;

?遲到數據:支持將遲到數據計算到后續(xù)分片,保證數據準確性。

Window性能優(yōu)化

內存優(yōu)化切片優(yōu)化算子優(yōu)化遲到數據

將window切片,

通過內存預分配,支持

盡可能復用已計算window

緩存window的數local-global優(yōu)化,支持遲到數據計算

結果,如hop

據,通過window同時支持到后續(xù)分片,保證

window,

watermark觸發(fā)count(distinct)自數據準確性

cumulate

計算

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論