




版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)
文檔簡介
在開始分析源碼之前,我們一起來回顧一下Kafka消費模型的幾個要點Kafka的每個Consumer(消費者)實例屬于一個ConsumerGroup(消費組在消費時,ConsumerGroup的每個Consumer占一個或多個Partition(分對于每個ConsumerGroup,在任意時刻,每個Partition多有1Consumer每個ConsumerGroup都有一個Coordinator(協(xié)調(diào)者)負責(zé)分配Consumer和Partition對應(yīng)關(guān)系,當Partition是Consumer生變更是,會reblance(重新分配)過程,重新分配Consumer與Partition的對應(yīng)關(guān)系;Consumer與Coordinator之間的心跳,這樣Coordinator就能感知Consumer狀態(tài)Consumer的時候及時觸發(fā)rebalance掌握并理解Kafka的消費模型,對于接下來理解其消費的實現(xiàn)過程是至關(guān)重要的,如果你對上面的這些要點還有不清楚的地方,建議回顧一下之前的課程或者看一下Kafka相關(guān)的我們使用當前的版本2.2進行分析,使用Git 上直接源碼到本地代碼git cdgitcheckout09|Kafka的文檔,看看從哪兒來入手開啟我們的分析流Kafka的Consumer類KafkaConsumer的JavaDoc,給出了關(guān)于如何使用KafkaConsumer常詳細的說明文檔,并且給出了一個使用Consumer費的最簡代碼代碼//設(shè)置必要的配置信Propertiesprops=newprops.put("bootstrap.servers",props.put("group.id", mit", erval.ms",props.put("key.deserializer", props.put("value.deserializer", 9//創(chuàng)建Consumer實KafkaConsumer<String,String>consumer=new//訂閱consumer.subscribe(Arrays.asList("foo",//循環(huán)拉while(true)ConsumerRecords<String,String>records=for(ConsumerRecord<String,String>record:System.out.printf("offset=%d,key=%s,value=%s%n", 這段代碼主要的主要流程設(shè)置必要的配置信息,包括:起始連接的Broker址,ConsumerGroupID,自創(chuàng)建Consumer訂閱了2個Topic:foo和循環(huán)拉取消息并打印在控通過上面的代碼實例我們可以看到,消費這個大的流程,在Kafka中實際上是被了“訂閱”和“拉取消息”這兩個小的流程。另外,我在之前的課程中反復(fù)提到過,Ka在消費過程中,每個Consumr實例是綁定到一個分區(qū)上的,那Consumr是如何確定,綁定到哪一個分區(qū)上的呢?這個問題也是可以通過分析消費流程來找到答案的。所以,我們分析整個消費流程主要聚焦在三個問題上:訂閱過程是如何實現(xiàn)的Consumer是如何與Coordinator協(xié)商,確定消費哪些Partition的?拉取消息的過程是如何實了解前兩個問題,有助于你充分理解Kafka的元數(shù)據(jù)模型,以及Kafka是如何在客戶端和就帶著這三個問題,來分析Kafka的訂閱和拉取消息的過程如何實現(xiàn)。我們先來看看訂閱的實現(xiàn)流程。從上面的例子到訂閱的主流程方法代碼publicvoidsubscribe(Collection<String>topics,ConsumerRebalanceListenertry//省略部分代5//重置訂閱狀this.subscriptions.subscribe(newHashSet<>(topics),8//更新元}finally subscript,另一個是更新元數(shù)據(jù)中的topic信息。訂閱狀態(tài)subscripts主要了訂閱的topic和ptn的消費置等狀態(tài)信息。屬性metadata中了Kaa集群元數(shù)據(jù)的一個子集,包括集群的Brokr節(jié)點、c和Partn在節(jié)點上分布,以及我們聚焦的第二個問題:Coordinator給ConrPartn請注意一下,這個subscribe()方法的實現(xiàn)有一個非常值得大家學(xué)習(xí)的地方:就是開始的acquireAndEnsureOpen()try-finallyrelease(),作用就是保護這個方法只能單線程調(diào)Kafka在文檔中明確地注明了Consumer不是線程安全的,意味著Consumer被并發(fā)調(diào)用時會出現(xiàn)不可預(yù)期的結(jié)果。為了避免這種情況發(fā)生 做了主動的檢測并拋出異常,不是放任系統(tǒng)產(chǎn)生不可預(yù)期的情Kaa“主動檢測不支持的情況并拋出異常,避免系統(tǒng)產(chǎn)生不可預(yù)期的行為”這種模式,對于增強的系統(tǒng)的健壯性是一種非常有效的做法。如果你的系統(tǒng)不支持用戶的某種操作,正確的做法是,檢測不支持的操作,直接用戶操作,并給出明確的錯誤提示,而不應(yīng)該只是具體Kafka是如何實現(xiàn)的并發(fā)檢測,大家可以看一下方法acquireAndEnsureOpen()的實繼續(xù)跟進到更新元數(shù)據(jù)的方法metadata.setTopics()里面,這個方法的實現(xiàn)除了更新元數(shù)據(jù)類Metadata中的topic相關(guān)的一些屬性以外,還調(diào)用了Metadata.requestUpdate()代碼publicsynchronizedintrequestUpdate()this.needUpdate=return requestUpate()tetrueKafka必須確保ConsumrBrokrPartn分析完訂閱相關(guān)的代碼,我們來總結(jié)一下:在訂閱的實現(xiàn)過程中,Kafka更新了訂閱狀態(tài)subscriptsmetadatatopic的一些屬性,將元數(shù)據(jù)狀態(tài)置為“需要那這個元數(shù)據(jù)會在什么時候真正做一次更新呢?我們可以先帶著這個問題接著看接下來,我們分析拉取消息的流程。這個流程的時序圖如下(點擊可放大查看我們對著時序圖來分析它的實現(xiàn)流程。在KafkaConsumer.poll(法對應(yīng)源碼1179)的實現(xiàn)里面,可以看到主要是先后調(diào)用了2個私有方法:updateAssignmentMetadataIfNeeded():更新元數(shù)據(jù)pollForFetches():拉取消息方法updateAssignmentMetadataIfNeeded()中,調(diào)用了coordinator.poll()方法,poll() .ensureFreshMetadata()方法,在 方法中又調(diào)用了 .poll()方法,實現(xiàn)了與Cluster通信,在Coordinator上Consumer并拉取和更新元數(shù)據(jù)。至此,“元數(shù)據(jù)會在什么時候真正做一次更新”這個問題類ConsumerNetwork 封裝了Consumer和Cluster之間所有的網(wǎng)絡(luò)通信的實現(xiàn),這個類是一個非常徹底的異步實現(xiàn)。它沒有任何的線程,所有待發(fā)送的Request都存放在屬性unsent中,返回的Response存放在屬性 pletion中。每次調(diào)用poll()方法的時候,在當前線程中發(fā)送所有待發(fā)送的Request,處理所有收到的 MQ的代碼,Producer和Consumer在主要收發(fā)消息流程上功能的復(fù)雜度是差不多的,但是你可以很明顯地感受到Kafka的代碼實現(xiàn)要比 MQ的代碼實現(xiàn)更加的復(fù)雜難于理解。我們繼續(xù)分析方法pollForFetches()的實現(xiàn)代碼 privateMap<TopicPartition,List<ConsumerRecord<K,V>>>pollForFetches(Timer2//省略部分代3//如果緩存里面有的消息,直接返回這些4finalMap<TopicPartition,=5if(!records.isEmpty())6return7}8//構(gòu)造拉取消息請求,并發(fā)9//省略部分代//發(fā)送網(wǎng)絡(luò)請求拉取消息,等待直到有消息返回或者.poll(pollTimer,()-> //省略部分代//返回拉到的消return}這段代碼的主要實現(xiàn)邏輯如果緩存里面有未的消息,直接返回這些消息構(gòu)造拉取消息請求,并發(fā)發(fā)送網(wǎng)絡(luò)請求并拉取消息,等待直到有消息返回或者返回拉到的消息在方法fetcher.sendFetches(實現(xiàn)里面,Kafka據(jù)元數(shù)據(jù)的信息,構(gòu)造到所有需要的Broker的拉消息的Request,然后調(diào)用.Send()方法將這些請求異步發(fā)送出去。并且,了一個回調(diào)類來處理返回的Response,所有返回的Response被暫時存放在pletedFetches中。需要注意的是,這時的Request并沒有被真正發(fā)給各Broker,而是被暫存在了.unsend等待被發(fā)送然后,在調(diào)用.poll()方法時,會真正將之前構(gòu)造的所有Request發(fā)送出去,并處理收到的Response。最后,fetcher.fetchedRecords()方法中,將返回的Response反序列化后轉(zhuǎn)換為消息列綜合上面的實現(xiàn)分析,我在這里給出整個拉取消息的流程涉及到的相關(guān)類的類圖,在這個類圖中,為了便于你理解,我并沒有把所有類都繪制上去,只是把本節(jié)課兩個流程相關(guān)的主要類圖(點擊可放大查看本節(jié)課我們一起分析了KafkaConsumer消費消息的實現(xiàn)過程。大家來分析代碼過程中,不僅僅是要掌握Kafka整個消費的流程是是如何實現(xiàn)的,更重要的是理解它這種完全異步發(fā)送請求時,構(gòu)建Requt對象,暫存入發(fā)送隊列,但不立即發(fā)送,而是等待合適的時機批量發(fā)送。并且,用回調(diào)或者RequFte方式,預(yù)先定義好如何處理響應(yīng)的邏輯。Brokr返回的響應(yīng)之后,也不會立即處理,而是暫存在隊列中,擇機處理。那這個策略較復(fù),有是需響時候有可緩沖了或間到了,都有可能觸發(fā)一次真正的網(wǎng)絡(luò)請求,也就是在poll()方法中發(fā)送所有待發(fā)送Requ并處理所有Respons。設(shè)計處是需要用于發(fā)處理的線并且分發(fā)量處理的優(yōu)勢,這也是Kaa的性能非常好的原因之一。這種設(shè)計的缺點也非常的明顯,就是的成本也很高??傮w來說,不推薦大家把代碼設(shè)計得這么復(fù)雜。代碼結(jié)構(gòu)簡單、清晰、易是是我們在設(shè)計過程中需要考慮的一個非常重要的因素。很多時候,為了獲得較好的代碼結(jié)構(gòu),在可接受的范圍內(nèi),去犧牲一些性能,也是劃算的。我們知道,KafkaConsumer在消費過程中是需要消費位置的,Consumer每次從當前消費位置拉取一批消息,這些消息都被正常消費后,Consumer會給Coordinator發(fā)一個提交位置的請求,然后消費位置會向后移動,完成一批消費過程。那kafkaConsumer是如何和提交這個消費位置的呢?請你帶著這個問題再回顧一下
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 10萬噸煤礦合同范本
- 單位只交社保合同范本
- 公司銷售代理合同范本
- 出售機械板車合同范本
- 醫(yī)藥培訓(xùn)銷售合同范本
- 個人精裝房租賃合同范例
- 保潔大掃除合同范本
- 買汽車有沒有三包合同范本
- 加工基地 合同范本
- 勞務(wù)用工合同范本
- 四年級語文閱讀理解十篇(含答案)
- JJG 705-2014液相色譜儀行業(yè)標準
- (高清版)TDT 1056-2019 縣級國土資源調(diào)查生產(chǎn)成本定額
- 小學(xué)班級管理現(xiàn)狀及策略分析
- 公司合作計劃書
- 2016-2023年南京信息職業(yè)技術(shù)學(xué)院高職單招(英語/數(shù)學(xué)/語文)筆試歷年參考題庫含答案解析
- 半固態(tài)電池技術(shù)工藝
- 跨領(lǐng)域聯(lián)合診療(MDT)管理法規(guī)
- 光伏電站運維安全風(fēng)險管控清單
- 保安員考核評分標準與細則
- 四年級豎式計算大全100道
評論
0/150
提交評論