kafka 數(shù)據(jù)可靠性深度解讀_第1頁
免費預覽已結(jié)束,剩余36頁可下載查看

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

1、kafka 數(shù)據(jù)可靠性深度解讀1 概述 kakfa起初是由linkedin公司開發(fā)的一個分布式的消息系統(tǒng),后成為apache的一部分,它用法scala編寫,以可水平擴展和高吞吐率而被廣泛用法。目前越來越多的開源分布式處理系統(tǒng)如cloudera、apache storm、spark等都支持與kafka集成。 假如想學習java工程化、高性能及分布式、深化淺出。微服務(wù)、spring,mybatis,netty源碼分析的伴侶可以加我的java高級溝通:854630135,群里有阿里大牛直播講解技術(shù),以及java大型互聯(lián)網(wǎng)技術(shù)的視頻免費共享給大家。 kafka憑借著自身的優(yōu)勢,越來越受到互聯(lián)網(wǎng)企業(yè)的青

2、睞,唯品會也采納kafka作為其內(nèi)部核心消息引擎之一。kafka作為一個商業(yè)級消息中間件,消息牢靠性的重要性可想而知。如何確保消息的精確傳輸?如何確保消息的精確存儲?如何確保消息的正確消費?這些都是需要考慮的問題。本文首先從kafka的架構(gòu)著手,先了解下kafka的基本原理,然后通過對kakfa的存儲機制、復制原理、同步原理、牢靠性和持久性保證等等一步步對其牢靠性舉行分析,最后通過benchmark來增加對kafka高牢靠性的認知。 2 kafka體系架構(gòu) 如上圖所示,一個典型的kafka體系架構(gòu)包括若干producer(可以是服務(wù)器日志,業(yè)務(wù)數(shù)據(jù),頁面前端產(chǎn)生的page view等等),若干

3、broker(kafka支持水平擴展,普通broker數(shù)量越多,集群吞吐率越高),若干consumer (group),以及一個zookeeper集群。kafka通過zookeeper管理集群配置,選舉leader,以及在consumer group發(fā)生變幻時舉行rebalance。producer用法push(推)模式將消息發(fā)布到broker,consumer用法pull(拉)模式從broker訂閱并消費消息。 名詞說明: 2.1 topic & partition 一個topic可以認為一個一類消息,每個topic將被分成多個partition,每個partition在存儲

4、層面是append log文件。任何發(fā)布到此partition的消息都會被追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為一個long型的數(shù)字,它唯一標志一條消息。每條消息都被append到partition中,是挨次寫磁盤,因此效率十分高(閱歷證,挨次寫磁盤效率比隨機寫內(nèi)存還要高,這是kafka高吞吐率的一個很重要的保證)。 每一條消息被發(fā)送到broker中,會按照partition規(guī)章挑選被存儲到哪一個partition。假如partition規(guī)章設(shè)置的合理,全部消息可以勻稱分布到不同的partition里,這樣就實現(xiàn)了水平擴展。(假如一個topic

5、對應(yīng)一個文件,那這個文件所在的機器i/o將會成為這個topic的性能瓶頸,而partition解決了這個問題)。在創(chuàng)建topic時可以在$kafka_home/config/perties中指定這個partition的數(shù)量(如下所示),固然可以在topic創(chuàng)建之后去修改partition的數(shù)量。 the default number of log partitions per topic. more partitions allow greater parallelism for consumption, but this will also result in more

6、files across the brokers. num.partitions=3 在發(fā)送一條消息時,可以指定這個消息的key,producer按照這個key和partition機制來推斷這個消息發(fā)送到哪個partition。partition機制可以通過指定producer的partition.class這一參數(shù)來指定,該class必需實現(xiàn)ducer.partitioner接口。 3 高牢靠性存儲分析 kafka的高牢靠性的保障來源于其茁壯的副本(replication)策略。通過調(diào)整其副本相關(guān)參數(shù),可以使得kafka在性能和牢靠性之間運轉(zhuǎn)的游刃有余。kafka從0.8.

7、x版本開頭提供partition級別的復制,replication的數(shù)量可以在$kafka_home/config/perties中配置(default.replication.refactor)。 這里先從kafka文件存儲機制入手,從最底層了解kafka的存儲詳情,進而對其的存儲有個微觀的認知。之后通過kafka復制原理和同步方式來闡述宏觀層面的概念。最后從isr,hw,leader選舉以及數(shù)據(jù)牢靠性和持久性保證等等各個維度來豐盛對kafka相關(guān)學問點的認知。 3.1 kafka文件存儲機制 kafka中消息是以topic舉行分類的,生產(chǎn)者通過topic向kafka b

8、roker發(fā)送消息,消費者通過topic讀取數(shù)據(jù)。然而topic在物理層面又能以partition為分組,一個topic可以分成若干個partition,那么topic以及partition又是怎么存儲的呢?partition還可以細分為segment,一個partition物理上由多個segment組成,那么這些segment又是什么呢?下面我們來一一揭曉。 為了便于解釋問題,假設(shè)這里惟獨一個kafka集群,且這個集群惟獨一個kafka broker,即惟獨一臺物理機。在這個kafka broker中配置($kafka_home/config/perties中)log.d

9、irs=/tmp/kafka-logs,以此來設(shè)置kafka消息文件存儲名目,與此同時創(chuàng)建一個topic:topic_zzh_test,partition的數(shù)量為4($kafka_home/bin/kafka-topics.sh –create –zookeeper localhost:2181 –partitions 4 –topic topic_vms_test –replication-factor 4)。那么我們此時可以在/tmp/kafka-logs名目中可以看到生成了4個名目: dr

10、wxr-xr-x 2 root root 4096 apr 10 16:10 topic_zzh_test-0 drwxr-xr-x 2 root root 4096 apr 10 16:10 topic_zzh_test-1 drwxr-xr-x 2 root root 4096 apr 10 16:10 topic_zzh_test-2 drwxr-xr-x 2 root root 4096 apr 10 16:10 topic_zzh_test-3 在kafka文件存儲中,同一個topic下有多個不同的partition,每個partiton為一個名目,partition的名稱規(guī)章為:t

11、opic名稱+有序序號,第一個序號從0開頭計,最大的序號為partition數(shù)量減1,partition是實際物理上的概念,而topic是規(guī)律上的概念。 上面提到partition還可以細分為segment,這個segment又是什么?假如就以partition為最小存儲單位,我們可以想象當kafka producer不斷發(fā)送消息,必定會引起partition文件的無限擴張,這樣對于消息文件的維護以及已經(jīng)被消費的消息的清理帶來嚴峻的影響,所以這里以segment為單位又將partition細分。每個partition(名目)相當于一個巨型文件被平均分配到多個大小相等的segment(段)數(shù)據(jù)文

12、件中(每個segment 文件中消息數(shù)量不一定相等)這種特性也便利old segment的刪除,即便利已被消費的消息的清理,提高磁盤的利用率。每個partition只需要支持挨次讀寫就行,segment的文件生命周期由服務(wù)端配置參數(shù)(log.segment.bytes,log.roll.ms,hours等若干參數(shù))打算。 segment文件由兩部分組成,分離為.index文件和.log文件,分離表示為segment索引文件和數(shù)據(jù)文件。這兩個文件的指令規(guī)章為:partition全局的第一個segment從0開頭,后續(xù)每個segment文件名為上一個segment文件最后一條消息的offset值,

13、數(shù)值大小為64位,20位數(shù)字字符長度,沒有數(shù)字用0填充,如下: 00000000000000000000.index 00000000000000000000.log 00000000000000170410.index 00000000000000170410.log 00000000000000239430.index 00000000000000239430.log 以上面的segment文件為例,展示出segment:00000000000000170410的.index文件和.log文件的對應(yīng)的關(guān)系,如下圖: 假如想學習java工程化、高性能及分布式、深化淺出。微服務(wù)、spring,

14、mybatis,netty源碼分析的伴侶可以加我的java高級溝通:854630135,群里有阿里大牛直播講解技術(shù),以及java大型互聯(lián)網(wǎng)技術(shù)的視頻免費共享給大家。 如上圖,.index索引文件存儲大量的元數(shù)據(jù),.log數(shù)據(jù)文件存儲大量的消息,索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中message的物理偏移地址。其中以.index索引文件中的元數(shù)據(jù)3, 348為例,在.log數(shù)據(jù)文件表示第3個消息,即在全局partition中表示170410+3=170413個消息,該消息的物理偏移地址為348。 那么如何從partition中通過offset查找message呢? 以上圖為例,讀取offset=

15、170418的消息,首先查找segment文件,其中00000000000000000000.index為最開頭的文件,其次個文件為00000000000000170410.index(起始偏移為170410+1=170411),而第三個文件為00000000000000239430.index(起始偏移為239430+1=239431),所以這個offset=170418就落到了其次個文件之中。其他后續(xù)文件可以依次類推,以其實偏移量命名并羅列這些文件,然后按照二分查找法就可以迅速定位到詳細文件位置。第二按照00000000000000170410.index文件中的8,1325定位到0000

16、0000000000170410.log文件中的1325的位置舉行讀取。 要是讀取offset=170418的消息,從00000000000000170410.log文件中的1325的位置舉行讀取,那么怎么知道何時讀完本條消息,否則就讀到下一條消息的內(nèi)容了? 這個就需要聯(lián)系到消息的物理結(jié)構(gòu)了,消息都具有固定的物理結(jié)構(gòu),包括:offset(8 bytes)、消息體的大小(4 bytes)、crc32(4 bytes)、magic(1 byte)、attributes(1 byte)、key length(4 bytes)、key(k bytes)、payload(n bytes)等等字段,可以確

17、定一條消息的大小,即讀取到哪里截止。 3.2 復制原理和同步方式 kafka中topic的每個partition有一個預寫式的日志文件,雖然partition可以繼續(xù)細分為若干個segment文件,但是對于上層應(yīng)用來說可以將partition看成最小的存儲單元(一個有多個segment文件拼接的巨型文件),每個partition都由一些列有序的、不行變的消息組成,這些消息被延續(xù)的追加到partition中。 上圖中有兩個新名詞:hw和leo。這里先介紹下leo,logendoffset的縮寫,表示每個partition的log最后一條message的位置。hw是highwatermark的縮寫

18、,是指consumer能夠看到的此partition的位置,這個涉及到多副本的概念,這里先提及一下,下節(jié)再詳表。 言歸正傳,為了提高消息的牢靠性,kafka每個topic的partition有n個副本(replicas),其中n(大于等于1)是topic的復制因子(replica fator)的個數(shù)。kafka通過多副本機制實現(xiàn)故障自動轉(zhuǎn)移,當kafka集群中一個broker失效狀況下仍然保證服務(wù)可用。在kafka中發(fā)生復制時確保partition的日志能有序地寫到其他節(jié)點上,n個replicas中,其中一個replica為leader,其他都為follower, leader處理partit

19、ion的全部讀寫哀求,與此同時,follower會被動定期地去復制leader上的數(shù)據(jù)。 如下圖所示,kafka集群中有4個broker, 某topic有3個partition,且復制因子即副本個數(shù)也為3: kafka提供了數(shù)據(jù)復制算法保證,假如leader發(fā)生故障或掛掉,一個新leader被選舉并被接受客戶端的消息勝利寫入。kafka確保從同步副本列表中選舉一個副本為leader,或者說follower追逐leader數(shù)據(jù)。leader負責維護和跟蹤isr(in-sync replicas的縮寫,表示副本同步隊列,詳細可參考下節(jié))中全部follower滯后的狀態(tài)。當producer發(fā)送一條消

20、息到broker后,leader寫入消息并復制到全部follower。消息提交之后才被勝利復制到全部的同步副本。消息復制延遲受最慢的follower限制,重要的是迅速檢測慢副本,假如follower落后太多或者失效,leader將會把它從isr中刪除。 3.3 isr 上節(jié)我們涉及到isr (in-sync replicas),這個是指副本同步隊列。副本數(shù)對kafka的吞吐率是有一定的影響,但極大的增加了可用性。默認狀況下kafka的replica數(shù)量為1,即每個partition都有一個唯一的leader,為了確保消息的牢靠性,通常應(yīng)用中將其值(由broker的參數(shù)offsets.topic

21、.replication.factor指定)大小設(shè)置為大于1,比如3。 全部的副本(replicas)統(tǒng)稱為assigned replicas,即ar。isr是ar中的一個子集,由leader維護isr列表,follower從leader同步數(shù)據(jù)有一些延遲(包括延遲時光replica.lag.time.max.ms和延遲條數(shù)replica.lag.max.messages兩個維度, 當前最新的版本0.10.x中只支持replica.lag.time.max.ms這個維度),隨意一個超過閾值都會把follower剔除出isr, 存入osr(outof-sync replicas)列表,新加入的f

22、ollower也會先存放在osr中。ar=isr+osr。 kafka 0.10.x版本后移除了replica.lag.max.messages參數(shù),只保留了replica.lag.time.max.ms作為isr中副本管理的參數(shù)。為什么這樣做呢?replica.lag.max.messages表示當前某個副本落后leaeder的消息數(shù)量超過了這個參數(shù)的值,那么leader就會把follower從isr中刪除。假設(shè)設(shè)置replica.lag.max.messages=4,那么假如producer一次傳送至broker的消息數(shù)量都小于4條時,由于在leader接受到producer發(fā)送的消息之后

23、而follower副本開頭拉取這些消息之前,follower落后leader的消息數(shù)不會超過4條消息,故此沒有follower移出isr,所以這時候replica.lag.max.message的設(shè)置似乎是合理的。但是producer發(fā)起瞬時高峰流量,producer一次發(fā)送的消息超過4條時,也就是超過replica.lag.max.messages,此時follower都會被認為是與leader副本不同步了,從而被踢出了isr。但事實上這些follower都是存活狀態(tài)的且沒有性能問題。那么在之后追上leader,并被重新加入了isr。于是就會浮現(xiàn)它們不斷地剔出isr然后重新回來isr,這無疑

24、增強了無謂的性能損耗。而且這個參數(shù)是broker全局的。設(shè)置太大了,影響真正落后follower的移除;設(shè)置的太小了,導致follower的頻繁進出。無法給定一個合適的replica.lag.max.messages的值,故此,新版本的kafka移除了這個參數(shù)。 注:isr中包括:leader和follower。 上面一節(jié)還涉及到一個概念,即hw。hw俗稱高水位,highwatermark的縮寫,取一個partition對應(yīng)的isr中最小的leo作為hw,consumer最多只能消費到hw所在的位置。另外每個replica都有hw,leader和follower各自負責更新自己的hw的狀態(tài)。對

25、于leader新寫入的消息,consumer不能立即消費,leader會等待該消息被全部isr中的replicas同步后更新hw,此時消息才干被consumer消費。這樣就保證了假如leader所在的broker失效,該消息仍然可以從新選舉的leader中獵取。對于來自內(nèi)部broker的讀取哀求,沒有hw的限制。 下圖具體的解釋了當producer生產(chǎn)消息至broker后,isr以及hw和leo的流轉(zhuǎn)過程: 由此可見,kafka的復制機制既不是徹低的同步復制,也不是單純的異步復制。實際上,同步復制要求全部能工作的follower都復制完,這條消息才會被commit,這種復制方式極大的影響了吞吐

26、率。而異步復制方式下,follower異步的從leader復制數(shù)據(jù),數(shù)據(jù)只要被leader寫入log就被認為已經(jīng)commit,這種狀況下假如follower都還沒有復制完,落后于leader時,驟然leader宕機,則會走失數(shù)據(jù)。而kafka的這種用法isr的方式則很好的均衡了確保數(shù)據(jù)不走失以及吞吐率。 kafka的isr的管理終于都會反饋到zookeeper節(jié)點上。詳細位置為:/brokers/topics/topic/partitions/partition/state。目前有兩個地方會對這個zookeeper的節(jié)點舉行維護: controller來維護:kafka集群中的其中一個brok

27、er會被選舉為controller,主要負責partition管理和副本狀態(tài)管理,也會執(zhí)行類似于重分配partition之類的管理任務(wù)。在符合某些特定條件下,controller下的leaderselector會選舉新的leader,isr和新的leader_epoch及controller_epoch寫入zookeeper的相關(guān)節(jié)點中。同時發(fā)起leaderandisrrequest通知全部的replicas。 leader來維護:leader有單獨的線程定期檢測isr中follower是否脫離isr, 假如發(fā)覺isr變幻,則會將新的isr的信息返回到zookeeper的相關(guān)節(jié)點中。 3.4

28、數(shù)據(jù)牢靠性和持久性保證 當producer向leader發(fā)送數(shù)據(jù)時,可以通過request.required.acks參數(shù)來設(shè)置數(shù)據(jù)牢靠性的級別: 1(默認):這意味著producer在isr中的leader已勝利收到的數(shù)據(jù)并得到確認后發(fā)送下一條message。假如leader宕機了,則會走失數(shù)據(jù)。 0:這意味著producer無需等待來自broker確實認而繼續(xù)發(fā)送下一批消息。這種狀況下數(shù)據(jù)傳輸效率最高,但是數(shù)據(jù)牢靠性確是最低的。 -1:producer需要等待isr中的全部follower都確認接收到數(shù)據(jù)后才算一次發(fā)送完成,牢靠性最高。但是這樣也不能保證數(shù)據(jù)不走失,比如當isr中惟獨lea

29、der時(前面isr那一節(jié)講到,isr中的成員因為某些狀況會增強也會削減,最少就只剩一個leader),這樣就變成了acks=1的狀況。 假如要提高數(shù)據(jù)的牢靠性,在設(shè)置request.required.acks=-1的同時,也要min.insync.replicas這個參數(shù)(可以在broker或者topic層面舉行設(shè)置)的協(xié)作,這樣才干發(fā)揮最大的功效。min.insync.replicas這個參數(shù)設(shè)定isr中的最小副本數(shù)是多少,默認值為1,當且僅當request.required.acks參數(shù)設(shè)置為-1時,此參數(shù)才生效。假如isr中的副本數(shù)少于min.insync.replicas配置的數(shù)量時

30、,客戶端會返回異樣:mon.errors.notenoughreplicasexceptoin: messages are rejected since there are fewer in-sync replicas than required。 接下來對acks=1和-1的兩種狀況舉行具體分析: 1. request.required.acks=1 producer發(fā)送數(shù)據(jù)到leader,leader寫本地日志勝利,返回客戶端勝利;此時isr中的副本還沒有來得及拉取該消息,leader就宕機了,那么此次發(fā)送的消息就會走失。 2. request.required.acks=-1 同步(ka

31、fka默認為同步,即producer.type=sync)的發(fā)送模式,replication.factor>=2且min.insync.replicas>=2的狀況下,不會走失數(shù)據(jù)。 有兩種典型狀況。acks=-1的狀況下(如無特別解釋,以下acks都表示為參數(shù)request.required.acks),數(shù)據(jù)發(fā)送到leader, isr的follower所有完成數(shù)據(jù)同步后,leader此時掛掉,那么會選舉出新的leader,數(shù)據(jù)不會走失。 假如想學習java工程化、高性能及分布式、深化淺出。微服務(wù)、spring,mybatis,netty源碼分析的伴侶可以加我的java高級溝通:

32、854630135,群里有阿里大牛直播講解技術(shù),以及java大型互聯(lián)網(wǎng)技術(shù)的視頻免費共享給大家。 acks=-1的狀況下,數(shù)據(jù)發(fā)送到leader后 ,部分isr的副本同步,leader此時掛掉。比如follower1h和follower2都有可能變成新的leader, producer端會得到返回異樣,producer端會重新發(fā)送數(shù)據(jù),數(shù)據(jù)可能會重復。 固然上圖中假如在leader crash的時候,follower2還沒有同步到任何數(shù)據(jù),而且follower2被選舉為新的leader的話,這樣消息就不會重復。 注:kafka只處理fail/recover問題,不處理byzantine問題。

33、3.5 關(guān)于hw的進一步探討 考慮上圖(即acks=-1,部分isr副本同步)中的另一種狀況,假如在leader掛掉的時候,follower1同步了消息4,5,follower2同步了消息4,與此同時follower2被選舉為leader,那么此時follower1中的多出的消息5該做如何處理呢? 這里就需要hw的協(xié)同協(xié)作了。如前所述,一個partition中的isr列表中,leader的hw是全部isr列表里副本中最小的那個的leo。類似于木桶原理,水位取決于最低那塊短板。 如上圖,某個topic的某partition有三個副本,分離為a、b、c。a作為leader絕對是leo最高,b緊隨其

34、后,c機器因為配置比較低,網(wǎng)絡(luò)比較差,故而同步最慢。這個時候a機器宕機,這時候假如b成為leader,如果沒有hw,在a重新復原之后會做同步(makefollower)操作,在宕機時log文件之后挺直做追加操作,而如果b的leo已經(jīng)達到了a的leo,會產(chǎn)生數(shù)據(jù)不全都的狀況,所以用法hw來避開這種狀況。 a在做同步操作的時候,先將log文件截斷到之前自己的hw的位置,即3,之后再從b中拉取消息舉行同步。 假如失敗的follower復原過來,它首先將自己的log文件截斷到上次checkpointed時刻的hw的位置,之后再從leader中同步消息。leader掛掉會重新選舉,新的leader會發(fā)送

35、命令讓其余的follower截斷至自身的hw的位置然后再拉取新的消息。 當isr中的個副本的leo不全都時,假如此時leader掛掉,選舉新的leader時并不是根據(jù)leo的凹凸舉行選舉,而是根據(jù)isr中的挨次選舉。 3.6 leader選舉 一條消息惟獨被isr中的全部follower都從leader復制過去才會被認為已提交。這樣就避開了部分數(shù)據(jù)被寫進了leader,還沒來得及被任何follower復制就宕機了,而造成數(shù)據(jù)走失。而對于producer而言,它可以挑選是否等待消息commit,這可以通過request.required.acks來設(shè)置。這種機制確保了只要isr中有一個或者以上的

36、follower,一條被commit的消息就不會走失。 有一個很重要的問題是當leader宕機了,怎樣在follower中選舉出新的leader,由于follower可能落后無數(shù)或者挺直crash了,所以必需確保挑選最新的follower作為新的leader。一個基本的原則就是,假如leader不在了,新的leader必需擁有本來的leader commit的全部消息。這就需要做一個折中,假如leader在表名一個消息被commit前等待更多的follower確認,那么在它掛掉之后就有更多的follower可以成為新的leader,但這也會造成吞吐率的下降。 一種十分常用的選舉leader的方

37、式是少數(shù)聽從多數(shù),kafka并不是采納這種方式。這種模式下,假如我們有2f+1個副本,那么在commit之前必需保證有f+1個replica復制完消息,同時為了保證能正確選舉出新的leader,失敗的副本數(shù)不能超過f個。這種方式有個很大的優(yōu)勢,系統(tǒng)的延遲取決于最快的幾臺機器,也就是說比如副本數(shù)為3,那么延遲就取決于最快的那個follower而不是最慢的那個。少數(shù)聽從多數(shù)的方式也有一些劣勢,為了保證leader選舉的正常舉行,它所能容忍的失敗的follower數(shù)比較少,假如要容忍1個follower掛掉,那么起碼要3個以上的副本,假如要容忍2個follower掛掉,必需要有5個以上的副本。也就是

38、說,在生產(chǎn)環(huán)境下為了保證較高的容錯率,必需要有大量的副本,而大量的副本又會在大數(shù)據(jù)量下導致性能的急劇下降。這種算法更多用在zookeeper這種分享集群配置的系統(tǒng)中而很少在需要大量數(shù)據(jù)的系統(tǒng)中用法的緣由。hdfs的ha功能也是基于少數(shù)聽從多數(shù)的方式,但是其數(shù)據(jù)存儲并不是采納這樣的方式。 事實上,leader選舉的算法十分多,比如zookeeper的zab、raft以及viewstamped replication。而kafka所用法的leader選舉算法更像是微軟的pacifica算法。 kafka在zookeeper中為每一個partition動態(tài)的維護了一個isr,這個isr里的全部rep

39、lica都跟上了leader,惟獨isr里的成員才干有被選為leader的可能(unclean.leader.election.enable=false)。在這種模式下,對于f+1個副本,一個kafka topic能在保證不走失已經(jīng)commit消息的前提下容忍f個副本的失敗,在大多數(shù)用法場景下,這種模式是非常有利的。實際上,為了容忍f個副本的失敗,少數(shù)聽從多數(shù)的方式和isr在commit前需要等待的副本的數(shù)量是一樣的,但是isr需要的總的副本的個數(shù)幾乎是少數(shù)聽從多數(shù)的方式的一半。 上文提到,在isr中起碼有一個follower時,kafka可以確保已經(jīng)commit的數(shù)據(jù)不走失,但假如某一個pa

40、rtition的全部replica都掛了,就無法保證數(shù)據(jù)不走失了。這種狀況下有兩種可行的計劃: 等待isr中隨意一個replica活過來,并且選它作為leader 挑選第一個活過來的replica(并不一定是在isr中)作為leader 這就需要在可用性和全都性當中作出一個容易的抉擇。假如一定要等待isr中的replica活過來,那不行用的時光就可能會相對較長。而且假如isr中全部的replica都無法活過來了,或者數(shù)據(jù)走失了,這個partition將永久不行用。挑選第一個活過來的replica作為leader,而這個replica不是isr中的replica,那即使它并不保障已經(jīng)包含了全部已

41、commit的消息,它也會成為leader而作為consumer的數(shù)據(jù)源。默認狀況下,kafka采納其次種策略,即unclean.leader.election.enable=true,也可以將此參數(shù)設(shè)置為false來啟用第一種策略。 unclean.leader.election.enable這個參數(shù)對于leader的選舉、系統(tǒng)的可用性以及數(shù)據(jù)的牢靠性都有至關(guān)重要的影響。下面我們來分析下幾種典型的場景。 假如上圖所示,假設(shè)某個partition中的副本數(shù)為3,replica-0, replica-1, replica-2分離存放在broker0, broker1和broker2中。ar=(0

42、,1,2),isr=(0,1)。 設(shè)置request.required.acks=-1, min.insync.replicas=2,unclean.leader.election.enable=false。這里講broker0中的副本也稱之為broker0起初broker0為leader,broker1為follower。 當isr中的replica-0浮現(xiàn)crash的狀況時,broker1選舉為新的leaderisr=(1),由于受min.insync.replicas=2影響,write不能服務(wù),但是read能繼續(xù)正常服務(wù)。此種狀況復原計劃: 嘗試復原(重啟)replica-0,假如能起

43、來,系統(tǒng)正常; 假如replica-0不能復原,需要將min.insync.replicas設(shè)置為1,復原write功能。 當isr中的replica-0浮現(xiàn)crash,緊接著replica-1也浮現(xiàn)了crash, 此時isr=(1),leader=-1,不能對外提供服務(wù),此種狀況復原計劃: 嘗試復原replica-0和replica-1,假如都能起來,則系統(tǒng)復原正常; 假如replica-0起來,而replica-1不能起來,這時候仍然不能選出leader,由于當設(shè)置unclean.leader.election.enable=false時,leader只能從isr中選舉,當isr中全部副本

44、都失效之后,需要isr中最后失效的那個副本能復原之后才干選舉leader, 即replica-0先失效,replica-1后失效,需要replica-1復原后才干選舉leader。保守的計劃建議把unclean.leader.election.enable設(shè)置為true,但是這樣會有走失數(shù)據(jù)的狀況發(fā)生,這樣可以復原read服務(wù)。同樣需要將min.insync.replicas設(shè)置為1,復原write功能; replica-1復原,replica-0不能復原,這個狀況上面碰到過,read服務(wù)可用,需要將min.insync.replicas設(shè)置為1,復原write功能; replica-0和re

45、plica-1都不能復原,這種狀況可以參考情形2. 當isr中的replica-0, replica-1同時宕機,此時isr=(0,1),不能對外提供服務(wù),此種狀況復原計劃:嘗試復原replica-0和replica-1,當其中隨意一個副本復原正常時,對外可以提供read服務(wù)。直到2個副本復原正常,write功能才干復原,或者將將min.insync.replicas設(shè)置為1。 3.7 kafka的發(fā)送模式 kafka的發(fā)送模式由producer端的配置參數(shù)producer.type來設(shè)置,這個參數(shù)指定了在后臺線程中消息的發(fā)送方式是同步的還是異步的,默認是同步的方式,即producer.typ

46、e=sync。假如設(shè)置成異步的模式,即producer.type=async,可以是producer以batch的形式push數(shù)據(jù),這樣會極大的提高broker的性能,但是這樣會增強走失數(shù)據(jù)的風險。假如需要確保消息的牢靠性,必需要將producer.type設(shè)置為sync。 對于異步模式,還有4個配套的參數(shù),如下: 以batch的方式推送數(shù)據(jù)可以極大的提高處理效率,kafka producer可以將消息在內(nèi)存中累計到一定數(shù)量后作為一個batch發(fā)送哀求。batch的數(shù)量大小可以通過producer的參數(shù)(batch.num.messages)控制。通過增強batch的大小,可以削減網(wǎng)絡(luò)哀求和磁

47、盤io的次數(shù),固然詳細參數(shù)設(shè)置需要在效率和時效性方面做一個權(quán)衡。在比較新的版本中還有batch.size這個參數(shù)。 4 高牢靠性用法分析 4.1 消息傳輸保障 前面已經(jīng)介紹了kafka如何舉行有效的存儲,以及了解了producer和consumer如何工作。接下來研究的是kafka如何確保消息在producer和consumer之間傳輸。有以下三種可能的傳輸保障(delivery guarantee): at most once: 消息可能會丟,但絕不會重復傳輸 at least once:消息絕不會丟,但可能會重復傳輸 exactly once:每條消息絕對會被傳輸一次且僅傳輸一次 kafk

48、a的消息傳輸保障機制十分直觀。當producer向broker發(fā)送消息時,一旦這條消息被commit,因為副本機制(replication)的存在,它就不會走失。但是假如producer發(fā)送數(shù)據(jù)給broker后,碰到的網(wǎng)絡(luò)問題而造成通信中斷,那producer就無法推斷該條消息是否已經(jīng)提交(commit)。雖然kafka無法確定網(wǎng)絡(luò)故障期間發(fā)生了什么,但是producer可以retry多次,確保消息已經(jīng)正確傳輸?shù)絙roker中,所以目前kafka實現(xiàn)的是at least once。 consumer從broker中讀取消息后,可以挑選commit,該操作會在zookeeper中存下該consu

49、mer在該partition下讀取的消息的offset。該consumer下一次再讀該partition時會從下一條開頭讀取。如未commit,下一次讀取的開頭位置會跟上一次commit之后的開頭位置相同。固然也可以將consumer設(shè)置為autocommit,即consumer一旦讀取到數(shù)據(jù)立刻自動commit。假如只研究這一讀取消息的過程,那kafka是確保了exactly once, 但是假如因為前面producer與broker之間的某種緣由導致消息的重復,那么這里就是at least once。 考慮這樣一種狀況,當consumer讀完消息之后先commit再處理消息,在這種模式下,

50、假如consumer在commit后還沒來得及處理消息就crash了,下次重新開頭工作后就無法讀到剛剛已提交而未處理的消息,這就對應(yīng)于at most once了。 讀完消息先處理再commit。這種模式下,假如處理完了消息在commit之前consumer crash了,下次重新開頭工作時還會處理剛剛未commit的消息,事實上該消息已經(jīng)被處理過了,這就對應(yīng)于at least once。 要做到exactly once就需要引入消息去重機制。 4.2 消息去重 如上一節(jié)所述,kafka在producer端和consumer端都會浮現(xiàn)消息的重復,這就需要去重處理。 kafka文檔中提及guid(

51、globally unique identifier)的概念,通過客戶端生成算法得到每個消息的unique id,同時可映射至broker上存儲的地址,即通過guid便可查詢提取消息內(nèi)容,也便于發(fā)送方的冪等性保證,需要在broker上提供此去重處理模塊,目前版本尚不支持。 針對guid, 假如從客戶端的角度去重,那么需要引入集中式緩存,必定會增強依靠復雜度,另外緩存的大小難以界定。 不只是kafka, 類似rabbitmq以及rocketmq這類商業(yè)級中間件也只保障at least once, 且也無法從自身去舉行消息去重。所以我們建議業(yè)務(wù)方按照自身的業(yè)務(wù)特點舉行去重,比如業(yè)務(wù)消息本身具備冪等

52、性,或者借助redis等其他產(chǎn)品舉行去重處理。 4.3 高牢靠性配置 kafka提供了很高的數(shù)據(jù)冗余彈性,對于需要數(shù)據(jù)高牢靠性的場景,我們可以增強數(shù)據(jù)冗余備份數(shù)(replication.factor),調(diào)高最小寫入副本數(shù)的個數(shù)(min.insync.replicas)等等,但是這樣會影響性能。反之,性能提高而牢靠性則降低,用戶需要自身業(yè)務(wù)特性在彼此之間做一些權(quán)衡性挑選。 要保證數(shù)據(jù)寫入到kafka是平安的,高牢靠的,需要如下的配置: topic的配置:replication.factor>=3,即副本數(shù)起碼是3個;2 acks_1 > ack_-1; 副本數(shù)越高,tps越低;副本數(shù)

53、全都時,min.insync.replicas不影響tps; acks=0/1時,tps與min.insync.replicas參數(shù)以及副本數(shù)無關(guān),僅受acks策略的影響。 下面將partition的個數(shù)設(shè)置為1,來進一步確認下不同的acks策略、不同的min.insync.replicas策略以及不同的副本數(shù)對于發(fā)送速度的影響,具體請看情景2和情景3。 場景2:在partition個數(shù)固定為1,測試不同的副本數(shù)和min.insync.replicas策略對發(fā)送速度的影響。 詳細配置:一個producer;發(fā)送方式為sync;消息體大小為1kb;producer端acks=-1(all)。變換

54、副本數(shù):2/3/4; min.insync.replicas設(shè)置為:1/2/4。 測試結(jié)果如下: 測試結(jié)果分析:副本數(shù)越高,tps越低(這點與場景1的測試結(jié)論吻合),但是當partition數(shù)為1時差距甚微。min.insync.replicas不影響tps。 場景3:在partition個數(shù)固定為1,測試不同的acks策略和副本數(shù)對發(fā)送速度的影響。 詳細配置:一個producer;發(fā)送方式為sync;消息體大小為1kb;min.insync.replicas=1。topic副本數(shù)為:1/2/4;acks: 0/1/-1。 測試結(jié)果如下: 測試結(jié)果分析(與情景1全都): 副本數(shù)越多,tps越低; 客戶端的acks策略對發(fā)送的tps有較大的影響,tps:acks_0 > acks_1 > ack_-1。 場景4:測試不同partition數(shù)對發(fā)送速率

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論