丨在rebalance中coordinator如何處理成員入組_第1頁
丨在rebalance中coordinator如何處理成員入組_第2頁
丨在rebalance中coordinator如何處理成員入組_第3頁
丨在rebalance中coordinator如何處理成員入組_第4頁
丨在rebalance中coordinator如何處理成員入組_第5頁
已閱讀5頁,還剩15頁未讀 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

Consumrsession.tut.msCoordinator求,則把該成員標記為ead,而且要顯式地將其從消費者組中移除,并觸發(fā)新一輪的Rebalance。而真正決定單次Rebalance所用最大時長的參數(shù),是Consumrmax.poll.inl.m總體而言,Rebalance的流程大致分為兩大步:加入組(JoinGroup)加入組,是指消費者組下的各個成員向oordinator發(fā)送oinoupst進組的過程。這個過程有一個超時時間,如果有成員在超時時間之內(nèi),無法完成加入組操作,它就會被排除在這輪Rebalance組同步,是指當所有成員都成功加入組之后,Coordinator指定其中一個成員為r,然后將訂閱分區(qū)信息發(fā)給eader成員。接著,所有成員(包括eader成員)向Coordinatornt只有LeadrCoordinator當組同步完成后,Rebalance結(jié)束。此時,消費者組處于正常工作狀態(tài)。要搞懂加入組的源碼機制,須要學習4個方法,分別是handleJoinGroup、doUnknownJoinGroup、doJoinGroup和addMemberAndRebalance。handleJoinGroup是執(zhí)行加入組的頂層方法,被KafkaApis類調(diào)用,該方法依據(jù)給定消費者組成員是否了設(shè)置成員ID,來決定是調(diào)用doUnknownJoinGroup還是doJoinGroup,前者對應(yīng)于未設(shè)定成員ID的情形,后者對應(yīng)于已設(shè)定成員ID的情形。而這兩個方法,都會調(diào)用addMemberAndRebalance,執(zhí)行真正的加入組邏輯。為了幫助你理解它們之間的交互關(guān)系,我畫了一張圖,借用它展示了這4個方法的調(diào)用順序。handleJoinGroup如果你翻開KafkaApis.scala這個API文件,就可以看到,處理JoinGroupRequest請求的方法是handleJoinGroupRequest。而它的主要邏輯,就是調(diào)用GroupCoordinatorhandleJoinGroup請求,所以,我們要具體學下handleJoinGroup方法。先看它的方法簽名:代代12345678defgroupId:String,//memberId:String,//消費者組成員groupInstanceId:Option[String],//組實例IDrequireKnownMemberId:Boolean,//是否需要成員IDId:String,Host:String,//rebalanceTimeoutMs:Int,//Rebalance超時時間,默認是erval.ms99sessionTimeoutMs:Int,protocolType:Stringprotocols:List[(StringArray[Byte])],//responseCallback:JoinCallback):Unit=}memberId:消費者組成員IDgroupInsteId:這是社區(qū)于4版本引入的靜態(tài)成員字段。靜態(tài)成員的引入,可以有效避免因系統(tǒng)升級或程序更新而導致的Rebalance注邏輯就行了。requireKnownMemberId:是否要求成員ID不為空,即是否要求成員必須設(shè)置ID的布爾字段。這個字段如果為True的話,那么,Kafka要求消費者組成員必須設(shè)置ID。未設(shè)置ID的成員,會被加入組。直到它設(shè)置了ID之后,才能重新加入組。 .id值。Coordinator使用它來生成memberId。memberId的格式是 Id值-UUID。rebalanceTimeoutMs:RebalanceCoordinator匯報心跳,那么將被視為“已過期”,從而新一輪Rebalance。responseCaak代代1//validateGroupStatus(groupId,ApiKeys.JOIN_GROUP).foreach{errorresponseCallback(JoinGroupResult(memberId,5//確保sessionTimeoutMs////if(sessionTimeoutMs<groupConfig.groupMinSessionTimeoutMssessionTimeoutMs>groupConfig.groupMaxSessionTimeoutMs)responseCallback(JoinGroupResult(memberId,}else//消費者組成員IDvalisUnknownMember=memberId==groupManager.getOrMaybeCreateGroup(groupId,isUnknownMember)matchcaseNoneresponseCallback(JoinGroupResult(memberId,caseSome(group)group.inLock//if(!acceptJoiningMember(group,memberId))////如果消費者組成員ID}elseif(isUnknownMember)//為空IDdoUnknownJoinGroup(group,groupInstanceId,requireKnownMemberId,}else//為非空IDdoJoinGroup(group,memberId, //如果消費者組正處于PreparingRebalanceif(group.is(PreparingRebalance))//放入Purgatory 45第1用vdateGrouptatus證消組狀的所謂的也就是消費者組名groupIdJoinGroupRquesthandlo方封裝相應(yīng)的錯誤,并調(diào)用回調(diào)函數(shù)返回。否則,就進入到下一步。第2snmtssstmet,group.ma.session.ttms]之間,如果不是,就認定該值是值,從而封裝一個對應(yīng)的異常調(diào)用回調(diào)函數(shù)返回,這兩個參數(shù)分別表示消費者組允許配置的最小和最大會話超時時間;如果是的話,就進入下一步。第3D信息,并查看它是否為空。之后,通過GrouptaMrDNon。一旦返回了Non,handleJoinGroup方封裝未知成員ID”的異常,調(diào)用回調(diào)函數(shù)返回。4acceptJoiningMember實狀態(tài)一EmptyDeadTrue,表示可以接納狀態(tài)二PreparingRebalance當前等待加入組的成員數(shù)小于Broker端參數(shù)group.max.size值。狀態(tài)三:如果是其他狀態(tài),那么,入組的條件是該成員是已有成員,或者是當前組總成員數(shù)小于Bror端參數(shù)groupmax.size值。需要注意的是,這里比較的是Rebalance過渡到Comptbaae之后,沒有完成加入組的成員,就會被移除。brD空,就執(zhí)行doUnknownJoinGroup或doJoinGroup第5步是嘗試完成JoinGroupRequest請求的處理。如果消費者組處于PreparingRebalance狀態(tài),那么,就將該請求放入Purgatory,嘗試立即完成;如果是其它狀態(tài),則無需將請求放入Purgatory。畢竟,我們處理的是加入組的邏輯,而此時消費者組的狀態(tài)應(yīng)該要變更到PreparingRebalance后,Rebalance才能完成加入組操作。當然,如果延時請求不能立即完成,則交由Purgatory統(tǒng)一進行延時處理。至此,handleJoinGroupdoUnknownJoinGroupdoUnknownJoinGroup法,因為此時,它們的MemberID尚未生成。memberId之外,該方法的輸入?yún)?shù)與haGroup不一一地詳細介紹了,我們直接看它的源碼。為了便于你理解,我省略了關(guān)于靜態(tài)成員以及BUGO代代123456789group.inLock//Deadif(group.is(Dead))////成員配置的協(xié)議類型/}elseif(!group.supportsProtocols(protocolType,MemberMetadata.}else//根據(jù)規(guī)則為該成員創(chuàng)建成員valnewMemberId=if(group.hasStaticMember(groupInstanceId))//如果要求成員ID}elseif(requireKnownMemberId)Id,addPendingMemberExpiration(group,newMemberId,responseCallback(JoinGroupResult(newMemberId,}elseHost,protocolType,protocols,group,}}}DeadeadCoordinatorCoordinatorBrokr找的Coordinator,然后重新發(fā)起加如果狀態(tài)不是ead如果這些檢查都順利通過,接著,代碼就會為該成員生成成員ID,生成規(guī)則是 UUID。這便是generateMemberId方法做的事情。然后,handleJoinGroup方根據(jù)requireKnownMemberId的取值,來決定下面的邏輯路徑:如果該值為True,則將該成員加入到待決成員列表(PendingMemberList)中,然ID,將該成員的入組申請“打回去”,令其分配好了成員ID之后再重新申請;FalseaddMemberAndRebalance到組中。至此,handleJoinGroup方法結(jié)束。通常來說,如果你沒有啟用靜態(tài)成員機制的話,requireKnownMemberId的值是True,這是由KafkaApis中handleJoinGroupRequest方法的這行語句決定的:代代1valrequireKnownMemberId=joinGroupRequest.version>=4&&可見,如果你使用的是比較新的Kafka客戶端版本,而且沒有配置過Consumer端參數(shù)group.instance.id的話,那么,這個字段的值就是True,這說明,Kafka要求消費者成員加入組時,必須要分配好成員ID。關(guān)于addMemberAndRebalance方法的源碼,一會兒在學習doJoinGroup方法時,我doJoinGroup接下來,我們看下doJoinGroupID的成員,執(zhí)行加入組邏輯的方法。它的輸入?yún)?shù)全部承襲自handleJoinGroup第1代代1234567//如果是Dead狀態(tài),封裝COORDINATOR_NOT_AVAILABLEif(group.is(Dead))responseCallback(JoinGroupResult(memberId,//}elseif(!group.supportsProtocols(protocolType,responseCallback(JoinGroupResult(memberId,889//如果是待決成員,由于這次分配了成員ID}elseif(group.isPendingMember(memberId)){if(groupInstanceId.isDefined){}elseaddMemberAndRebalance(rebalanceTimeoutMs,sessionTimeoutMs,memberId,Host,protocolType,protocols,group,}}else//第二部分代碼}不同的是,doJoinGroup要判斷當前申請入組的成員是否是待決成員。如果是的話,那IDaddMemberAndRebalance2第2代代//valmember=group.currentStatematch//如果是PreparingRebalancecasePreparingRebalance//更新成員信息并開始準備updateMemberAndRebalance(group,member,protocols,//如果是CompletingRebalancecaseCompletingRebalance//if(member.matches(protocols))members=if(group.isLeader(memberId))}elsememberId=generationId=group.generationId,protocolType=tocolType,protocolName=tocolName,leaderId=group.leaderOrNull,error=Errors.NONE))//否則,更新成員信息并開始準備}elseupdateMemberAndRebalance(group,member,protocols,}如果是StablecaseStablevalmember=如果成員是Leaderif(group.isLeader(memberId)||!member.matches(protocols))//更新成員信息并開始準備updateMemberAndRebalance(group,member,protocols,}members=List.empty,memberId=generationId=group.generationId,protocolType=tocolType,protocolName=tocolName,leaderId=group.leaderOrNull,error=Errors.NONE))}caseEmpty|Deadwarn(s"Attempttoaddrejoiningmember$memberIdofgroup${group.groupId}s"unexpectedgroupstate${group.currentState}")responseCallback(JoinGroupResult(memberId,這部分代碼的第1步第2步4如果是PreparingRebalanceRebalance么,調(diào)用updateMemberAndRebalance方法更新成員信息,并開始準備Comptbaance狀態(tài),那么,就判斷一下,該成員的分區(qū)消費分配策略與訂閱分區(qū)列表是否和已保存記錄中的一致,如果相同,就說明該成員已經(jīng)應(yīng)該發(fā)起過加入組的操作,并且CoordinatorJoinGroupResult果protocols不相同,那么,就說明成員變更了訂閱信息或分配策略,就要調(diào)用 方法,更新成員信息,并開始準備新一 如果是Stable狀態(tài),那么,就判斷該成員是否是Leader成員,或者是它的訂閱信息或分配策略發(fā)生了變更。如果是這種情況,就調(diào)用updateMemberAndRebalance方法強迫一次新的Rebalance。否則的話,返回當前組信息給該成員即可,通知它們可以發(fā)起Rebalance的下一步操作。EmptyDeadupdateMemberAndRebalanceGroupMetadataupdateMember準備Rebalance:這一步的思想,是將消費者組狀態(tài)變更到PreparingRebalance,然后創(chuàng)建DelayedJoin對象,并交由Purgatory,等待延時處理加入組操作。并放入Purgatory,因此,我不展開說了,你可以自行閱讀下這部分代碼。addMemberAndRebalancedoUnknownJoinGroupdoJoinGroup準備Rebalance。代代privatedefrebalanceTimeoutMs:33456789sessionTimeoutMs:Int,memberId:String,groupInstanceId:Option[String],Id:String,Host:String,protocolType:String,protocols:List[(String,Array[Byte])],group:GroupMetadata,callback:JoinCallback):Unit=創(chuàng)建MemberMetadatavalmember=newmemberId,group.groupId,Host,sessionTimeoutMs,protocolType,member.isNew=//如果消費者組準備開啟首次Rebalance,設(shè)置newMemberAdded為Trueif(group.is(PreparingRebalance)&&group.generationId0)group.newMemberAdded=group.add(member,completeAndScheduleNextExpiration(group,member,NewMemberJoinTimeoutMs)if(member.isStaticMember){info(s"Addingnewstaticmember$groupInstanceIdtogroup${group.groupId}group.addStaticMember(groupInstanceId,memberId)}else}//準備開啟maybePrepareRebalance(group,s"Addingnewmember$memberIdwithgroup第1步,該方根據(jù)傳入?yún)?shù)創(chuàng)建一個MemberMetadata對象實例,并設(shè)置isNew字段為True,標識其是一個新成員。isNew字段與心跳設(shè)置相關(guān)聯(lián),你可以閱讀下MemberMetadata的hasSatisfiedHeartbeat方法的代碼,搞明白該字段是如何幫助Coordinator確認消費者組成員心跳的。第2步,代碼會判斷消費者組是否是首次開啟Rebalance。如果是的話,就把mberAdded字段設(shè)置為;如果不是,則無需執(zhí)行這個賦值操作。這個字段的作用,是Kaa為消費者組RebalaneRebalance時,讓Coordinator等待一段時間,從而讓的消費者組成員加入到組中,以免后來者申請入組而反復進行Rebalanc。這段多等待的時間,就是Brokrgroup.initialrebalnce.delas。這里的Added字段,就是用于判斷是否需要多等待這段時間的一個變量。我們接著說回addMemberAndRebalance方法。該方法的第3步是調(diào)用GroupMetadataadd4,代碼將該成員從待決成員列表中移除。畢竟,它已

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論