丨kafkaconsumer源碼分析消息消費(fèi)的實(shí)現(xiàn)過(guò)程_第1頁(yè)
丨kafkaconsumer源碼分析消息消費(fèi)的實(shí)現(xiàn)過(guò)程_第2頁(yè)
丨kafkaconsumer源碼分析消息消費(fèi)的實(shí)現(xiàn)過(guò)程_第3頁(yè)
丨kafkaconsumer源碼分析消息消費(fèi)的實(shí)現(xiàn)過(guò)程_第4頁(yè)
丨kafkaconsumer源碼分析消息消費(fèi)的實(shí)現(xiàn)過(guò)程_第5頁(yè)
已閱讀5頁(yè),還剩6頁(yè)未讀 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

在開(kāi)始分析源碼之前,我們一起來(lái)回顧一下Kafka消費(fèi)模型的幾個(gè)要點(diǎn)Kafka的每個(gè)Consumer(消費(fèi)者)實(shí)例屬于一個(gè)ConsumerGroup(消費(fèi)組在消費(fèi)時(shí),ConsumerGroup的每個(gè)Consumer占一個(gè)或多個(gè)Partition(分對(duì)于每個(gè)ConsumerGroup,在任意時(shí)刻,每個(gè)Partition多有1Consumer每個(gè)ConsumerGroup都有一個(gè)Coordinator(協(xié)調(diào)者)負(fù)責(zé)分配Consumer和Partition對(duì)應(yīng)關(guān)系,當(dāng)Partition是Consumer生變更是,會(huì)reblance(重新分配)過(guò)程,重新分配Consumer與Partition的對(duì)應(yīng)關(guān)系;Consumer與Coordinator之間的心跳,這樣Coordinator就能感知Consumer狀態(tài)Consumer的時(shí)候及時(shí)觸發(fā)rebalance掌握并理解Kafka的消費(fèi)模型,對(duì)于接下來(lái)理解其消費(fèi)的實(shí)現(xiàn)過(guò)程是至關(guān)重要的,如果你對(duì)上面的這些要點(diǎn)還有不清楚的地方,建議回顧一下之前的課程或者看一下Kafka相關(guān)的我們使用當(dāng)前的版本2.2進(jìn)行分析,使用Git 上直接源碼到本地代碼git cdgitcheckout09|Kafka的文檔,看看從哪兒來(lái)入手開(kāi)啟我們的分析流Kafka的Consumer類KafkaConsumer的JavaDoc,給出了關(guān)于如何使用KafkaConsumer常詳細(xì)的說(shuō)明文檔,并且給出了一個(gè)使用Consumer費(fèi)的最簡(jiǎn)代碼代碼//設(shè)置必要的配置信Propertiesprops=newprops.put("bootstrap.servers",props.put("group.id", mit", erval.ms",props.put("key.deserializer", props.put("value.deserializer", 9//創(chuàng)建Consumer實(shí)KafkaConsumer<String,String>consumer=new//訂閱consumer.subscribe(Arrays.asList("foo",//循環(huán)拉while(true)ConsumerRecords<String,String>records=for(ConsumerRecord<String,String>record:System.out.printf("offset=%d,key=%s,value=%s%n", 這段代碼主要的主要流程設(shè)置必要的配置信息,包括:起始連接的Broker址,ConsumerGroupID,自創(chuàng)建Consumer訂閱了2個(gè)Topic:foo和循環(huán)拉取消息并打印在控通過(guò)上面的代碼實(shí)例我們可以看到,消費(fèi)這個(gè)大的流程,在Kafka中實(shí)際上是被了“訂閱”和“拉取消息”這兩個(gè)小的流程。另外,我在之前的課程中反復(fù)提到過(guò),Ka在消費(fèi)過(guò)程中,每個(gè)Consumr實(shí)例是綁定到一個(gè)分區(qū)上的,那Consumr是如何確定,綁定到哪一個(gè)分區(qū)上的呢?這個(gè)問(wèn)題也是可以通過(guò)分析消費(fèi)流程來(lái)找到答案的。所以,我們分析整個(gè)消費(fèi)流程主要聚焦在三個(gè)問(wèn)題上:訂閱過(guò)程是如何實(shí)現(xiàn)的Consumer是如何與Coordinator協(xié)商,確定消費(fèi)哪些Partition的?拉取消息的過(guò)程是如何實(shí)了解前兩個(gè)問(wèn)題,有助于你充分理解Kafka的元數(shù)據(jù)模型,以及Kafka是如何在客戶端和就帶著這三個(gè)問(wèn)題,來(lái)分析Kafka的訂閱和拉取消息的過(guò)程如何實(shí)現(xiàn)。我們先來(lái)看看訂閱的實(shí)現(xiàn)流程。從上面的例子到訂閱的主流程方法代碼publicvoidsubscribe(Collection<String>topics,ConsumerRebalanceListenertry//省略部分代5//重置訂閱狀this.subscriptions.subscribe(newHashSet<>(topics),8//更新元}finally subscript,另一個(gè)是更新元數(shù)據(jù)中的topic信息。訂閱狀態(tài)subscripts主要了訂閱的topic和ptn的消費(fèi)置等狀態(tài)信息。屬性metadata中了Kaa集群元數(shù)據(jù)的一個(gè)子集,包括集群的Brokr節(jié)點(diǎn)、c和Partn在節(jié)點(diǎn)上分布,以及我們聚焦的第二個(gè)問(wèn)題:Coordinator給ConrPartn請(qǐng)注意一下,這個(gè)subscribe()方法的實(shí)現(xiàn)有一個(gè)非常值得大家學(xué)習(xí)的地方:就是開(kāi)始的acquireAndEnsureOpen()try-finallyrelease(),作用就是保護(hù)這個(gè)方法只能單線程調(diào)Kafka在文檔中明確地注明了Consumer不是線程安全的,意味著Consumer被并發(fā)調(diào)用時(shí)會(huì)出現(xiàn)不可預(yù)期的結(jié)果。為了避免這種情況發(fā)生 做了主動(dòng)的檢測(cè)并拋出異常,不是放任系統(tǒng)產(chǎn)生不可預(yù)期的情Kaa“主動(dòng)檢測(cè)不支持的情況并拋出異常,避免系統(tǒng)產(chǎn)生不可預(yù)期的行為”這種模式,對(duì)于增強(qiáng)的系統(tǒng)的健壯性是一種非常有效的做法。如果你的系統(tǒng)不支持用戶的某種操作,正確的做法是,檢測(cè)不支持的操作,直接用戶操作,并給出明確的錯(cuò)誤提示,而不應(yīng)該只是具體Kafka是如何實(shí)現(xiàn)的并發(fā)檢測(cè),大家可以看一下方法acquireAndEnsureOpen()的實(shí)繼續(xù)跟進(jìn)到更新元數(shù)據(jù)的方法metadata.setTopics()里面,這個(gè)方法的實(shí)現(xiàn)除了更新元數(shù)據(jù)類Metadata中的topic相關(guān)的一些屬性以外,還調(diào)用了Metadata.requestUpdate()代碼publicsynchronizedintrequestUpdate()this.needUpdate=return requestUpate()tetrueKafka必須確保ConsumrBrokrPartn分析完訂閱相關(guān)的代碼,我們來(lái)總結(jié)一下:在訂閱的實(shí)現(xiàn)過(guò)程中,Kafka更新了訂閱狀態(tài)subscriptsmetadatatopic的一些屬性,將元數(shù)據(jù)狀態(tài)置為“需要那這個(gè)元數(shù)據(jù)會(huì)在什么時(shí)候真正做一次更新呢?我們可以先帶著這個(gè)問(wèn)題接著看接下來(lái),我們分析拉取消息的流程。這個(gè)流程的時(shí)序圖如下(點(diǎn)擊可放大查看我們對(duì)著時(shí)序圖來(lái)分析它的實(shí)現(xiàn)流程。在KafkaConsumer.poll(法對(duì)應(yīng)源碼1179)的實(shí)現(xiàn)里面,可以看到主要是先后調(diào)用了2個(gè)私有方法:updateAssignmentMetadataIfNeeded():更新元數(shù)據(jù)pollForFetches():拉取消息方法updateAssignmentMetadataIfNeeded()中,調(diào)用了coordinator.poll()方法,poll() .ensureFreshMetadata()方法,在 方法中又調(diào)用了 .poll()方法,實(shí)現(xiàn)了與Cluster通信,在Coordinator上Consumer并拉取和更新元數(shù)據(jù)。至此,“元數(shù)據(jù)會(huì)在什么時(shí)候真正做一次更新”這個(gè)問(wèn)題類ConsumerNetwork 封裝了Consumer和Cluster之間所有的網(wǎng)絡(luò)通信的實(shí)現(xiàn),這個(gè)類是一個(gè)非常徹底的異步實(shí)現(xiàn)。它沒(méi)有任何的線程,所有待發(fā)送的Request都存放在屬性u(píng)nsent中,返回的Response存放在屬性 pletion中。每次調(diào)用poll()方法的時(shí)候,在當(dāng)前線程中發(fā)送所有待發(fā)送的Request,處理所有收到的 MQ的代碼,Producer和Consumer在主要收發(fā)消息流程上功能的復(fù)雜度是差不多的,但是你可以很明顯地感受到Kafka的代碼實(shí)現(xiàn)要比 MQ的代碼實(shí)現(xiàn)更加的復(fù)雜難于理解。我們繼續(xù)分析方法pollForFetches()的實(shí)現(xiàn)代碼 privateMap<TopicPartition,List<ConsumerRecord<K,V>>>pollForFetches(Timer2//省略部分代3//如果緩存里面有的消息,直接返回這些4finalMap<TopicPartition,=5if(!records.isEmpty())6return7}8//構(gòu)造拉取消息請(qǐng)求,并發(fā)9//省略部分代//發(fā)送網(wǎng)絡(luò)請(qǐng)求拉取消息,等待直到有消息返回或者.poll(pollTimer,()-> //省略部分代//返回拉到的消return}這段代碼的主要實(shí)現(xiàn)邏輯如果緩存里面有未的消息,直接返回這些消息構(gòu)造拉取消息請(qǐng)求,并發(fā)發(fā)送網(wǎng)絡(luò)請(qǐng)求并拉取消息,等待直到有消息返回或者返回拉到的消息在方法fetcher.sendFetches(實(shí)現(xiàn)里面,Kafka據(jù)元數(shù)據(jù)的信息,構(gòu)造到所有需要的Broker的拉消息的Request,然后調(diào)用.Send()方法將這些請(qǐng)求異步發(fā)送出去。并且,了一個(gè)回調(diào)類來(lái)處理返回的Response,所有返回的Response被暫時(shí)存放在pletedFetches中。需要注意的是,這時(shí)的Request并沒(méi)有被真正發(fā)給各Broker,而是被暫存在了.unsend等待被發(fā)送然后,在調(diào)用.poll()方法時(shí),會(huì)真正將之前構(gòu)造的所有Request發(fā)送出去,并處理收到的Response。最后,fetcher.fetchedRecords()方法中,將返回的Response反序列化后轉(zhuǎn)換為消息列綜合上面的實(shí)現(xiàn)分析,我在這里給出整個(gè)拉取消息的流程涉及到的相關(guān)類的類圖,在這個(gè)類圖中,為了便于你理解,我并沒(méi)有把所有類都繪制上去,只是把本節(jié)課兩個(gè)流程相關(guān)的主要類圖(點(diǎn)擊可放大查看本節(jié)課我們一起分析了KafkaConsumer消費(fèi)消息的實(shí)現(xiàn)過(guò)程。大家來(lái)分析代碼過(guò)程中,不僅僅是要掌握Kafka整個(gè)消費(fèi)的流程是是如何實(shí)現(xiàn)的,更重要的是理解它這種完全異步發(fā)送請(qǐng)求時(shí),構(gòu)建Requt對(duì)象,暫存入發(fā)送隊(duì)列,但不立即發(fā)送,而是等待合適的時(shí)機(jī)批量發(fā)送。并且,用回調(diào)或者RequFte方式,預(yù)先定義好如何處理響應(yīng)的邏輯。Brokr返回的響應(yīng)之后,也不會(huì)立即處理,而是暫存在隊(duì)列中,擇機(jī)處理。那這個(gè)策略較復(fù),有是需響時(shí)候有可緩沖了或間到了,都有可能觸發(fā)一次真正的網(wǎng)絡(luò)請(qǐng)求,也就是在poll()方法中發(fā)送所有待發(fā)送Requ并處理所有Respons。設(shè)計(jì)處是需要用于發(fā)處理的線并且分發(fā)量處理的優(yōu)勢(shì),這也是Kaa的性能非常好的原因之一。這種設(shè)計(jì)的缺點(diǎn)也非常的明顯,就是的成本也很高。總體來(lái)說(shuō),不推薦大家把代碼設(shè)計(jì)得這么復(fù)雜。代碼結(jié)構(gòu)簡(jiǎn)單、清晰、易是是我們?cè)谠O(shè)計(jì)過(guò)程中需要考慮的一個(gè)非常重要的因素。很多時(shí)候,為了獲得較好的代碼結(jié)構(gòu),在可接受的范圍內(nèi),去犧牲一些性能,也是劃算的。我們知道,KafkaConsumer在消費(fèi)過(guò)程中是需要消費(fèi)位置的,Consumer每次從當(dāng)前消費(fèi)位置拉取一批消息,這些消息都被正常消費(fèi)后,Consumer會(huì)給Coordinator發(fā)一個(gè)提交位置的請(qǐng)求,然后消費(fèi)位置會(huì)向后移動(dòng),完成一批消費(fèi)過(guò)程。那kafkaConsumer是如何和提交這個(gè)消費(fèi)位置的呢?請(qǐng)你帶著這個(gè)問(wèn)題再回顧一下

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論