2022Apache Flink 必知必會知識_第1頁
2022Apache Flink 必知必會知識_第2頁
2022Apache Flink 必知必會知識_第3頁
2022Apache Flink 必知必會知識_第4頁
2022Apache Flink 必知必會知識_第5頁
已閱讀5頁,還剩165頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領

文檔簡介

ApacheFlink必知必會知識目錄TOC\o"1-1"\h\z\u走進Apchelik 4reamrceigwihApchelik 25likRuieArchieure 46aulleraceinlik 64likQL_ale介紹與實戰(zhàn) 94ylik快速上手 15likcytes 11likCecr詳解 14走進paheFlink什么是heFk為什么要學習heFkheFk典型應用場景heFk基本概念一、什么是acelink5走進peFlk<5ApacheFlink是一個開源的基于流的有狀態(tài)計算框架。它是分布式地執(zhí)行的,具備低延遲、高吞吐的優(yōu)秀性能,并且非常擅長處理有狀態(tài)的復雜計算邏輯場景。FlikApacheFlink是Apache開源軟件基金會的一個頂級項目,和許多Apache頂級項目一樣,如Spark起源于UC伯克利的實驗室,F(xiàn)link也是起源于非常有名的大學的實驗室——柏林工業(yè)大學實驗室。項目最初的名稱為Stratosphere,目標是要讓大數(shù)據(jù)的處理看起來更加地簡潔。項目初始的代碼貢獻者中,有很多至今仍活躍在Apache的項目管理委員會里,在社區(qū)里持續(xù)做出貢獻。6>走進peFlk6Stratosphere項目于2010年發(fā)起,從它的Gitcommit日志里面可以看到,它的第一行代碼是在2010年的12月15日編寫的。2014年5月,Stratosphere項目被貢獻到Apache軟件基金會,作為孵化器項目進行孵化,并更名為Flink。FlikFnk487v.-ctg。7走進peFlk<7由于Flink項目吸引了非常多貢獻者參與,活躍度等方面也都非常優(yōu)秀,它在2014年12月成為了Apache的頂級項目。成為頂級項目之后,它在一個月之后發(fā)布了第一個Release版本Flink..。在此之后,F(xiàn)k基本保持41Flikpce發(fā)展至今,F(xiàn)link已成為Apache社區(qū)最活躍的大數(shù)據(jù)項目。它的用戶與開發(fā)者郵件列表活躍度在2020年的Apache年度報告中排名第一。如上方右圖所示,與非?;钴S的Spark項目相比,可以看到用戶郵件列表的活躍度,F(xiàn)ink比Sarkithb的用戶訪FinkpceFink都是排名第一。8>走進peFlk8從2019年4月至今,F(xiàn)link社區(qū)發(fā)布了5個版本,每一個版本都有更多的ommt二、為什么要學習acelink(一)大數(shù)據(jù)處理的實時化趨勢隨著網(wǎng)絡迅速發(fā)展,大數(shù)據(jù)的處理呈現(xiàn)出非常明顯的實時化趨勢。如上圖所示,我們列舉了一些現(xiàn)實生活中常見的場景。如春晚的直播有一個實時大屏,雙11購物節(jié)也有實時成交額的統(tǒng)計和媒體匯報。城市大腦可以實時監(jiān)測交通,銀行可以實時進行風控監(jiān)測。當我們打開淘寶、天貓等應用軟件時,它都會根據(jù)用戶不同的習慣進行實時個性化推薦。從以上例子我們可以看到,實時化就是當下大數(shù)據(jù)處理的趨勢。9走進peFnk<9Flik在實時化的大趨勢底下,F(xiàn)k已成為國內(nèi)外實時計算事實標準。如上圖所示,目前國內(nèi)外許多公司都在使用Flink,國際公司有Netflix、eBay,LinkedIn等,國內(nèi)有阿里巴巴、騰訊、美團、小米、快手等大型互聯(lián)網(wǎng)公司。(三)流計算引擎的演進pcheSorm是一個純流的設計,延遲非常的低,但是它的問題也比較明顯,即沒有辦法避免消息的重復處理,從而導致數(shù)據(jù)正確性有一定的問題。 0>走進peFlkSparkStreaming是第二代流計算引擎,解決了流計算語義正確性的問題,但是它的設計理念是以批為核心,最大的問題是延遲比較高,只能做到10秒級別的延遲,端到端無法實現(xiàn)秒以內(nèi)的延遲。Flink是第三代流計算引擎,也是最新一代的流計算引擎。它既可以保證低延遲,同時又可以保證消息的一致性語義,對于內(nèi)置狀態(tài)的管理,也極大降低了應用程序的復雜度。三、acelink(一)事件驅(qū)動型應用第一類應用場景是事件驅(qū)動型應用。事件驅(qū)動表示一個事件會觸發(fā)另一個或者是很多個后續(xù)的事件,然后這一系列事件會形成一些信息,基于這些信息需要做一定的處理。進peFlk<1 在社交場景下,以微博為例,當我們點擊了一個關注之后,被關注人的粉絲數(shù)就會發(fā)生變化。之后如果被關注的人發(fā)了一條微博,關注他的粉絲也會收到消息通知,這是一個典型的事件驅(qū)動。另外,在網(wǎng)購的場景底下,如用戶給商品做評價,這些評價一方面會影響店鋪的星級,另外一方面有惡意差評的檢測。此外,用戶通過點擊信息流,也可以看到商品派送或其他狀態(tài),這些都可能觸發(fā)后續(xù)的一系列事件。還有金融反欺詐的場景,詐騙者通過短信詐騙,然后在取款機竊取別人的錢財。在這種場景底下,我們通過攝像頭拍攝后,迅速反應識別出來,然后對犯罪的行為進行相應的處理。這也是一個典型的事件驅(qū)動型應用。總結(jié)一下,事件驅(qū)動型應用是一類具有狀態(tài)的應用,會根據(jù)事件流中的事件觸發(fā)計算、更新狀態(tài)或進行外部系統(tǒng)操作。事件驅(qū)動型應用常見于實時計算業(yè)務中,比如:實時推薦,金融反欺詐,實時規(guī)則預警等。 2>走進peFlk(二)數(shù)據(jù)分析型應用第二類典型應用場景是數(shù)據(jù)分析型應用,如雙11成交額實時匯總,包括PV、UV的統(tǒng)計。包括上方圖中所示,是Apache開源軟件在全世界不同地區(qū)的一個下載量,其實也是一個信息的匯總。還包括一些營銷大屏,銷量的升降,營銷策略的結(jié)果進行環(huán)比、同比的比較,這些背后都涉及到大量信息實時的分析和聚合,這些都是Flink非常典型的使用場景。進peFlk<3 如上圖所示,以雙11為例,在2020年天貓雙11購物節(jié),阿里基于Flink的實時計算平臺每秒處理的消息數(shù)達到了40億條,數(shù)據(jù)體量達到7TB,訂單創(chuàng)建數(shù)達到58萬/秒,計算規(guī)模也超過了150萬核??梢钥吹?,這些應用的場景體量很大且對于實時性要求非常高,這也是heFk非常擅長的場景。(三)數(shù)據(jù)管道型應用(ETL)heFkET。ETL(Extract-Transform-Load)是從數(shù)據(jù)源抽取/轉(zhuǎn)換/加載/數(shù)據(jù)至目的端的過程。傳統(tǒng)的ETL使用離線處理,經(jīng)常做的是小時級別或者天級別的ETL。但是,隨著大數(shù)據(jù)處理呈現(xiàn)實時化趨勢,我們也會有實時數(shù)倉的需求,要求在分鐘級或者秒級就能夠?qū)?shù)據(jù)進行更新,從而進行及時的查詢,能夠看到實時的指標,然后做更實時的判斷和分析。 4>走進peFlk在以上場景底下,F(xiàn)k能夠最大限度地滿足實時化的需求。Finkonnctor,支持多種Sk,囊括了所有主流的存儲系統(tǒng)。另外它也有一些非常通用的內(nèi)置聚ETLETL四、acelink進peFlk<5 FlikFlink的核心概念主要有四個:EventStreams、State、(Event)Time和Snapshots。EvetSrems即事件流,事件流可以是實時的也可以是歷史的。Flink是基于流的,但它不止能處理流,也能處理批,而流和批的輸入都是事件流,差別在于實時與批量。StateFlink擅長處理有狀態(tài)的計算。通常的復雜業(yè)務邏輯都是有狀態(tài)的,它不僅要處理單一的事件,而且需要記錄一系列歷史的信息,然后進行計算或者判斷。1.(Eve)Time最主要處理的問題是數(shù)據(jù)亂序的時候,一致性如何保證。1.4Spsos實現(xiàn)了數(shù)據(jù)的快照、故障的恢復,保證數(shù)據(jù)一致性和作業(yè)的升級遷移等。Flik 6>走進peFlkFk的作業(yè)描述和邏輯拓撲。如上方所示,代碼是一個簡單的Flink作業(yè)描述。它首先定義了一個KafkaSource,說明數(shù)據(jù)源是來自于Kafka消息隊列,然后解析Kafka里每一條數(shù)據(jù)。解析完成后,下發(fā)的數(shù)據(jù)我們會按照事件的ID進行KeyBy,每個分組每10秒鐘進行一次窗口的聚合。聚合處理完之后,消息會寫到自定義的Sink。以上是一個簡單的作業(yè)描述,這個作業(yè)描述會映射到一個直觀的邏輯拓撲??梢钥吹竭壿嬐負淅锩嬗?個稱為算子或者是運算的單元,分別是Source、MapKeyBy/Window/ApplySink,我們把邏輯拓撲稱為Streamingfo。Flik進peFnk<7 邏輯拓撲對應物理拓撲,它的每一個算子都可以并發(fā)進行處理,進行負載均衡與處理加速等。大數(shù)據(jù)的處理基本上都是分布式的,每一個算子都可以有不同的并發(fā)度。有KeyBy關鍵字的時候,會按照key來對數(shù)據(jù)進行分組,所以在KeyBy前面的算子處理完之后,數(shù)據(jù)會進行一個Shuffle并發(fā)送到下一個算子里面。上圖代表了示例對應的物理拓撲。FlikFk里面的狀態(tài)管理和快照。 8>走進peFlk在進行Window的聚合邏輯時,每隔10秒會對數(shù)據(jù)進行聚合函數(shù)的處理。這10秒內(nèi)的數(shù)據(jù)需要先存儲起來,待時間窗口觸發(fā)時進行處理。這些狀態(tài)數(shù)據(jù)會以嵌入式存儲的形式存儲在本地。這里的嵌入式存儲既可以是進程的內(nèi)存里,也可以是類似RocksDB的持久化KV存儲,兩者最主要的差別是處理速度與容量。此外,這些有狀態(tài)算子的每一個并發(fā)都會有一個本地的存儲,因此它的狀態(tài)數(shù)據(jù)本身可以跟隨算子的并發(fā)度進行動態(tài)的擴縮容,從而可以通過增加并發(fā)處理很大的數(shù)據(jù)量。進peFnk<9 另一方面,作業(yè)在很多情況下有可能會失敗。失敗之后重新去運行時,我們?nèi)绾伪WC數(shù)據(jù)的一致性?Finkhndy-Lmort算法,會把分布式的每一個節(jié)點的狀態(tài)保存到分布式文件系統(tǒng)里面作為Checkpoint(檢查點),過程大致如下。首先,從數(shù)據(jù)源端開始注入potrrr,它是一種比較特殊的消息。然后它會跟普通的事件一樣隨著數(shù)據(jù)流去流動,當Barrier到達算子之后,這個算子會把它當前的本地狀態(tài)進行快照保存,當Barrier流動到Sink,所有的狀態(tài)都保存完整了之后,它就形成一個全局的快照。 0>走進peFlk這樣當作業(yè)失敗之后,就可以通過遠程文件系統(tǒng)里面保存的Checkpoint來進行回滾:先把Source回滾到Checkpoint記錄的offset,然后把有狀態(tài)節(jié)點當時的狀態(tài)回滾到對應的時間點,進行重新計算。這樣既可以不用從頭開始計算,又能保證數(shù)據(jù)語義的一致性。Flik進peFnk<1 FkEvtTm。在Flink里有三種不同的時間,EventTime指事件發(fā)生的時間,IngestionTime指事件到達Flink數(shù)據(jù)源的時間,或者說進入到Flink處理框架的時間,ProssgTme在現(xiàn)實世界中,這個事件從發(fā)生到寫入到系統(tǒng)里面,期間的間隔可能比較久。例如在地鐵里面信號較弱時,如果我們在微博進行轉(zhuǎn)發(fā)、評論、點贊等操作,由于網(wǎng)絡的原因,這些操作可能要等我們出了地鐵后才能完成,因此可能有些先發(fā)生的事件會后到達系統(tǒng)。而EventTime能夠更真實地反映事件發(fā)生的時間點,因此在很多場景下,我們用EvtTme但是在這種情況底下,由于存在的延遲,所以在窗口需要花費較長的時間等待它的到來,端到端的延遲可能較大。 2>走進peFlk我們還需要處理亂序的問題,如果用ProcessingTime當做事件時間的話,處理較快,延遲較低,但是無法反映真實事件發(fā)生的情況。因此在真實的開發(fā)應用時,需要根據(jù)應用的特點做相應的取舍。FlikPIFlink可分成4個層次的API,最底層的API是可以自定義的ProcessFunction,對一些最基本的元素,如時間、狀態(tài)等,進行細節(jié)的處理,實現(xiàn)自己的邏輯。再往上一層是DataStreamAPI,它可以做流和批的處理,另外一方面它是邏輯的表達,有很多Fk內(nèi)置的函數(shù),方便用戶編寫程序。PITePISrmSL,這是一個非常上層的表達形式,非常簡潔,我們接下來分別舉例說明。進peFnk<3 ProessFucin可以看到,在processElement里邊,能夠?qū)@個事件、狀態(tài)進行自定義邏輯的處理。另外,我們可以注冊一個timer,并且自定義當timer被觸發(fā)或時間到達的時候,到底要進行哪些處理,是一個非常精細的底層控制。DStrmPIDataStreamAPI是作業(yè)的描述,可以看到它有很多內(nèi)置的函數(shù),如Map、keyBy、timeWindow、sum等。這也有些我們剛才自定義的ProcessFunction,如yggrgoFco。TbePI&SremSQLTePISrmSLTalePISrmSL5FlikFk運行時的架構(gòu)主要有三個角色。第一個是客戶端,客戶端會提交它的應用程序,如果它是一個SQL程序,還會進行SQL優(yōu)化器的優(yōu)化,然后生成對應的JobGraph??蛻舳藭裲bGraph提交到JobManager,可以認為這是整個作業(yè)的主控節(jié)點。 4>走進peFlkJobManager會拉起一系列的TaskManager作為工作節(jié)點,工作節(jié)點之間會按照作業(yè)拓撲進行串聯(lián),還有相應計算邏輯的處理,JobManager主要是進行一些控制流的處理。FlikFk能部署哪些環(huán)境。首先,它可以通過手動的方式作業(yè)提交到Y(jié)ARN,Mesos以及Standalone集群上。另外,它也可以通過鏡像的方式提交到K8s云原生的環(huán)境中。目前,F(xiàn)k在許多物理環(huán)境中均能進行部署。tramroesingwithpaheFlink作者:崔星燦heFkomerSrmPrssnghheF:并行處理和編程范式SremPI概覽及簡單應用Fk一、并行處理和編程范式眾所周知,對于計算密集型或數(shù)據(jù)密集型這樣需要計算量比較大的工作,并行計算或分而治之是解決這一類問題非常有效的手段。在這個手段中比較關鍵的部分是,如何對一個已有任務的劃分,或者說如何對計算資源進行合理分配。舉例說明,上學期間老師有時會找同學來協(xié)助批閱考試試卷。假如卷子里面一共有ABC三個題,那么同學可能會有如下分工協(xié)作方式。方式一:將所有試卷的三個題分別交給不同的人來批閱。這種方式,每個批閱的同學批自己負責的題目后就可以把試卷傳給下一個批閱同學,從而形成一種流水線的工作效果。但是這種流水線的協(xié)作方式會隨著同學數(shù)量的增加而難以繼續(xù)擴展。 6>SamongthpeFnk方式二:分工方式一的擴展,同一題目允許多個同學來共同批閱,比如A題目由兩個同學共同批閱,B題目由三個同學批閱,C題目只由一個同學批閱。這時候我們就需要考慮怎樣進一步的對計算任務做劃分。比如,可以把全部同學分成三組,第ABC。第一個組的同學可以再次在組內(nèi)進行分工,比如A組里第一個同學批一半的卷子,第二個同學批另一半卷子。他們分別批完了之后,再將自己手里的試卷傳遞給下一個組。像上述按照試卷內(nèi)題目進行劃分,以及講試卷本身進行劃分,就是所謂的計算的并行性和數(shù)據(jù)并行性。我們可以用上面有向無環(huán)圖來表示這種并行性。在圖中,批閱A題目的同學,假設還承擔了一些額外任務,比如把試卷從老師的辦公室拿到批閱試卷的地點;負責C題的同學也有額外任務,就是等所有同學把試卷批完后,進行總分的統(tǒng)計和記錄上交的工作。據(jù)此,可以把圖中所有的節(jié)StamongthpeFlk<7 點劃分為三個類別。第一個類別是Source,它們負責獲取數(shù)據(jù)(拿試卷);第二類是數(shù)據(jù)處理節(jié)點,它們大多時候不需要和外部系統(tǒng)打交道;最后一個類別負責將整個計算邏輯寫到某個外部系統(tǒng)(統(tǒng)分并上交記錄)。這三類節(jié)點分別就是Source節(jié)點、Transformation節(jié)點和Sink節(jié)點。DAG圖中,節(jié)點表示計算,節(jié)點之間的連線代表計算之間的依賴。(一)關于編程的一些內(nèi)容1~102如果用編程來解決有兩個角度:第一種是采取命令式編程方式,一步一步的相當于告訴機器應該怎樣生成一些數(shù)據(jù)結(jié)構(gòu),怎樣的用這些數(shù)據(jù)結(jié)構(gòu)去存儲一些臨時的中間結(jié)果,怎樣把這些中間結(jié)果再轉(zhuǎn)換成為最終的結(jié)果,相當于一步一步告訴機器如何 8>SamongthpeFnk完成怎樣的任務,而不需要像命令式那樣詳細傳遞。例如我們可以把原有的數(shù)據(jù)集轉(zhuǎn)化成一個Stream,然后再把Stream轉(zhuǎn)化成一個Int類型的Stream,在此過程中,把每一個數(shù)字都乘2,最后再調(diào)用Sum方法,就可以獲得所有數(shù)字的和。聲明式編程語言的代碼更簡潔,而簡潔的開發(fā)方式,正是計算引擎追求的效果。所以在Flink里所有與任務編寫相關的API,都是偏向聲明式的。二、DatatramPISrmPI之前,我們先來看一下FkPI在舊版本的FlinkPI層次遵循上圖左側(cè)這樣四層的關系。最上層表示我們可以用比較高級的API,或者說聲明程度更高的TablePI以及SQL的方式來編寫邏輯。所有SQL和TablePI編寫的內(nèi)容都會被Flink內(nèi)部翻譯和優(yōu)化成一個用DataStreamAPI實現(xiàn)的程序。再往下一層,DataStreamAPI的程序會StamongthpeFlk<9 被表示成為一系列TransformationTransformation會被翻譯成JobGraph(即上文介紹的DAG)。而在較新版本的Flink里發(fā)生了一些改變,主要的改變體現(xiàn)在TableAPISLatStreamPITrnsformationDataStreamAPITablePIDataStreamAPI2再求和的需求。Flink 0>SamongthpeFnkFlink實現(xiàn)任何功能,一定要獲取一個相應的運行環(huán)境,也就是SrmExionEvromt;其次,在獲取環(huán)境后,可以調(diào)用環(huán)境的addSource方法,來為邏輯添加一個最初始數(shù)據(jù)源的輸入;設置完數(shù)據(jù)源后可以拿到數(shù)據(jù)源的引用,也就是DataSource最后,可以調(diào)用一系列的轉(zhuǎn)換方法來對DataSource中的數(shù)據(jù)進行轉(zhuǎn)化。這種轉(zhuǎn)化如圖所示,就是把每個數(shù)字都×2,隨后為了求和我們必須利用keyBy不能簡單的像單機程序那樣把它輸出,而是需要在整個邏輯里面加一個的SnknvironmentExecuteFlinkDataStreamAPI編寫程序和單機程序最大的不同就在于,它前幾步的過程都不會觸發(fā)數(shù)據(jù)的計算,而像在繪制一個DAG圖。等整個邏輯的DAG圖繪制完畢之后,就可以通過Execute方法,把整個的圖作為一個整體,提交到集群上去執(zhí)行。介紹到這里,就把FlinkDataStreamAPI和DAGikFkStamongthpeFlk<1 DttmI就像上文在示例代碼中展示的,每一個DataStream對象,在被調(diào)用相應方法的時候,都會產(chǎn)生一個新的轉(zhuǎn)換。相應的,底層會生成一個新的算子,這個算子會被添加到現(xiàn)有邏輯的DAG圖中。相當于添加一條連線來指向現(xiàn)有DAG圖的最后一個節(jié)點。所有的這些API在調(diào)動它的時候都會產(chǎn)生一個新的對象,然后可以在新的對象上去繼續(xù)調(diào)用它的轉(zhuǎn)換方法。就是像這種鏈式的方式,一步一步把這個DAG圖給畫出來。 2>SamongthpeFnk上述解釋涉及到了一些高階函數(shù)思想。每去調(diào)用DataStream上的一個轉(zhuǎn)換時,都需要給它傳遞的一個參數(shù)。換句話說,轉(zhuǎn)換決定了你想對這個數(shù)據(jù)進行怎樣的操作,而實際傳遞的包在算子里面的函數(shù)決定了轉(zhuǎn)換操作具體要怎樣完成。APIFlinkDataStreamAPI的功能,它們是ProcessFunction以及oProcessFunction。這兩個函數(shù)是作為最底層的處理邏輯提供給用戶使用的。上圖所有左側(cè)藍色涉及的轉(zhuǎn)換,理論上來講都可以ProssFtonoPossFcon去完成。(二)關于數(shù)據(jù)分區(qū)數(shù)據(jù)分區(qū)是指在傳統(tǒng)的批處理中對數(shù)據(jù)Shuffle的操作。如果把撲克牌想成數(shù)據(jù),傳統(tǒng)批處理里的Shuffle操作就相當于理牌的過程。一般情況下在抓牌過程StamongthpeFlk<3 中,我們都會把牌理順排列好,相同的數(shù)字還要放在一起。這樣做最大的好處是,出牌時可以一下子找到想出的牌。Shuffle是傳統(tǒng)的批處理的方式。因為流處理所有的數(shù)據(jù)都是動態(tài)來的,所以理牌的過程或者說處理數(shù)據(jù),進行分組或分區(qū)的過程,也是在線來完成的。例如上圖右側(cè)所示,上游有兩個算子A的處理實例,下游是三個算子B處理實例。這里展示的流處理等價于Shuffle的操作被稱為數(shù)據(jù)分區(qū)或數(shù)據(jù)路由。它用來表示A處理完數(shù)據(jù)后,要把結(jié)果發(fā)到下游B的哪個處理實例上。Flik圖XFlinkDataStream調(diào)用eyBy方法后,可以把整個數(shù)據(jù)按照一個Key值進行分區(qū)。但要嚴格來講,其實keyBy并不算是底層物理分區(qū)策略,而是一種轉(zhuǎn)換操作,因為從API角度來看,它會把DataStreamKeyedDataStream的類型,而這兩者所支持的操作也有所不同。 4>SamongthpeFnk所有這些分區(qū)策略里,稍微難理解的可能是Rescale。Rescale涉及到上下游數(shù)據(jù)本地性的問題,它和傳統(tǒng)的Rebalance,即Round-Pobin,輪流分配類似。區(qū)別在于RsePartitionCustom去自定義一個數(shù)據(jù)的分區(qū)。值得注意的是,它只是自定義的單播,即對每一個數(shù)據(jù)只能指定它一個下游所要發(fā)送的實例,而沒有辦法把它復制成多份發(fā)送到下游的多個實例中。Flik上文介紹過,圖X里有兩個關鍵的節(jié)點:A節(jié)點,需要去連接外部系統(tǒng),從外部系統(tǒng)把數(shù)據(jù)讀取到Flink的處理集群里;C節(jié)點,即Sink節(jié)點,它需要匯總處理完的結(jié)果,然后把這個結(jié)果寫入到某個外部系統(tǒng)里。這里的外部系統(tǒng)可以是一個文件系統(tǒng),也可以是一個數(shù)據(jù)庫等。StamongthpeFlk<5 FinkFnkSe的狀態(tài)的概念。在中間計算的結(jié)果實際上是可以通過Stte暴露給外部系統(tǒng),所以允許沒有專門的SinkFlinkSource,也就是說必須從某個地方把數(shù)據(jù)讀進來,才能進行后續(xù)的處理。關于SoreSk兩類連接器需要關注的點如下:對于Sourse而言,我們往往比較關心是否支持續(xù)監(jiān)測并接入數(shù)據(jù)更新,然后把相應的更新數(shù)據(jù)再給傳輸?shù)竭@個系統(tǒng)當中來。舉例來說,F(xiàn)link對于文件有相應的FlSysemSV文件。SV文件連接器在定義時,可以通過參數(shù)指定是否持續(xù)監(jiān)測某個目錄的文件變化,并接入更新后的文件。對于Sink來講,我們往往關心要寫出的外部系統(tǒng)是否支持更新已經(jīng)寫出的結(jié)果。比如要把數(shù)據(jù)寫到Kae-Oy,即 6>SamongthpeFnk不能修改已經(jīng)寫入系統(tǒng)里的記錄(社區(qū)正在利用KafkaCompaction實現(xiàn)UpsertSink);如果是寫入數(shù)據(jù)庫,那么通??梢灾С掷弥麈I對現(xiàn)有數(shù)據(jù)進行更新。以上兩個特性,決定了Flink里連接器是面向靜態(tài)數(shù)據(jù)還是面向動態(tài)的數(shù)據(jù)的關鍵點。Flink1.11Flink1.11Tabe、SQL、PI這個層面的連接器,比起ataStrem層面的連接器,會承擔更多的任務。比如是否支持一些謂詞或投影操作的下推等等。這些功能可以幫助提高數(shù)據(jù)處理的整體性能。三、likSrmPI,狀態(tài)和時間是必須掌握的要點。所有的計算都可以簡單地分為無狀態(tài)計算和有狀態(tài)計算。無狀態(tài)計算相對而言比較容易。假設這里有個加法算子,每進來一組數(shù)據(jù),都把它們?nèi)考悠饋?,然后把結(jié)果輸出去,有點純函數(shù)的味道。純函數(shù)指的是每一次計算結(jié)果只和輸入數(shù)據(jù)有關,之前的計算或者外部狀態(tài)對它不會產(chǎn)生任何影響。StamongthpeFlk<7 這里我們主要講一下Flink里邊的有狀態(tài)計算。用撿樹枝的小游戲來舉例。這個游戲在我看來做的非常好的一點是它自己記錄了非常多的狀態(tài),比如幾天沒上線,然后再去和里邊的NPC對話的時候,它就會告訴你已經(jīng)有好久沒有上線了。換句話說,它會把之前上線的時間作為一種狀態(tài)給記錄下來,在生成與NPC對話的時候,是會受到這個狀態(tài)的影響。實現(xiàn)這種有狀態(tài)的計算,要做的一點就是把之前的狀態(tài)記錄下來,然后再把這個狀態(tài)注入到新的一次計算中,具體實現(xiàn)方式也有下面兩種:第一種,把狀態(tài)數(shù)據(jù)進入算子之前就給提取出來,然后把這個狀態(tài)數(shù)據(jù)和輸入數(shù)據(jù)合并在一起,再把它們同時輸入到算子中,得到一個輸出。這種方式是被用在Spark的StructureStreaming里邊。其好處是是可以重用已有的無狀態(tài)算子。 8>SamongthpeFnkFlink計算引擎也應該像上面提到的游戲一樣變得越來越智能,可以自動學習數(shù)據(jù)中潛在的規(guī)律,然后來自適應地優(yōu)化計算邏輯,保持較高的處理性能。FlikFlink的狀態(tài)原語涉及如何通過代碼使用Flink的狀態(tài)。其基本思想是在編程的時候拋棄原生語言(例如Java或Scala)提供的數(shù)據(jù)容器,把它們更換為Flink里面的狀態(tài)原語。Flink原語。從大的角度看,所有狀態(tài)原語可以分為KeyedStateOperatorState兩類。OperatorState應用相對比較少,我們在這里不展開介紹。下面重點看KeyedState。StamongthpeFlk<9 KeyedState,即分區(qū)狀態(tài)。分區(qū)狀態(tài)的好處是可以把已有狀態(tài)按邏輯提供的分區(qū)分成不同的塊。塊內(nèi)的計算和狀態(tài)都是綁定在一起的,而不同的Key值之間的計算和狀態(tài)的讀寫都是隔離的。對于每個Key值,只需要管理好自己的計算邏輯和狀態(tài)就可以了,不需要去考慮其它Key值所對應的邏輯和狀態(tài)。KeyedState5比較常用的:St、sSte、Se不太常用的:RigSeggrgoSeKeyedState只能在RichFuction中使用,RichFuction與普通、傳統(tǒng)的Function相比,最大的不同就是它有自己的生命周期。KeyState的使用方法分為以下四個步驟:第一步,將SeRFnion第二步,在RcFuncion對應的oen方法中,為Sae進行一個初始化的賦值操作。賦值操作要有兩步:先創(chuàng)建一個StateDescriptor,在創(chuàng)建中需要給SeRFtongRtmotx().gSe(…SDsrporSe。(提醒:如果此流式應用是第一次運行,那么獲得的State會是空內(nèi)容的;如果State是從某個中間段重啟的,它會根據(jù)配置和之前保存的數(shù)據(jù)的基礎上進行恢復。) 0>SamongthpeFnk第三步,得到State對象后,就可以在RichFunction里,對對應的State進行讀寫。如果是ValueState,可以調(diào)用它的Value方法來獲取對應值。Flink框架會控制好所有狀態(tài)的并發(fā)訪問,并進行限制,所以用戶不需要考慮并發(fā)的問題。Flik時間也是FlinkStateFlink引擎里邊提供的時間有兩類:第一類是ProcessingTimeEventTime。ProcessingTime表示的是真實世界的時間,EventTime是數(shù)據(jù)當中包含的時間。數(shù)據(jù)在生成的過程當中會攜帶時間戳之類的字段,因為很多時候需要將數(shù)據(jù)里攜帶的時間戳作為參考,然后對數(shù)據(jù)進行分時間的處理。StamongthpeFlk<1 ProcessingTime處理起來相對簡單,因為它不需要考慮亂序等問題;而EventTime處理起來相對復雜。由于ProcessingTime在使用時是直接調(diào)取系統(tǒng)的時間,考慮到多線程或分布式系統(tǒng)的不確定性,所以它每次運行的結(jié)果可能是不確定的;相反,因為EventTime時間戳是被寫入每一條數(shù)據(jù)里的,所以在重放某個數(shù)據(jù)進行多次處理的時候,攜帶的這些時間戳不會改變,如果處理邏輯沒有改變的話,最后的結(jié)果也是比較確定的。ProssgTmeEvtTme 2>SamongthpeFnk以上圖的數(shù)據(jù)為例,按照1~7的時間來排列的。對于機器時間而言,每個機器ProcssigTme獲得的時間是完美的按照時間從小到大排序的數(shù)據(jù)。對于EventTime而言,由于延遲或分布式的一些原因,數(shù)據(jù)到來的順序可能和它們真實產(chǎn)生的順序有一定的出入,數(shù)據(jù)可能存在著一定程度的亂序。這時就要充分利用數(shù)據(jù)里邊攜帶的時間戳,對數(shù)據(jù)進行一個粗粒度的劃分。例如可以把數(shù)據(jù)分為三組,第一組里最小的時間是1,第二組最小的時間是4,第三組最小的時間是7。這樣劃分之后,數(shù)據(jù)在組和組之間就是按從小到大的順序排列好的。怎樣充分地把一定程度的亂序化解掉,讓整個的系統(tǒng)看上去數(shù)據(jù)進來基本上是有atermrk的mta數(shù)據(jù)。在上圖的例子中,前三個數(shù)據(jù)到來之后,假設再沒有小于等于3的數(shù)據(jù)進來了,這時就可以termrk3termrk3時就知道,以后都不會有小于或等于3的數(shù)據(jù)過來了,這時它就可以放心大膽地進行自己的一些處理邏輯??偨Y(jié)一下,ProcessingTime在使用時,是一個嚴格遞增的;而EventTime會存在一定的亂序,需要通過rmrk從API的角度來看,怎樣去分配Timestamp或生成Watermark也比較容易,有兩種方式:第一種,在SourceFunctioncollectWithTimestamp方法,把包含時間戳的數(shù)據(jù)提取出來;還可以在SourceFunction中使用mWrmrkrmr,然后插入到數(shù)據(jù)流中。StamongthpeFlk<3 第二種,如果不在SorceFunction中可以調(diào)用ateStream.assignTimestampsWrmrsermrk第一類是定期生成,相當在環(huán)境里通過配置一個值,比如每隔多長時間(指真實時間)系統(tǒng)會自動調(diào)用Watermar生成策略。ssignWitPunudermrsrmrk的分配。提醒:FlinkAssigner,即WatermarkAssigner。比如針對trmark。關于Timestamp分配和Watermark生成接口,在后續(xù)的版本可能會有一定的改Fk里面已經(jīng)統(tǒng)一了上述兩類生成器。I 4>SamongthpeFnkFlink在編寫邏輯時會用到的與時間相關的API,下圖總結(jié)了EventTime和ProssgTmePI。在應用邏輯里通過接口支持可以完成三件事:第一,獲取記錄的時間。EventTime可以調(diào)context.getTimestamp,或在SQL算子內(nèi)從數(shù)據(jù)字段中把對應的時間給提取出來。ProcessingTime可以直接調(diào)currentProcessingTime完成調(diào)取,它的內(nèi)部是直接調(diào)用了獲取系統(tǒng)時間的靜態(tài)方法來返回的值。termrkEvntTmetermrk的概念,而ProcessingTime里是沒有的。但在ProcessingTime中非要把某個東西當成atermrk,其實就是數(shù)據(jù)時間本身。也就是說第一次調(diào)用imerServic.currentProcessingTime方法之后獲取的值。這個值既是當前記錄的這個時間,也是當前的trmarkStamongthpeFlk<5 第三,注冊定時器。定時器的作用是清理。比如需要對一個cache在未來某個時間進行清理工作。既然清理工作應該發(fā)生在未來的某個時間點,那么可以調(diào)用timerServicerEventTimeTimer或ProcessingTimeTimer方法注冊定時器,再在整個方法里添加一個對定時器回調(diào)的處理邏輯。當對應的EventTime或者ProcessingTime的時間超過了定時器設置時間,它就會調(diào)用方法自己編寫定時器的回調(diào)邏輯。以上就是關于StreamProcesswithpacheFlink的介紹,下一篇內(nèi)容將著重介FkRmerttre。FlinkRntierciteture作者:朱翥(長耕)heFkPC,本文由pceFnkPCommiterFikRnimeRme總覽作業(yè)的控制中心Jomsr任務的運行容器—TaskExacutor資源的管理中心Rsorgr一、ntie眾所周知FinkJob的形式提交給FlinkFlinkRuntimeFlink以在Flink集群上跑,F(xiàn)linkRuntime必須支持所有類型的作業(yè),以及不同條件下運行的作業(yè)。(一)作業(yè)的表達要執(zhí)行作業(yè),首先要理解作業(yè)是如何在FkFlkRnmettue<7 用戶通過API的方式寫一個作業(yè),例如上圖左側(cè)StreamWordInput的示例,它可以不斷的輸出一個個單詞;下面的Map操作負責把單詞映射成一個二元組;再接一個keyBy,使相同的word的二元組都被分配在一起,然后sum將它們計數(shù),最后打印出來。左側(cè)的作業(yè)對應著右邊的邏輯拓撲(StreamGraph)。這個拓撲中有4個節(jié)點,分別是source、map、sum和print。這些是數(shù)據(jù)處理邏輯,又稱之為算子;節(jié)點之間的線條對應著數(shù)據(jù)的分發(fā)方式,影響著數(shù)據(jù)以什么樣的方式分發(fā)給下游。舉例來說,mapsumkeyBymapkey有了StreamGraphFlinkRuntime會進一步的把它翻譯成JobGraph。JobGraph和StreamGraph的區(qū)別是,JobGraphchainOperatorchain。hain條件是需要兩個算子的并發(fā)度是一樣的,并且它們的數(shù)據(jù)交erorn,又稱為Jrtx。 8>FnkRnmettueOperatorchain的意義是能夠減少一些不必要的數(shù)據(jù)交換,這樣chain的operator都是在同一個地方進行執(zhí)行。在作業(yè)實際執(zhí)行過程中,邏輯圖會進一步被翻譯成執(zhí)行圖—ExecutionGraph。執(zhí)行圖是邏輯圖并發(fā)層面的視圖,如上圖所示,下面的執(zhí)行圖就是上面邏輯圖所有算子并發(fā)都為2的表達。為什么上圖中的map和sum不能嵌起來?因為它們的數(shù)據(jù)是涉及到多個下游算子的,并非一對一的數(shù)據(jù)交換方式。邏輯圖JobVertex中的一個節(jié)點,會對應著并發(fā)數(shù)個執(zhí)行節(jié)點ExecutionVertex,節(jié)點對應著一個個任務,這些任務最后會作為實體部署到Worker節(jié)點上,并執(zhí)行實際的數(shù)據(jù)處理業(yè)務邏輯。(二)分布式架構(gòu)Flink作為分布式數(shù)據(jù)處理框架,它有一套分布式的架構(gòu),主要分為三塊:n、srorrFlkRnmettue<9 Master是Flink集群的主控中心,它可以有一個到多個JobMaster,每個JobMaster對應一個作業(yè),而這些JobMaster由一個叫Dispatcher的控件統(tǒng)一管理。Master節(jié)點中還有一個ResourceManager進行資源管理。ResourceManager管理著所有Worker節(jié)點,它同時服務于所有作業(yè)。此外Master節(jié)點中還有一個RestServer,它會用于響應各種Client端來的Rest請求,Client端包括Web端以及命令行的客戶端,它可以發(fā)起的請求包括提交作業(yè)、查詢作業(yè)的狀態(tài)和停止作業(yè)等等。作業(yè)會通過執(zhí)行圖被劃分成一個個的任務,這些任務最后都會在Worker節(jié)點中進行執(zhí)行。Worker就是TaskExecutor,它們是任務執(zhí)行的容器。作業(yè)執(zhí)行的核心組件有三個,分別是JobMasterTaskExecutor和ResourceManager:JobMaster用于管理作業(yè);TaskExecutor用于執(zhí)行各個任務;RsorgrJosr二、JobatrJobMaster的主要職責包括作業(yè)生命周期的管理、任務的調(diào)度、出錯恢復、狀態(tài)查詢和分布式狀態(tài)快照。eckpont和Sveponthckpont主要是為出Sot主要是用于作業(yè)的維護,包括升級和遷移等等。pooortor 0>FnkRnmettueJobaster中的核心組件是Sheduer,無論是作業(yè)的生命周期管理、作業(yè)的狀Se來負責的。(一)作業(yè)的生命周期管理作業(yè)的生命周期的狀態(tài),作業(yè)所有可能的狀態(tài)遷移都在下圖展示出來了。FlkRnmettue<1 正常流程下作業(yè)會有三種狀態(tài),分別是Created、Running和Finished。一個作業(yè)開始是處于Created的狀態(tài),當這個作業(yè)被開始調(diào)度就開始進入Running狀態(tài)并開始調(diào)度任務,等到所有的任務都成功結(jié)束了,這個作業(yè)就走到Finished的狀態(tài),并匯報最終結(jié)果,然后退出。然而,一個作業(yè)在執(zhí)行過程中可能會遇到一些問題,因此作業(yè)也會有異常處理的狀態(tài)。作業(yè)執(zhí)行過程中如果出現(xiàn)作業(yè)級別錯誤,整個作業(yè)會進到Failing狀態(tài),然后Cancel所有任務。等到所有任務都進入最終狀態(tài)后,包括FailedCanceled、Finished,再去check出錯的異常。如果異常是不可恢復的,那么整個作業(yè)會走到Failed狀態(tài)并退出。如果異常是可恢復的,那么會走到Restarting狀態(tài),來嘗試重啟。如果重啟的次數(shù)沒有超過上限,作業(yè)會從Created狀態(tài)重新進行調(diào)度;如果達到上限,作業(yè)會走到Failed(Flink1.10版本中,當發(fā)生錯誤時,如果可以恢復,作業(yè)不會進入Failing狀態(tài)而會直接進入RsrngRig如果作業(yè)無法恢復,則作業(yè)會經(jīng)由FgFd)nclngceedncl作業(yè)的時候走到。當用戶手動的在WebUIFlinkcommandFlink狀態(tài)轉(zhuǎn)到ancel里,然后ancel所有任務,等所有任務都進入最終狀態(tài)后,整個ed狀態(tài)并退出。Suspended狀態(tài)只會在配置了highavailability,并且當JobMaster丟掉leadership才會走到。這個狀態(tài)只意味著這個JobMaster出現(xiàn)問題終止了。一般來說等到Jsrerspsdysr 2>FnkRnmettuersipersp的節(jié)點上重新啟動起來。(二)任務調(diào)度任務調(diào)度是JobMaster的核心職責之一。要調(diào)度任務,一個首要的問題就是決定什么時候去調(diào)度任務。任務調(diào)度時機是由調(diào)度策略(SchedulingStrategy)來控制的。這個策略是一個事件驅(qū)動的組件,它監(jiān)聽的事件包括:作業(yè)開始調(diào)度、任務的狀態(tài)發(fā)生變化、任務產(chǎn)出的數(shù)據(jù)變成可消費以及失敗的任務需要重啟,通過監(jiān)聽這些事件,它能夠比較靈活地來決定任務啟動的時機。目前我們有多種不同的調(diào)度策略,分別是Eager和Lazyfromsources。EagerSchedulingStrategy主要是服務于流式作業(yè),它的策略是在作業(yè)開始調(diào)度時,直接啟動所有的任務,這樣做的好處是可以降低調(diào)度時間。Lazyfromsources主要服務于批處理作業(yè)。它的策略是作業(yè)一開始只調(diào)度Source節(jié)點,等到有任意節(jié)點的輸入數(shù)據(jù)可以被消費后,它才會被調(diào)起來。如下圖所示,sourceFlkRnmettue<3 數(shù)據(jù)開始產(chǎn)出后,agg節(jié)點才能被調(diào)起來,agg節(jié)點結(jié)束后,sink節(jié)點才能被調(diào)起來。為什么Batch作業(yè)和Streaming作業(yè)會有不同的調(diào)度策略呢?是因為Batch作業(yè)里邊存在lockingshuffle數(shù)據(jù)交換模式。在這種模式下,需要等上游完全產(chǎn)出所有數(shù)據(jù)后,下游才能去消費這部分數(shù)據(jù)集,如果預先把下游調(diào)起來的話,它只會在那空轉(zhuǎn)浪費資源。相比Eager策略而言,對于批處理作業(yè)它能夠節(jié)省一定量的資源。Piplindregionsed調(diào)度策略,這個策略比較類似于Lazyfromsource策略,差異在于前者是以Pipelinedregion為粒度調(diào)度任務的。Pipelinedregion是以pipelinedPipelined邊意味著上下游節(jié)點會流式的進行數(shù)據(jù)交換,即上游邊寫,下游就邊讀邊消費。Pipelinedregion調(diào)度的好處是可以一定程度上繼承了Eager調(diào)度好處,能夠節(jié)省調(diào)度花費的 4>FnkRnmettue時間,且讓上下游任務并行起來。同時也保留了azyfromsources避免不必要資源的浪費。通過把一部分Task整體調(diào)度,就能知道這部分需要同時運行的作業(yè)所需的資源量是多少,能夠以此進行一些更深度的優(yōu)化。(Flink1.11Pipelinedregionstrategy成為默認調(diào)度策略,同時服務于流和批作業(yè)。)(三)任務調(diào)度的過程任務具有很多種不同狀態(tài),最初任務處在Created狀態(tài)。當調(diào)度策略認為這個任務可以開始被調(diào)的時候,它會轉(zhuǎn)到Scheduled狀態(tài),并開始申請資源,即Slot。申請到Slot之后,它就轉(zhuǎn)到Deploying狀態(tài)來生成Task的描述,并部署到worker節(jié)點上,再之后Task就會在worker節(jié)點上啟動起來。成功啟動后,它會在worker節(jié)點上轉(zhuǎn)到running狀態(tài)并通知JobMaster,然后在JobMaster端把任務的狀態(tài)轉(zhuǎn)到running。FlkRnmettue<5 對于無限流的作業(yè)來說,轉(zhuǎn)到running狀態(tài)就是最終狀態(tài)了;對于有限流的作業(yè),一旦所有數(shù)據(jù)處理完了,任務還會轉(zhuǎn)到finished狀態(tài),標志任務完成。當有異常發(fā)生時,任務也會轉(zhuǎn)到Filedanceled狀態(tài)。(四)出錯恢復當有任務出現(xiàn)錯誤時,JobMaster的策略或基本思路是,通過重啟出錯失敗的任務以及可能受到影響的任務,來恢復作業(yè)的數(shù)據(jù)處理。這包含三個步驟:第一步,停止相關任務,包括出錯失敗任務和可能受其影響任務,失敗任務可能已經(jīng)是FAILED的,然后其它受影響任務會被Cancel最終進到ed狀態(tài);Created狀態(tài);第三步,通知調(diào)度策略重新調(diào)度這些任務。 6>FnkRnmettue上文提及了可能受到影響的任務,那么什么樣的任務可能受影響呢?這是由出錯恢復策略(FailoverStrategy)來決定的。FlinkFailoverStrategy是RestartPipelinedRegionFailoverStrategy。采用了這個策略后,如果一個Task失敗了就會重啟它所在的region。這其實跟Ppied數(shù)據(jù)交換有關系。在Ppied數(shù)據(jù)交換的節(jié)點之間,如果任意一個節(jié)點失敗了,其相關聯(lián)的其它節(jié)點也會跟著失敗。這是為了防止出現(xiàn)數(shù)據(jù)的不一致。因此為了避免單個Task導致多次Failover,一般的操作是在收到第一個Tskedcl掉,再一起重啟。RestartPipelinedRegion策略除了重啟失敗任務所在的Region外,還會重啟它的下游Region。原因是任務的產(chǎn)出很多時候是非確定性的,比如說一個record,分發(fā)到下游的第一個并發(fā),重跑一次;分發(fā)到下游的第二個并發(fā)時,一旦這rgon中,就可能會導致rcord了避免這種情況,采用PipelinedRegionFailoverStrategy會重啟失敗任務所在的RgonRgo。另外,還有一個RestartAllFailoverStrategy策略,它會在任意Taskfail的時候,重啟作業(yè)中的所有任務。一般情況,這個策略并不被經(jīng)常用到,但是在一些特殊情況下,比如當任務失敗,用戶不希望局部運行而是希望所有任務都結(jié)束并整體進行恢復,可以用這個策略。三、TaskExecutor:任務的運行器FlkRnmettue<7 TaskExecutor是任務的運行器,為了運行任務它具有各種各樣的資源。如下圖所示,這里主要介紹memory的資源。所有內(nèi)存資源都是可以單獨配置的。Tsagr也對它們的配置進行了分層的管理,最外層是ProcessMemory,對應的是整個TaskExecutorJVM份內(nèi)存又包含了JVM自身占有的內(nèi)存以及FlinkFlink任務占用內(nèi)存包括了TaskHeapMemory,即任務的Java對象占有的內(nèi)存;Tskf-epmory一般用于aive的第三方庫;torkmory是用來創(chuàng)建NetworkBuffer用來服務于任務的輸入和輸出;ManagedMemory則是受管控的Off-HeapMemory,它會被一些組件用到,比如算子和StateBackend。這些Task資源會被它分成一個一個的Slot,Slot是任務運行的邏輯容器。當前,Slot大小是直接把整個TsExor的資源,按照Sot 8>FlkRnettue一個Slot里可以運行一個到多個任務,但是有一定約束,即同一個共享組中的SlotPipelinedRegion中的任務都是在一個共享組中,流式作業(yè)的所有任務也都是在一個共享組中。不同類型指的是它們需要屬于不同的JobVertex。如上圖右側(cè)示例,這是一個source、map、sink的作業(yè)??梢钥吹讲渴鸷螅腥齻€Slot中都有三個任務,分別是source、map、sum各一份。而有一個slot中只有兩個任務,就是因為source只有三個并發(fā),沒有更多并發(fā)可以部署進來。進行SlotSharing第一個好處是,可以降低數(shù)據(jù)交換的開銷。像map、sink之間是一對一的數(shù)據(jù)交換,實際上有物理數(shù)據(jù)交換的這些節(jié)點都被共享在了一塊,這樣可以使得它們的數(shù)據(jù)交換在內(nèi)存中進行,比在網(wǎng)絡中進行的開銷更低。第二個好處是,方便用戶配置資源。通過SlotSharing,用戶只需要配置n個Sotsmn是最大算子的并發(fā)度。FlkRnmettue<9 第三個好處是,在各個算子并發(fā)度差異不大的情況下,提高負載均衡。這是因為每個Slot里邊會有各種不同類型的算子各一份,這就避免某些負載重的算子全擠在同一個TaskExecutor中。(一)任務的執(zhí)行模型OperatorChainperatorChain都有自己的輸入和輸出,輸入是InputGateesultPartition。這些任務總體會在一個獨占的線程中執(zhí)行,任務從nputGate中讀peraorhin,peraorhin進行業(yè)務邏輯的處理,最后會將RsPrton中。Sourcetsk,它不從nputGate中讀取數(shù)據(jù),而直接通過SourceFunction方式來產(chǎn)出數(shù)據(jù)。上游的ResultPartition和下游的InputGateFlinkShuffleService進行數(shù)據(jù)交換。ShuffleService是一個插件,目前FlinkNettyShuffleService,下游的InputGateNetty來從上游的RsPrion中獲取數(shù)據(jù)。 0>FnkRnmettueResultPartition是由一個個的SubPartition組成的,每個SubPartition都對應著一個下游消費者并發(fā)。nputGate也是由一個個的InputChannel組成的,每個unel四、oceManaerResourceManager是Flink的資源管理中心。在前面我們有提到過TaskExecutor包含了各種各樣的資源。而ResourceManager,就管理著這些TaskExecutor。新啟動的TaskExecutor,需要向ResourceManager進行注冊,之后它里邊的資源才能服務于作業(yè)的請求。FlkRnmettue<1 ResourceManager里邊有個關鍵組件叫做SlotManager,它管理著Slot的狀態(tài)。這些Slot狀態(tài)是通過TaskExecutor到ResourceManager之間的心跳跳來進行更新的,在心跳信息中包含了TaskExecutor中的所有Slot的狀態(tài)。有了當前所有的Slot狀態(tài)之后,ResourceManager就可以服務于作業(yè)的資源申請。當JobMster調(diào)度一個任務的時候,會向ResourceManager發(fā)起Sot請求。收到請Rsoureaagr會轉(zhuǎn)交給Sonagr,Soaagr會去檢查它里邊的可SlotTaskExecutor發(fā)起Slot申請。如果請求成功,TaskExecutor會主動的向Josror這個S。之所以要這么繞一圈,是為了避免分布式帶來的不一致的問題。像剛才我們有提到,SlotManager中的Slot狀態(tài)是通過心跳來進行更新的,所以存在一定的延遲。此外在整個Slot申請過程中,Slot狀態(tài)也是可能發(fā)生變化的。所以最終我們需要以Sotor,以及它的K 2>FnkRnmettueResourceManager有多種不同的實現(xiàn),Standalone模式下采用的ResourceManagerStandaloneResourceManager,需要用戶手動拉起orr除此之外,還有一些會去自動申請資源的ResourceManager包括YarnResourceManager,esosResourceManager和KubernetesResourceManager。采用這些ResourceManager后,在不能滿足的情況下,ResourceManager會在Sotorr拿YarnResourceManager舉個例子,JobMaster去為某個任務請求一個Slot。YarnResourceManager將這個請求交給SlotManager,SlotManager發(fā)覺沒有Slot符合申請的話會告知YarnResourceManager,YarnResourceManager就會向真正的外部的YarnResourceManager去請求一個containerontier之后,它會啟動一個TsExector,當TsExctor起來之后,它會注ResourceManager中,并去告知它可用的Slot信息。SlotManager拿到FlkRnmettue<3 這個信息之后,就會嘗試去滿足當前pending的那些SlotRequest。如果能夠滿JobMaster就會去向TaskExecutor發(fā)起Slot請求,請求成功的話,TaskExecutor就會向JobMaster去offer這個Slot。這樣用戶就不需要在一開始去計算它作業(yè)的資源需求量是多少,而只需要保證單個Slot的大小,能夠滿足任務的執(zhí)行了。Falt-toleranceinFlnk作者:李鈺heFkPC,本文由ApacheFlinkPMC,阿里巴巴高級技術專家李鈺分享,主要介紹Fk有狀態(tài)的流計算全局一致性快照Fk的容錯機制Fk的狀態(tài)管理一、有狀態(tài)的流計算(一)流計算FltolaenFlk<5 流計算是指有一個數(shù)據(jù)源可以持續(xù)不斷地發(fā)送消息,同時有一個常駐程序運行代碼,從數(shù)據(jù)源拿到一個消息后會進行處理,然后把結(jié)果輸出到下游。(二)分布式流計算分布式流計算是指把輸入流以某種方式進行一個劃分,再使用多個分布式實例對流進行處理。(三)流計算中的狀態(tài) 6>FltolnenFlk計算可以分成有狀態(tài)和無狀態(tài)兩種,無狀態(tài)的計算只需要處理單一事件,有狀態(tài)的計算需要記錄并處理多個事件。舉個簡單的例子。例如一個事件由事件ID和事件值兩部分組成,如果處理邏輯是每拿到一個事件,都解析并輸出它的事件值,那么這就是一個無狀態(tài)的計算;相反,如果每拿到一個狀態(tài),解析它的值出來后,需要和前一個事件值進行比較,比前一個事件值大的時候才把它進行輸出,這就是一個有狀態(tài)的計算。流計算中的狀態(tài)有很多種。比如在去重的場景下,會記錄所有的主鍵;又或者在窗口計算里,已經(jīng)進入窗口還沒觸發(fā)的數(shù)據(jù),這也是流計算的狀態(tài);在機器學習/深度學習場景里,訓練的模型及參數(shù)數(shù)據(jù)都是流計算的狀態(tài)。二、全局一致性快照全局一致性快照是可以用來給分布式系統(tǒng)做備份和故障恢復的機制。(一)全局快照FltolaenFlk<7 什么是全局快照全局快照首先是一個分布式應用,它有多個進程分布在多個服務器上;其次,它在應用內(nèi)部有自己的處理邏輯和狀態(tài);第三,應用間是可以互相通信的;第四,在這種分布式的應用,有內(nèi)部狀態(tài),硬件可以通信的情況下,某一時刻的全局狀態(tài),就叫做全局的快照。為什么需要全局快照 8>FltolnenFlk以拿來恢復;第二,做死鎖檢測,進行快照后當前的程序繼續(xù)運行,然后可以對快照進行分析,看應用程序是不是存在死鎖狀態(tài),如果是就可以進行相應的處理。全局快照舉例下圖為分布式系統(tǒng)中全局快照的示例。P1P2C12C21。對于P112是它發(fā)送消息的管道,稱作outputhannel21是它接收消息的管道,稱作ptnl。除了管道,每個進程都有一個本地的狀態(tài)。比如說P1和P2每個進程的內(nèi)存里都有XYZ三個變量和相應的值。那么P1和P2進程的本地狀態(tài)和它們之間發(fā)送消息的管道狀態(tài),就是一個初始的全局狀態(tài),也可稱為全局快照。FltolaenFlk<9 P1P2P2x47,但是這個消P2。這個狀態(tài)也是一個全局快照。再接下來,P2收到了P1的消息,但是還沒有處理,這個狀態(tài)也是一個全局快照。 0>FltolnenFlk最后接到消息的P2把本地的X的值從4改為7,這也是一個全局快照。所以當有事件發(fā)生的時候,全局的狀態(tài)就會發(fā)生改變。事件包括進程發(fā)送消息、進程接收消息和進程修改自己的狀態(tài)。(二)全局一致性快照FltolaenFlk<1 假如說有兩個事件,a和b,在絕對時間下,如果a發(fā)生在b之前,且b被包含在快照當中,那么則a也被包含在快照當中。滿足這個條件的全局快照,就稱為全局一致性快照。全局一致性快照的實現(xiàn)方法時鐘同步并不能實現(xiàn)全局一致性快照;全局同步雖然可以實現(xiàn),但是它的缺點也非常明顯,它會讓所有應用程序都停下來,會影響全局的性能。異步全局一致性快照算法–Cn-pt異步全局一致性快照算法Chandy-Lamport可以在不影響應用程序運行的前提下,實現(xiàn)全局一致性快照。dy-mort 2>FltolnenFlk第一,不影響應用運行,也就是不影響收發(fā)消息,不需要停止應用程序;第二,每個進程都可以記錄本地狀態(tài);第三,可以分布式地對已記錄的狀態(tài)進行收集;第四,任意進程都可以發(fā)起快照。同時,Chandy-Lamport算法可以執(zhí)行還有一個前提條件:消息有序且不重復,并且消息可靠性可保障。hdy-morthandy-Lamport的算法流程主要分為三個部分:發(fā)起快照、分布式的執(zhí)行快照和終止快照。發(fā)起快照FltolaenFlk<3 任意進程都可以發(fā)起快照。如下圖所示,當由P1發(fā)起快照的時候,第一步需要記錄本地的狀態(tài),也就是對本地進行快照,然后立刻向它所有outputchannel發(fā)送一個marker消息,這中間是沒有時間間隙的。marker消息是一個特殊的消息,它不同于應用之間傳遞的消息。發(fā)出Marker消息后,P1就會開始記錄所有inputchannel的消息,也就是圖示C21管道的消息。分布式的執(zhí)行快照如下圖,先假定當Pi接收到來自ki的marker消息,也就是Pk發(fā)給Pi的mrr 4>FltolnenFlk第一種情況:這個是Pi收到的第一個來自其它管道的marker消息,它會先記錄一下本地的狀態(tài),再把C12管道記為空,也就是說后續(xù)再從P1發(fā)消息,就不包含otputhnnel發(fā)送mreriptnl的消息。FltolaenFlk<5 i消息不包含在實時快照里,但是實時消息還是會發(fā)生,所以第二種情況是,如果此前Pimarker消息,它會停止記錄ki消息,同時會將ii在本次快照中的最終狀態(tài)來保存。終止快照終止快照的條件有兩個:mrr第二,所有進程都從它的n-1個inputchannel里收到了marker當快照終止,快照收集器(CentralServer)就開始收集每一個部分的快照去形成全局一致性快照了。 6>FltolnenFlk示例展示在下圖的例子里,一些狀態(tài)是在內(nèi)部發(fā)生的,比如A,它跟其它進程沒有交互。內(nèi)部狀態(tài)就是P1AC11=[A->]。dy-mortFltolaenFlk<7 假設從p1來發(fā)起快照,它發(fā)起快照時,首先對本地的狀態(tài)進行快照,稱之為SoptnnlP2PmrerutnlP2P3圖例所示,縱軸是絕對時間,按照絕對時間來看,為什么P3和P2收到marker消息會有時間差呢?因為假如這是一個真實的物理環(huán)境里的分布式進程,不同節(jié)點之間的網(wǎng)絡狀況是不一樣的,這種情況會導致消息送達時間存在差異。P3先收到marker消息,且是它接收到的第一個marker消息。接收到消息后,它首先會對本地狀態(tài)進行快照,然后把C13close,與此同時開始向它所有的outputchannelmarker消息,最后它會把來自除了C13之外的所有utnl的消息開始進行記錄。 8>FltolnenFlk接收到P3發(fā)出的marker信息的是P1,但這不是它接收的第一個marker,它會把來自C31channel的管道立刻關閉,并且把當前的記錄消息做這個channel的快照,后續(xù)再接收到來自P3的消息,就不會更新在此次的快照狀態(tài)里了。FltolaenFlk<9 P2P3mrer消息后,它首先對本地狀態(tài)進行快照,然后把C32close,與此同時開始向它所有的outputchannel發(fā)送marker消息,最后它會把來自除了2ptnl的消息開始進行記錄。再來看P2接收到來自P1的消息,這不是P2接收到的第一個marker消息,所以它會把所有的ptnlnl的狀態(tài)。 0>FltolnenFlkP1P2會把所有的inputchannel關閉,并把記錄的消息作為狀態(tài)。那么這里面有兩個狀態(tài),一個是C11,即自己發(fā)給自己的消息;一個是C21,是P2H發(fā)給P1D的。最后一個時間點,P3接收到來自P2的消息,這也不是它收到的第一個消息,操作跟上面介紹的一樣。在這期間P3本地有一個事件J,它也會把J作為它的狀態(tài)。FltolaenFlk<1 當所有進程都記錄了本地狀態(tài),而且每一個進程的所有輸入管道都已經(jīng)關閉了,那么全局一致性快照就結(jié)束了,也就是對過去時間點的全局性的狀態(tài)記錄完成了。hdy-mort與FikFlinkFlink持故障恢復。Fink的異步全局一致性快照算法跟andy-amport算法的區(qū)別主要有以下幾點:第一,dy-mt支持強連通圖,而Fk支持弱連通圖;第二,F(xiàn)k采用的是裁剪的(Tor)y-mut異步快照算法;第三,F(xiàn)link的異步快照算法在DAG場景下不需要存儲Channelstate,從而極大減少快照的存儲空間。 2>FltolnenFlk三、lik容錯,就是恢復到出錯前的狀態(tài)。流計算容錯一致性保證有三種,分別是:Exyoe,tstoe,tmotoe。Exactlyonce,是指每條event會且只會對state產(chǎn)生一次影響,這里的“一Flink內(nèi)部只處理一次,不包括sourceskAtleastonce,是指每條event會對state產(chǎn)生最少一次影響,也就是存在重復處理的可能。Atmostonce,是指每條event會對state產(chǎn)生最多一次影響,就是狀態(tài)可能會在出錯時丟失。EctyoceFltolaenFlk<3 Exactlyonce的意思是,作業(yè)結(jié)果總是正確的,但是很可能產(chǎn)出多次;所以它的要求是需要有可重放的source。端到端的Exactlyonce,是指作業(yè)結(jié)果正確且只會被產(chǎn)出一次,它的要求除了有soresk和可以接收冪等的產(chǎn)出結(jié)果。Flik很多場景都會要求在Exactlyonce的語義,即處理且僅處理一次。如何確保語義呢?簡單場景的ExcyOne簡單場景的做法如下圖,方法就是,記錄本地狀態(tài)并且把sourceoffset,即Evtog 4>FltolnenFlk分布式場景的狀態(tài)容錯如果是分布式場景,我們需要在不中斷運算的前提下對多個擁有本地狀態(tài)的算子產(chǎn)生全局一致性快照。Flink分布式場景的作業(yè)拓撲比較特殊,它是有向無環(huán)并且是弱聯(lián)通圖,可以采用裁剪的Chandy-Lamport,也就是只記錄所有輸入的offset和各個算子狀態(tài),并依賴rewindablesource(可回溯的source,即可以通過offset讀取比較早一點時間點),從而不需要存儲channel的狀態(tài),這在存在聚合(aggregation)邏輯的情況下可以節(jié)省大量的存儲空間。FltolaenFlk<5 最后做恢復,恢復就是把數(shù)據(jù)源的位置重新設定,然后每一個算子都從檢查點恢復狀態(tài)。Flik 6>FltolnenFlk首先在源數(shù)據(jù)流里插入Checkpointbarrier,也就是上文提到的Chandy-Lamport算法里的markermessage,不同的Checkpointbarrier會把流自然地切分多個段,每個段都包含了pot的數(shù)據(jù);FlinkCoordinator,它不像Chandy-Lamport對任意一個進程Coordinator會把Checkpointbarrier注入到每個source里,然后啟動快照。當每個節(jié)點收到barrierFlinknlse,所以它只需存儲本地的狀態(tài)就好。FltolaenFlk<7 在做完了Checkpoint后,每個算子的每個并發(fā)都會向Coordinator發(fā)送一個確認消息,當所有任務的確認消息都被CheckpointCoordinator接收,快照就結(jié)束了。(四)流程演示見下圖示,假設CheckpointN被注入到source里,這時source會先把它正在處理分區(qū)的offset記錄下來。隨著時間的流逝,它會把Checkpointbarrier發(fā)送到兩個并發(fā)的下游,當barrier分別到達兩個并發(fā),這兩個并發(fā)會分別把它們本地的狀態(tài)都記錄在kotrrrssk,快照就完成了。 8>FltolnenFlk這是比較簡單的場景演示,每個算子只有單流的輸入,再來看下圖比較復雜的場景,算子有多流輸入的情況。當算子有多個輸入,需要把Barrier對齊。怎么把Barrier對齊呢?如下圖所示,在左側(cè)原本的狀態(tài)下,當其中一條barrier到達,另一條barrier命令上有的barrier還在管道中沒有到達,這時會在保證Exactlyonce的情況下,把先到達的流直接阻塞掉,然后等待另一條流的數(shù)據(jù)處理。等到另外一條流也到達了,會把之前的流unblock,同時把barrier發(fā)送到算子。FltolaenFlk<9 在這個過程中,阻塞掉其中一條流的作用是,會讓它產(chǎn)生反壓。Barrier對齊會導致反壓和暫停operator的數(shù)據(jù)處理。如果不在對齊過程中阻塞已收到barrier的數(shù)據(jù)管道,數(shù)據(jù)持續(xù)不斷流進來,那么屬于下個eckponteckpoit里,如果一旦發(fā)生故障恢復后,由于source會被rewind,部分數(shù)據(jù)會有重復處理,這就是at-least-onceat-least-once,那么可以選擇其他可以避免barrier對齊帶來的副作用。另外也可以通過異步快照來盡量減少任務停頓并支持多個kot同時進行。(五)快照觸發(fā) 0>FltolnenFlkseoy--rte的機制。假如對元數(shù)據(jù)信息做了快照之后數(shù)據(jù)處理恢復了,在上傳數(shù)據(jù)的過程中如何保證恢復的應用程序邏輯不會修改正在上傳的數(shù)據(jù)呢?實際上不同狀態(tài)存儲后端的處理是不一樣的,epakedopy-o-rte,而對于RosBakend來說LSMFltolaenFlk<1 四、likFlikese。在定義的狀態(tài)的時候,需要給出以下的幾個信息:D狀態(tài)數(shù)據(jù)類型本地狀態(tài)后端注冊狀態(tài)本地狀態(tài)后端讀寫狀態(tài)Fliksend,F(xiàn)k 2>FltolnenFlk第一種,JVMHeapJava儲所需的空間是磁盤上序列化壓縮后的數(shù)據(jù)大小的很多倍,所以占用的內(nèi)存空間很大;第二個弊端,雖然讀寫不用做序列化,但是在形成snapshot時需要做序列化,所以它的異步snapsho

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論