版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡介
TableAPI&SQL的介紹和使用為了方便開發(fā)和迭代,F(xiàn)link基于DataStream/DataSetAPI提供了一個(gè)更高層的關(guān)系型數(shù)據(jù)庫式的API——TableAPI&SQL。本章主要講述TableAPI&SQL的優(yōu)點(diǎn)及使用方式。通過本節(jié)學(xué)習(xí)您將可以:熟悉TableAPI&SQL接口。熟悉TableAPI&SQL的使用方式。了解相較DataStream/DataSetAPI,TableAPI&SQL的優(yōu)點(diǎn)。了解為什么選擇TableAPI&SQL。TableAPI&SQL綜述動(dòng)態(tài)表和持續(xù)查詢時(shí)間和窗口JoinSQLDDL系統(tǒng)內(nèi)置函數(shù)用戶自定義函數(shù)
Table
API
&
SQL與關(guān)系型數(shù)據(jù)庫中的查詢相似基于數(shù)據(jù)表Table使用執(zhí)行計(jì)劃器(Planner)將關(guān)系型查詢轉(zhuǎn)換為可執(zhí)行的Flink作業(yè)Blink
Planner和Flink
Planner,Blink
Planner將逐漸取代Flink
PlannerTable
API
&
SQL迭代速度較快,最好參考最新的官方文檔1.創(chuàng)建執(zhí)行環(huán)境(ExecutionEnvironment)和表環(huán)境(TableEnvironment)2.獲取表3.使用TableAPI或SQL在表上做查詢等操作4.將結(jié)果輸出到外部系統(tǒng)5.執(zhí)行作業(yè)Table
API
&
SQL骨架程序//基于StreamExecutionEnvironment創(chuàng)建TableEnvironment
StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);//讀取數(shù)據(jù)源,創(chuàng)建數(shù)據(jù)表Table
user_behavior//注冊(cè)輸出數(shù)據(jù)表Table
output_table//使用TableAPI查詢user_behavior
TabletabApiResult=tableEnv.from("user_behavior").select(...);//使用SQL查詢TablesqlResult=tableEnv.sqlQuery("SELECT...FROMuser_behavior...");//將查詢結(jié)果輸出到outputTable
tabApiResult.insertInto("output_table");sqlResult.insertInto("output_table");Table
API
&
SQL程序主要步驟:注意添加Maven依賴TableEnvironment是整個(gè)程序的入口,功能包括:連接外部系統(tǒng)向目錄(Catalog)中注冊(cè)表或者從中獲取表執(zhí)行TableAPI或SQL操作注冊(cè)用戶自定義函數(shù)提供一些其他配置功能TableEnvironment是最頂級(jí)的接口StreamTableEnvironment用于流處理,有DataStream和Table之間的轉(zhuǎn)換接口BatchTableEnvironment用于批處理,有DataSet和Table之間的轉(zhuǎn)換接口創(chuàng)建TableEnvironment共5個(gè)TableEnvironment,分別面向不同的場(chǎng)景和編程語言用Table來表示廣義的表:需要連接外部系統(tǒng),需要定義Schema,將外部系統(tǒng)數(shù)據(jù)轉(zhuǎn)化為Table。臨時(shí)表(TemporaryTable):Flink作業(yè)啟動(dòng)后臨時(shí)創(chuàng)建的表,隨著這個(gè)Flink作業(yè)的結(jié)束,臨時(shí)表也被銷毀常駐表(PermanentTable):為整個(gè)集群上所有用戶和作業(yè)提供服務(wù),基于Catalog,作業(yè)結(jié)束后,Table元數(shù)據(jù)不會(huì)被銷毀。Catalog:維護(hù)著常駐表的名字、類型(文件、消息隊(duì)列或數(shù)據(jù)庫)、數(shù)據(jù)存儲(chǔ)位置等元數(shù)據(jù)數(shù)據(jù)管理團(tuán)隊(duì)在Catalog中創(chuàng)建常駐表,注冊(cè)好該表的Schema、注明該表使用何種底層技術(shù)、寫明數(shù)據(jù)存儲(chǔ)位置等;數(shù)據(jù)分析團(tuán)隊(duì)無需關(guān)心元數(shù)據(jù),無需了解這個(gè)表到底是存儲(chǔ)在Kafka還是HDFS,直接在這個(gè)表上進(jìn)行查詢。獲取表調(diào)用TableAPI或SQL進(jìn)行查詢可以在Table上使用TableAPI可以在Table上執(zhí)行SQL語句可以使用TableAPI生成一個(gè)表,在此之上進(jìn)行SQL查詢;也可以先進(jìn)行SQL查詢得到一個(gè)表,在此之上再調(diào)用TableAPI
在表上執(zhí)行語句StreamTableEnvironmenttEnv=...//創(chuàng)建一個(gè)TemporaryTable:user_behavior
TableuserBehaviorTable=tEnv.from("user_behavior");//在Table上使用TableAPI執(zhí)行關(guān)系型操作
TablegroupByUserId=userBehaviorTable.groupBy("user_id").select("user_id,COUNT(behavior)ascnt");//在Table上使用SQL執(zhí)行關(guān)系型操作
TablegroupByUserId=tEnv.sqlQuery("SELECTuser_id,COUNT(behavior)FROMuser_behaviorGROUPBYuser_id");Table
APISQL通過TableSink輸出到外部系統(tǒng)與DataStream
Sink相似將表結(jié)果輸出StreamTableEnvironmenttEnv=...//獲取名為CsvSinkTable的Table
//執(zhí)行查詢操作,得到一個(gè)名為result的Table
Tableresult=...//將result發(fā)送到名為CsvSinkTable的TableSink
result.executeInsert("CsvSinkTable");TableAPI或者SQL經(jīng)過Planner轉(zhuǎn)化為JobGraph,Planner在中間起到一個(gè)轉(zhuǎn)換和優(yōu)化的作用未經(jīng)優(yōu)化的邏輯執(zhí)行計(jì)劃(Logical
Plan)、優(yōu)化器(Optimizer)對(duì)Logical
Plan進(jìn)行優(yōu)化,得到物理執(zhí)行計(jì)劃(Physical
Plan),Physical
Plan最后轉(zhuǎn)換為Flink的JobGraph可以使用Table.explain()來查看語法樹、邏輯執(zhí)行計(jì)劃和物理執(zhí)行計(jì)劃執(zhí)行作業(yè)需要配置外部系統(tǒng)的必要參數(shù)、序列化方式、Schema:兩種方式:在程序中使用代碼編輯配置connect()或?qū)ataStream/DataSet轉(zhuǎn)化為表使用聲明式語言,如SQL
DDL或YAMLYAML只能和SQL
Client配合熟悉SQL
DDL的用戶多,未來將主要推廣SQL
DDL獲取表的具體方式TableAPI&SQL綜述動(dòng)態(tài)表和持續(xù)查詢時(shí)間和窗口JoinSQLDDL系統(tǒng)內(nèi)置函數(shù)用戶自定義函數(shù)流處理上的關(guān)系型查詢借鑒了物化視圖的實(shí)現(xiàn)思路批處理關(guān)系型查詢與流處理
批處理關(guān)系型查詢流處理輸入數(shù)據(jù)數(shù)據(jù)是有界的,在有限的數(shù)據(jù)上進(jìn)行查詢數(shù)據(jù)流是無界的,在源源不斷的數(shù)據(jù)流上進(jìn)行查詢執(zhí)行過程一次查詢是在一個(gè)批次的數(shù)據(jù)上進(jìn)行查詢,所查詢的數(shù)據(jù)是靜態(tài)確定的一次查詢啟動(dòng)后需要等待數(shù)據(jù)不斷流入,所查詢的數(shù)據(jù)在未來源源不斷地到達(dá)查詢結(jié)果一次查詢完成后即結(jié)束。結(jié)果是確定的一次查詢會(huì)根據(jù)新流入數(shù)據(jù)不斷更新結(jié)果動(dòng)態(tài)表(DynamicTable)用來表示不斷流入的數(shù)據(jù)表,表中的數(shù)據(jù)不斷更新。在動(dòng)態(tài)表上進(jìn)行查詢,被稱為持續(xù)查詢。一個(gè)持續(xù)查詢的結(jié)果也是動(dòng)態(tài)表。動(dòng)態(tài)表上的持續(xù)查詢電商平臺(tái)用戶行為分析左側(cè)為數(shù)據(jù)流右側(cè)為轉(zhuǎn)化后的動(dòng)態(tài)表動(dòng)態(tài)表上的持續(xù)查詢按user_id字段分組,統(tǒng)計(jì)每個(gè)user_id所產(chǎn)生的行為總數(shù)新數(shù)據(jù)的插入會(huì)導(dǎo)致統(tǒng)計(jì)結(jié)果的更新動(dòng)態(tài)表上的持續(xù)查詢SQL
1SELECT
user_id,COUNT(behavior)ASbehavior_cntFROMuser_behaviorGROUP
BYuser_id按照user_id字段分組,統(tǒng)計(jì)每分鐘每個(gè)user_id所產(chǎn)生的行為總數(shù)數(shù)據(jù)按照滾動(dòng)時(shí)間窗口來分組動(dòng)態(tài)表上的持續(xù)查詢SQL
2SELECT
user_id,COUNT(behavior)ASbehavior_cnt,TUMBLE_END(ts,INTERVAL
'1'
MINUTE)ASend_tsFROMuser_behaviorGROUP
BY
user_id,
TUMBLE(ts,INTERVAL
'1'
MINUTE)兩種生成結(jié)果的方式:SQL
2只追加結(jié)果,或者說只在結(jié)果表上進(jìn)行插入操作。SQL
1追加結(jié)果的同時(shí),也對(duì)結(jié)果不斷更新,或者說既進(jìn)行插入操作又進(jìn)行更新操作或刪除操作。動(dòng)態(tài)表上的持續(xù)查詢兩種輸出方式:追加(Append-only)模式:在結(jié)果末尾追加。更新(Update)模式:既在結(jié)果末尾追加,又對(duì)已有數(shù)據(jù)更新。對(duì)數(shù)據(jù)更新又分為兩種:先將舊數(shù)據(jù)撤回,再添加新數(shù)據(jù),被稱為撤回(Retract)模式直接在舊數(shù)據(jù)上做更新,被稱為插入更新(Upsert)模式動(dòng)態(tài)表的兩種輸出方式結(jié)果共有3列(flag,user_id,behavior_cnt)其中第一列為標(biāo)志位,表示本行數(shù)據(jù)是加入還是撤回,后兩列是查詢結(jié)果。Retract模式//將table轉(zhuǎn)換為DataStream
//Retract模式,Boolean為標(biāo)志位
DataStream<Tuple2<Boolean,Row>>retractStream=tableEnv.toRetractStream(table,Row.class);輸出結(jié)果需有一個(gè)唯一ID,可以根據(jù)唯一ID更新結(jié)果例如user_id一般不重復(fù),可以被用來作為唯一IDUpsert模式要和特定的TableSink緊密結(jié)合Key-Value數(shù)據(jù)更適合進(jìn)行Upsert操作Upsert模式Flink通過狀態(tài)保存中間數(shù)據(jù),狀態(tài)不能無限增加,否則會(huì)突破存儲(chǔ)限制??臻e狀態(tài)數(shù)據(jù)是指該數(shù)據(jù)長時(shí)間沒有更新,仍然保留在狀態(tài)中。清除空閑狀態(tài)數(shù)據(jù):minTime和maxTime:空閑狀態(tài)至少會(huì)保留minTime的時(shí)間,這個(gè)時(shí)間內(nèi)數(shù)據(jù)不會(huì)被清理;超過maxTime的時(shí)間后,空閑狀態(tài)會(huì)被清除。部分狀態(tài)被清除后,會(huì)導(dǎo)致計(jì)算結(jié)果是近似準(zhǔn)確的狀態(tài)過期時(shí)間tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1),Time.hours(2));對(duì)TableAPI&SQL綜述動(dòng)態(tài)表和持續(xù)查詢時(shí)間和窗口JoinSQLDDL系統(tǒng)內(nèi)置函數(shù)用戶自定義函數(shù)時(shí)間屬性使用TIMESTAMP(intprecision)數(shù)據(jù)類型來表示,對(duì)應(yīng)SQL標(biāo)準(zhǔn)中的時(shí)間戳類型
precision為精度,表示秒以下保留幾位小數(shù)點(diǎn)時(shí)間的格式一般為:year-month-dayhour:minute:second[.fractional]絕大多數(shù)情況可以使用毫秒精度:TIMESTAMP(3)Flink提供的時(shí)間單位:MILLISECOND、SECOND、MINUTE、HOUR、DAY、MONTH和YEAR
時(shí)間屬性需要在Java/Scala代碼中設(shè)置使用哪種時(shí)間語義三種時(shí)間語義StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();//默認(rèn)使用ProcessingTime
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
//使用IngestionTime
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);//使用EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);SQL
DDL時(shí)間屬性列proctime,使用PROCTIME()函數(shù)計(jì)算得到Processing
Time:時(shí)間屬性CREATE
TABLEuser_behavior(user_idBIGINT,item_idBIGINT,category_idBIGINT,behaviorSTRING,tsTIMESTAMP(3),--在原有Schema基礎(chǔ)上添加一列proctime
proctimeasPROCTIME())WITH(
...);將DataStream轉(zhuǎn)化為表時(shí)間屬性列proctimectime:使用proctime函數(shù),生成proctime列Processing
Time:時(shí)間屬性DataStream<UserBehavior>userBehaviorDataStream=...//定義了Schema中各字段的名字,其中proctime使用了.proctime屬性,這個(gè)屬性幫我們生成一個(gè)ProcessingTime
tEnv.createTemporaryView("user_behavior",userBehaviorDataStream,"userIdasuser_id,itemIdasitem_id,categoryIdascategory_id,behavior,ctime");指定時(shí)間屬性和Watermark策略SQL:使用WATERMARK關(guān)鍵字,并設(shè)置Watermark策略語法:WATERMARKFORrowtime_columnASwatermark_strategy_expressionEvent
Time:時(shí)間屬性&
WatermarkCREATE
TABLEuser_behavior(user_idBIGINT,item_idBIGINT,category_idBIGINT,behaviorSTRING,tsTIMESTAMP(3),--定義ts字段為EventTime時(shí)間戳,Watermark比監(jiān)測(cè)到的最晚時(shí)間還晚5秒
WATERMARKFORtsasts-INTERVAL
'5'
SECOND
)WITH(
...);
語法:WATERMARKFORrowtime_columnASwatermark_strategy_expressionrowtime_column為時(shí)間屬性,必須是TIMESTAMP(3)類型watermark_strategy_expression定義了Watermark的生成策略:時(shí)間戳嚴(yán)格單調(diào)遞增WATERMARKFORrowtime_columnASrowtime_columnWATERMARKFORrowtime_columnASrowtime_column-INTERVAL'0.001'SECOND監(jiān)測(cè)所有數(shù)據(jù)時(shí)間戳,并記錄時(shí)間戳最大值,在最大值基礎(chǔ)上添加一個(gè)1毫秒的延遲作為Watermark時(shí)間時(shí)間戳是亂序到達(dá)的WATERMARKFORrowtime_columnASrowtime_column-INTERVAL'duration'timeUnittimeUnit可以是SECOND、MINUTE或HOUR等時(shí)間單位Event
Time:時(shí)間屬性&
Watermark由DataStream轉(zhuǎn)換為表在DataStream
API中設(shè)置好時(shí)間戳和Watermarkts.rowtime:使用rowtime函數(shù),生成ts時(shí)間戳列Event
Time:時(shí)間屬性&
Watermarkenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStream<UserBehavior>userBehaviorDataStream=env.addSource(...)//在DataStream里設(shè)置時(shí)間戳和Watermark
.assignTimestampsAndWatermarks(...);//創(chuàng)建一個(gè)user_behavior表//ts.rowtime表示該列使用EventTimeTimestamp
tEnv.createTemporaryView("user_behavior",userBehaviorDataStream,"userIdasuser_id,itemIdasitem_id,categoryIdascategory_id,behavior,ts.rowtime");基于時(shí)間屬性窗口分組GROUP
BYOVERWINDOW聚合窗口聚合GROUPBYfield1,time_attr_window:time_attr_window窗口分組函數(shù):例如TUMBLE(proctime,INTERVAL'1'MINUTE)所有含有相同field1+time_attr_window的行都會(huì)被分到一組再對(duì)這組數(shù)據(jù)中的其他字段(如field2)進(jìn)行聚合操作聚合操作:COUNT、SUM、AVG、MAX等將多行數(shù)據(jù)分到一組,然后對(duì)一組數(shù)據(jù)集進(jìn)行聚合:多行變一行GROUP
BYTUMBLE(time_attr,interval):滾動(dòng)窗口窗口是定長的,長度為interval,窗口之間不重疊,滾動(dòng)向前HOP(time_attr,slide_interval,size_interval)
窗口長度是定長的,長度為size_interval,窗口以slide_interval的速度向前滑動(dòng)slide_interval
<
size_interval:窗口重疊slide_interval
>
size_interval:窗口之間有間隙SESSION(time_attr,interval):會(huì)話窗口窗口長度是變長的,根據(jù)interval劃分窗口時(shí)間間隔格式:INTERVAL‘duration’timeUnitINTERVAL'1'MINUTE窗口分組函數(shù)TUMBLE_START(time_attr,interval):當(dāng)前窗口的起始時(shí)間返回值不再是時(shí)間屬性TUMBLE_END(time_attr,interval)
:當(dāng)前窗口的結(jié)束時(shí)間返回值不再是時(shí)間屬性TUMBLE_ROWTIME(time_attr,interval)
:窗口的結(jié)束時(shí)間返回值是一個(gè)時(shí)間屬性,后續(xù)的查詢可以使用這個(gè)字段基于Event
TimeTUMBLE_PROCTIME(time-attr,interval)
:窗口的結(jié)束時(shí)間
返回值是一個(gè)時(shí)間屬性,后續(xù)的查詢可以使用這個(gè)字段基于Processing
Time窗口的起始和結(jié)束時(shí)間TUMBLE_START
/
TUMBLE_END使用方法:TUMBLE(time_attr,interval)中的interval和TUMBLE_START(time_attr,interval)中的interval保持一致,即INTERVAL‘duration’timeUnit中的duration時(shí)間長度和timeUnit時(shí)間單位,兩者保持一致TUMBLE_START
/
TUMBLE_ENDTUMBLE_ROWTIME
/TUMBLE_PROCTIME使用方法可以用在內(nèi)聯(lián)視圖子查詢或Join上案例:先使用TUMBLE_ROWTIME創(chuàng)建一個(gè)10秒鐘的視圖再在視圖的基礎(chǔ)上進(jìn)行20分鐘的聚合TUMBLE_ROWTIME
/
TUMBLE_PROCTIMESELECTTUMBLE_END(rowtime,INTERVAL'20'MINUTE),user_id,SUM(cnt)
FROM
(SELECTuser_id,COUNT(behavior)AScnt,TUMBLE_ROWTIME(ts,INTERVAL'10'SECOND)ASrowtimeFROMuser_behaviorGROUPBYuser_id,TUMBLE(ts,INTERVAL'10'SECOND)
)GROUPBYTUMBLE(rowtime,INTERVAL'20'MINUTE),user_id每行數(shù)據(jù)生成窗口,在窗口上進(jìn)行聚合,聚合的結(jié)果會(huì)生成一個(gè)新字段:一行變一行OVER
WINDOW計(jì)算流程:先對(duì)field1做分組,包含相同field1的行被分到一起,按照時(shí)間屬性排序(PARTITION
BY
…
ORDER
BY
…)每行數(shù)據(jù)建立一個(gè)窗口,窗口起始點(diǎn)為第一行數(shù)據(jù),窗口結(jié)束點(diǎn)是當(dāng)前行對(duì)窗口內(nèi)field2字段做各類聚合操作,生成field2_agg的新字段(COUNT、SUM、AVG、MAX等)Flink為每行元素維護(hù)一個(gè)窗口,為每行元素執(zhí)行一次窗口計(jì)算,完成計(jì)算后清除過期數(shù)據(jù)OVER
WINDOWOVER
WINDOW的計(jì)算過程windowDefinition中定義了窗口規(guī)則使用哪些字段進(jìn)行PARTITIONBY使用時(shí)間屬性進(jìn)行ORDER
BY定義窗口的起始點(diǎn)和結(jié)束點(diǎn)在定義好的窗口上,使用聚合函數(shù)AGG_FUNCTION對(duì)某個(gè)字段進(jìn)行聚合計(jì)算COUNT、MAX等OVER
WINDOW語法SELECTAGG_FUNCTION(field2)OVER(windowDefinition2)ASfield2_agg,...AGG_FUNCTION(fieldN)OVER(windowDefinitionN)ASfieldN_aggFROMtab1SELECT
AGG_FUNCTION(field2)OVERwASfield2_agg,...FROMtab1WINDOWwAS(windowDefinition)在SQL語句最后,使用別名AS定義WINDOW使用OVER
windowDefinition
AS
…語法結(jié)構(gòu)定義窗口ROWS按行劃分WINDOWwAS(...)定義了名為w的窗口,根據(jù)user_id來分組,按照ts排序,相同user_id的行會(huì)分到一組,組內(nèi)按照時(shí)間戳ts來排序ROWSBETWEENUNBOUNDEDPRECEDINGANDCURRENTROW定義了窗口的起始點(diǎn)和結(jié)束點(diǎn),起始點(diǎn)為UNBOUNDEDPRECEDING,即數(shù)據(jù)流的最開始的行,結(jié)束點(diǎn)為CURRENTROW當(dāng)前行ORDER
BY只支持時(shí)間屬性的排序,無法對(duì)其他字段進(jìn)行排序窗口劃分方式-
ROWSSELECT
user_id,behavior,COUNT(*)OVERwASbehavior_count,tsFROMuser_behaviorWINDOWwAS(PARTITION
BYuser_idORDER
BYtsROWS
BETWEEN
UNBOUNDED
PRECEDING
AND
CURRENT
ROW
)右圖上半部分,窗口起始點(diǎn)為數(shù)據(jù)流的第一個(gè)元素,結(jié)束點(diǎn)為當(dāng)前行右圖下半部分,窗口起始點(diǎn)本元素的前一個(gè)元素,結(jié)束點(diǎn)為當(dāng)前行右圖下半部分,最后兩個(gè)元素同時(shí)到達(dá),按行劃分,被劃分到2個(gè)窗口窗口劃分方式-
ROWSPARTITIONBY可選,根據(jù)一到多個(gè)字段對(duì)數(shù)據(jù)進(jìn)行分組ORDERBY之后必須是時(shí)間屬性,按照時(shí)間排序ROWSBETWEEN...AND...界定窗口的起始點(diǎn)和結(jié)束點(diǎn)窗口劃分方式-
ROWSSELECT
field1,AGG_FUNCTION(field2)OVER([PARTITION
BY(value_expression1,...,value_expressionN)]ORDER
BYtimeAttrROWS
BETWEEN(UNBOUNDED|rowCount)PRECEDING
AND
CURRENT
ROW)ASfieldNameFROMtab1--使用AS
SELECT
field1,AGG_FUNCTION(field2)OVERwASfieldNameFROMtab1WINDOWwAS([PARTITION
BY(value_expression1,...,value_expressionN)]ORDER
BYtimeAttrROWS
BETWEEN(UNBOUNDED|rowCount)PRECEDING
AND
CURRENT
ROW
)RANGE按時(shí)間段劃分WINDOWwAS(...)語法結(jié)構(gòu)與之前的類似使用RANGE關(guān)鍵字窗口的結(jié)束點(diǎn)是當(dāng)前行,起始點(diǎn)是當(dāng)前行之前的某個(gè)時(shí)間點(diǎn)(當(dāng)前行的時(shí)間-
interval)窗口劃分方式-
RANGESELECT
user_id,COUNT(*)OVERwASbehavior_count,tsFROMuser_behaviorWINDOWwAS(PARTITION
BYuser_idORDER
BYtsRANGE
BETWEEN
INTERVAL
'2'
SECOND
PRECEDING
AND
CURRENT
ROW
)右圖上半部分,窗口起始點(diǎn)為數(shù)據(jù)流的第一個(gè)元素,結(jié)束點(diǎn)為當(dāng)前元素。與ROWS不同,最后兩個(gè)元素同時(shí)到達(dá),被劃分到一個(gè)窗口w4中。右圖下半部分,窗口起始點(diǎn)為當(dāng)前元素減去2秒,結(jié)束點(diǎn)為當(dāng)前元素。最后兩個(gè)元素也被劃分到同一個(gè)窗口w4中。窗口劃分方式-
RANGEPARTITIONBY可選,根據(jù)一到多個(gè)字段對(duì)數(shù)據(jù)進(jìn)行分組ORDERBY之后必須是時(shí)間屬性,按照時(shí)間排序RANGE
BETWEEN...AND...界定窗口的起始點(diǎn)和結(jié)束點(diǎn)可以使用timeIntervalPRECEDING來表示當(dāng)前行之前的某個(gè)時(shí)間點(diǎn)作為起始點(diǎn)窗口劃分方式-
RANGESELECT
field1,AGG_FUNCTION(field2)OVER([PARTITION
BY(value_expression1,...,value_expressionN)]ORDER
BYtimeAttrRANGE
BETWEEN(UNBOUNDED|timeInterval)PRECEDING
AND
CURRENT
ROW)ASfieldNameFROMtab1--使用AS
SELECT
field1,AGG_FUNCTION(field2)OVERwASfieldNameFROMtab1WINDOWwAS([PARTITION
BY(value_expression1,...,value_expressionN)]ORDER
BYtimeAttrRANGE
BETWEEN(UNBOUNDED|timeInterval)PRECEDING
AND
CURRENT
ROW
)對(duì)TableAPI&SQL綜述動(dòng)態(tài)表和持續(xù)查詢時(shí)間和窗口JoinSQLDDL系統(tǒng)內(nèi)置函數(shù)用戶自定義函數(shù)常見的Join:INNER
JOIN、LEFT/RIGHT/FULLOUTERJOIN使用批處理,在靜態(tài)數(shù)據(jù)集上進(jìn)行Join已經(jīng)比較成熟:嵌套循環(huán)、排序合并、哈希合并
Flink的三種Join時(shí)間窗口Join(Time-windowedJoin)臨時(shí)表Join(TemporalTableJoin)傳統(tǒng)意義上的Join(RegularJoin)
JoinSELECT
orders.order_id,customers.customer_name,orders.order_dateFROMordersINNER
JOINcustomersONorders.customer_id=customers.customer_id;//循環(huán)遍歷orders的每個(gè)元素forrow_orderinorders://循環(huán)遍歷customers的每個(gè)元素forrow_customerincustomers:ifrow_order.customer_id=row_customer.customer_idreturn(row_order.order_id,row_customer.customer_name,row_order.order_date)endend循環(huán)嵌套偽代碼一個(gè)INNER
JOIN案例案例:聊天對(duì)話數(shù)據(jù)流chat表包含了買家和賣家聊天信息,chat表與user_behavior進(jìn)行Join對(duì)item_id字段進(jìn)行Join,并增加時(shí)間窗口的限制時(shí)間窗口JoinSELECT
user_behavior.item_id,user_behavior.tsASbuy_tsFROMchat,user_behaviorWHEREchat.item_id=user_behavior.item_idANDuser_behavior.behavior='buy’
ANDuser_behavior.tsBETWEENchat.tsANDchat.ts+INTERVAL
'1'
MINUTE;與DataStream
API中的Interval
Join相似A表中所有包含在界限內(nèi)的元素與B表元素連接BETWEEN...AND...設(shè)置了時(shí)間窗口,也可以使用比較符號(hào)>,<,>=,<=A表和B表必須都是Append-only模式的表Flink使用狀態(tài)存儲(chǔ)時(shí)間窗口相關(guān)數(shù)據(jù)時(shí)間窗口JoinSELECT*FROMA,BWHEREA.id=B.id
ANDA.tsBETWEENB.ts-lowBoundANDB.ts+upperBound;
將一個(gè)基于時(shí)間的日志表抽象成為臨時(shí)表(Temporal
Table)案例:商品價(jià)格日志表item_log,包含了每個(gè)商品的每次價(jià)格變動(dòng),price為當(dāng)前的價(jià)格,version_ts為價(jià)格改動(dòng)的時(shí)間戳不同時(shí)間點(diǎn),商品價(jià)格不同。TemporalTable為某個(gè)時(shí)間點(diǎn)的臨時(shí)表臨時(shí)表Join其他表與臨時(shí)表進(jìn)行Join,希望得到某個(gè)時(shí)間點(diǎn)的Join結(jié)果案例:user_behavior與item_log表進(jìn)行Join,得到產(chǎn)生用戶行為時(shí)間點(diǎn)的價(jià)格臨時(shí)表Joinitem_log與user_behavior進(jìn)行臨時(shí)表Join示意圖臨時(shí)表使用方法:注冊(cè)臨時(shí)表在SQL語句中使用臨時(shí)表registerFunction()注冊(cè)臨時(shí)表,名為item在SQL語句中,item(user_behavior.ts)按照user_behavior表中的ts來獲取該時(shí)間點(diǎn)上對(duì)應(yīng)的臨時(shí)表,將這個(gè)表命名為latest_itemuser_behavior與latest_item進(jìn)行Join臨時(shí)表JoinDataStream<Tuple3<Long,Long,Timestamp>>itemStream=...//獲取Table
TableitemTable=tEnv.fromDataStream(itemStream,"item_id,price,version_ts.rowtime");//注冊(cè)TemporalTableFunction,指定時(shí)間屬性和Key
tEnv.registerFunction("item",itemTable.createTemporalTableFunction("version_ts","item_id"));在Java代碼中注冊(cè)臨時(shí)表SELECT
user_behavior.item_id,latest_item.price,user_behavior.tsFROMuser_behavior,LATERALTABLE(item(user_behavior.ts))ASlatest_itemWHEREuser_behavior.item_id=latest_item.item_id ANDuser_behavior.behavior='buy'在SQL語句中使用臨時(shí)表item臨時(shí)表Join注意事項(xiàng):A表必須是一個(gè)Append-only的追加表。臨時(shí)表B的數(shù)據(jù)源必須是一個(gè)Append-only的追加表,必須使用registerFunction()將該追加表注冊(cè)到Catalog中。注冊(cè)時(shí)需要指定Key和時(shí)間屬性。表A和臨時(shí)表B通過Key進(jìn)行等于謂詞匹配:A.id=B.id。Flink用狀態(tài)維護(hù)中間數(shù)據(jù)臨時(shí)表JoinSELECT*FROMA,LATERALTABLE(B(A.ts))WHEREA.id=B.id從時(shí)間維度上理解臨時(shí)表Join最常規(guī)的Join案例:商品價(jià)格表只保存了當(dāng)前最新的價(jià)格,沒有保存修改記錄傳統(tǒng)意義上的Join
SELECT
user_behavior.item_id,item.priceFROMuser_behavior,itemWHEREuser_behavior.item_id=item.item_idANDuser_behavior.behavior='buy'A和B可以是Append-only的追加表,也可以是可更新的Update表,A、B兩個(gè)表中的數(shù)據(jù)可以插入、刪除和更新。A、B表對(duì)應(yīng)的元素都會(huì)被連接起來。盡量避免笛卡爾積式的連接。Flink用狀態(tài)存儲(chǔ)一些中間數(shù)據(jù),最好設(shè)置狀態(tài)過期時(shí)間。傳統(tǒng)意義上的Join
SELECT*FROMAINNER
JOINBONA.id=B.id對(duì)TableAPI&SQL綜述動(dòng)態(tài)表和持續(xù)查詢時(shí)間和窗口JoinSQLDDL系統(tǒng)內(nèi)置函數(shù)用戶自定義函數(shù)Catalog記錄并管理各類元數(shù)據(jù)信息有哪些數(shù)據(jù)庫(Database)、存儲(chǔ)形式為文件、消息隊(duì)列、數(shù)據(jù)庫數(shù)據(jù)庫中有哪些表有哪些可用的函數(shù)一個(gè)Catalog下有一到多個(gè)Database,一個(gè)Database下有一到多個(gè)表GenericInMemoryCatalog:將元數(shù)據(jù)存儲(chǔ)在內(nèi)存,只在一個(gè)Session內(nèi)生效HiveCatalog:可以將元數(shù)據(jù)持久化,數(shù)據(jù)管理團(tuán)隊(duì)將數(shù)據(jù)注冊(cè)到HiveCatalog中,數(shù)據(jù)分析團(tuán)隊(duì)從HiveCatalog中獲取表,直接進(jìn)行計(jì)算Catalog使用SQL
DDL:CREATE
TABLE
…未來將主要使用這種方式使用TableEnvironment.connect()未來將逐漸廢棄從Catalog中獲取已注冊(cè)的表如何獲取表USE、SHOWCREATE、DROP、ALTER
將SQL語句粘貼到Java代碼的executeSql()中執(zhí)行常見SQL
DDL對(duì)TableAPI&SQL綜述動(dòng)態(tài)表和持續(xù)查詢時(shí)間和窗口JoinSQLDDL系統(tǒng)內(nèi)置函數(shù)用戶自定義函數(shù)根據(jù)是否為系統(tǒng)內(nèi)置來分類系統(tǒng)內(nèi)置函數(shù)(System
Function):Flink提供的內(nèi)置函數(shù)非系統(tǒng)內(nèi)置函數(shù):需要我們注冊(cè)到某個(gè)Catalog中,又被稱為目錄函數(shù)(Catalog
Function)根據(jù)是否為臨時(shí)函數(shù)來分類臨時(shí)函數(shù)(TemporaryFunction),只存在于某個(gè)Flink
Session中,Session結(jié)束后就不可使用。非臨時(shí)函數(shù),又被稱為持久化函數(shù)(PersistentFunction),可以是一個(gè)系統(tǒng)內(nèi)置函數(shù),也可以是一個(gè)目錄函數(shù)。函數(shù)根據(jù)上述維度分為:臨時(shí)系統(tǒng)內(nèi)置函數(shù)(TemporarySystemFunction)。持久化系統(tǒng)內(nèi)置函數(shù)(SystemFunction)。臨時(shí)目錄函數(shù)(TemporaryCatalogFunction)。持久化目錄函數(shù)(CatalogFunction)。函數(shù)分類標(biāo)量函數(shù)邏輯函數(shù)數(shù)學(xué)函數(shù)字符串函數(shù)時(shí)間函數(shù)判斷函數(shù)類型轉(zhuǎn)化函數(shù)集合函數(shù)聚合函數(shù)系統(tǒng)內(nèi)置函數(shù)對(duì)TableAPI&SQL綜述動(dòng)態(tài)表和持續(xù)查詢時(shí)間和窗口JoinSQLDDL系統(tǒng)內(nèi)置函數(shù)用戶自定義函數(shù)當(dāng)系統(tǒng)內(nèi)置函數(shù)無法滿足特定的需求時(shí),可以進(jìn)行用戶自定義函數(shù)registerFunction():將函數(shù)名和對(duì)應(yīng)實(shí)現(xiàn)記錄下來。自定義函數(shù):需要自己實(shí)現(xiàn)函數(shù)的業(yè)務(wù)邏輯三種自定義函數(shù):標(biāo)量函數(shù)表函數(shù)聚合函數(shù)用戶自定義函數(shù)標(biāo)量函數(shù)接收零個(gè)、一個(gè)或者多個(gè)輸入,生成一個(gè)單值輸出。案例:經(jīng)緯度,判斷給定經(jīng)緯度數(shù)據(jù)是否在北京四環(huán)以內(nèi)
繼承ScalarFunction,實(shí)現(xiàn)eval()方法注意eval()方法的輸入?yún)?shù)類型和返回結(jié)果類型eval()方法的輸入和輸出類型決定了函數(shù)的輸入和輸出類型標(biāo)量函數(shù)public
class
IsInFourRing
extends
ScalarFunction{//北京四環(huán)經(jīng)緯度范圍
private
static
doubleLON_EAST=116.48;private
static
doubleLON_WEST=116.27;private
static
doubleLAT_NORTH=39.988;private
static
doubleLAT_SOUTH=39.83;//判斷輸入的經(jīng)緯度是否在四環(huán)內(nèi)
public
boolean
eval(doublelon,doublelat)
{return!(lon>LON_EAST||lon<LON_WEST)&& !(lat>LAT_NORTH||lat<LAT_SOUTH);}}自定義好函數(shù)后,還需要使用registerFunction()將函數(shù)注冊(cè)到Catalog中,為函數(shù)起名,注冊(cè)后才能在SQL中使用這個(gè)函數(shù)eval()方法的輸入?yún)?shù)類型對(duì)應(yīng)SQL中的輸入類型標(biāo)量函數(shù)//注冊(cè)函數(shù)到Catalog中,指定名字
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025農(nóng)村買地合同樣本
- 2025私營公司工程合同
- 2025年度農(nóng)村水塘水域環(huán)境保護(hù)與承包合同
- 二零二五年度環(huán)保產(chǎn)業(yè)散伙協(xié)議書3篇
- 2025年度公司與自然人共同開發(fā)項(xiàng)目合作協(xié)議3篇
- 2025年企業(yè)法人變更合同審查與合同效力確認(rèn)服務(wù)3篇
- 二零二五年度公司股東內(nèi)部關(guān)于企業(yè)可持續(xù)發(fā)展戰(zhàn)略的協(xié)議書2篇
- 二零二五年度智慧城市運(yùn)營合作出資協(xié)議模板
- 2025抵押貸款還款合同
- 二零二五年度農(nóng)村新建住宅不含材料包工協(xié)議
- 新媒體個(gè)人賬號(hào)分析報(bào)告
- 掃雪鏟冰安全教育培訓(xùn)
- 涉密內(nèi)網(wǎng)分級(jí)保護(hù)設(shè)計(jì)方案
- 土地清查服務(wù)流程
- 農(nóng)民專業(yè)合作社章程(參考范本)
- 搶救儀器設(shè)備管理培訓(xùn)課件
- 幼兒園大班上學(xué)期社會(huì)教案《今天我當(dāng)家》及教學(xué)反思
- 市政設(shè)施維護(hù)工程道路橋梁維護(hù)施工與方案
- 腦出血入院記錄
- 中華傳統(tǒng)文化之文學(xué)瑰寶學(xué)習(xí)通超星課后章節(jié)答案期末考試題庫2023年
- 自粘聚合物改性瀝青防水卷材施工工藝與規(guī)程
評(píng)論
0/150
提交評(píng)論