2022 Apache RocketMQ Source Code Analysis Guide


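Contents
RocketMQ DLedger multi-replica (master-slave switchover) column review (source-reading tips)
Source analysis: RocketMQ ACL
Source analysis: RocketMQ message trace
RocketMQ multi-replica prerequisite: a first look at the Raft protocol
Source analysis: Leader election in RocketMQ multi-replica
Source analysis: RocketMQ DLedger (multi-replica) log append flow
Source analysis: RocketMQ DLedger (multi-replica) log replication (propagation)
Implementation principles of Raft-based log replication in RocketMQ DLedger multi-replica
Source analysis: RocketMQ DLedger multi-replica storage implementation
Source analysis: design techniques for a smooth upgrade when integrating DLedger (multi-replica) into RocketMQ
Source analysis: implementation principles of RocketMQ DLedger multi-replica master-slave switchover

2.1 RocketMQ DLedger multi-replica (master-slave switchover) column review (source-reading tips)

The RocketMQ DLedger multi-replica (master-slave switchover) column has nine articles. Before committing to the DLedger source code, it helps to recall why the feature exists. Before RocketMQ 4.5.0, if the master of a replication group went down, consumers could keep consuming from the slave, but no new messages could be sent to that replication group. Starting with RocketMQ 4.5.0, DLedger triggers a new leader election inside the replication group, and once the election completes, the group accepts writes again. We also learned that RocketMQ's master-slave switchover is based on the Raft protocol. Many readers know roughly what Raft does without having studied it in detail, so I think Raft deserves focused attention before reading the DLedger multi-replica code.

1. RocketMQ multi-replica prerequisite: a first look at the Raft protocol. Once the basics of Raft are clear, we can move on to the source code of DLedger multi-replica (master-slave switchover).
2. Source analysis: Leader election in RocketMQ multi-replica. Following the approach of the previous article, this article analyzes the implementation of DLedgerLeaderElector in detail. Stepping through it in a debugger makes things much clearer.
3. Source analysis: RocketMQ DLedger multi-replica storage implementation. After election, the next topic is Raft log replication. Log replication involves storage, so before studying replication we first look at DLedger's storage design, for example the storage format of DLedger log entries and how logs are organized on the server. This part is analogous to RocketMQ's commitlog storage.
4. Source analysis: RocketMQ DLedger (multi-replica) log append flow, which builds on the DLedger storage design.
5. Source analysis: RocketMQ DLedger (multi-replica) log replication (propagation).
6. Implementation principles of Raft-based log replication in RocketMQ DLedger multi-replica. This article summarizes the key points of the log replication implementation and concludes the Raft-protocol part of the DLedger multi-replica (master-slave switchover) series.
7. Source analysis: design techniques for a smooth upgrade when integrating DLedger (multi-replica) into RocketMQ. The previous six articles covered Raft leader election and log replication. From this article on, the focus is how RocketMQ integrates DLedger. DLedger was only introduced in RocketMQ 4.5.0, so when upgrading from an older version to 4.5.0 directly …
8. RocketMQ DLedger …
9. Source analysis: implementation principles of RocketMQ DLedger multi-replica master-slave switchover. This article builds on the previous eight and uses the official unit test cases for debugging.

Tip: this column relates to 《RocketMQ …》.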
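2.2 Source analysis: RocketMQ ACL

This article analyzes how RocketMQ ACL is implemented. It covers how the Broker starts ACL and validates requests, and what the client has to do. Note: RocketMQ introduced ACL in 4.4.0, and the code in this article is based on RocketMQ 4.5.0.

Broker-side ACL startup

When the Broker starts, it initializes ACL as follows:

    if (!this.brokerConfig.isAclEnable()) {                                         // @1
        log.info("The broker does not enable acl");
        return;
    }
    List<AccessValidator> accessValidators =
        ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);   // @2
    if (accessValidators == null || accessValidators.isEmpty()) {
        log.info("The broker does not load the AccessValidator");
        return;
    }
    for (AccessValidator accessValidator : accessValidators) {                      // @3
        final AccessValidator validator = accessValidator;
        this.registerServerRPCHook(new RPCHook() {
            @Override
            public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
                // Do not catch the exception
                validator.validate(validator.parse(request, remoteAddr));           // @4
            }

            @Override
            public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
            }
        });
    }

Code @1: If ACL is not enabled on the Broker (aclEnable is false), return immediately.
Code @2: Load the configured AccessValidator implementations through an SPI-like mechanism. The method returns a list, which it builds by reading the validators declared in META-INF/service/org.apache.rocketmq.acl.AccessValidator.
Code @3: For each AccessValidator, register an RPCHook with the Broker. The hook's doBeforeRequest runs before a request is processed, and its doAfterResponse runs after the response is produced.
Code @4: RPCHook#doBeforeRequest first calls AccessValidator#parse to determine the permissions the request needs, then calls AccessValidator#validate to check them. If the check fails, it throws an AclException.

On the Broker, the default AccessValidator implementation is PlainAccessValidator. The AccessValidator interface has two methods:

    AccessResource parse(RemotingCommand request, String remoteAddr);   // resolve the resources and permissions a request needs
    void validate(AccessResource accessResource);                     // check them against the configured permissions

RocketMQ's default ACL rules live in a yml file. The rest of this section walks through PlainAccessValidator's parse and validate methods. First, here is PlainAccessResource, the class RocketMQ uses to represent an access resource:

    private String accessKey: the access key, i.e. the user name.
    private String secretKey: the user's password.
    private String whiteRemoteAddress: the remote IP address whitelist.
    private boolean admin: whether the user is an administrator.
    private byte defaultTopicPerm = 1: the default topic permission, DENY.
    private byte defaultGroupPerm = 1: the default consumer group permission, DENY.
    private Map<String, Byte> resourcePermMap: the permission configured for each resource.
    private RemoteAddressStrategy remoteAddressStrategy: the strategy used to match the remote IP address.
    private int requestCode: the request code of the current request.
    private byte[] content: the request content. The client signs it with its secretKey to produce a signature string. The server repeats the same step and compares the two signatures, and authentication passes only if they are equal.
    private String signature: the signature sent by the client.
    private String secretToken: the security token.

PlainAccessValidator's constructor:

    public PlainAccessValidator() {
        aclPlugEngine = new PlainPermissionLoader();
    }

PlainPermissionLoader loads the ACL rules, which means it parses plain_acl.yml. This is covered in detail below, in the part of the ACL startup flow that parses the configuration file.

parse

This method determines, from the request command, the permissions the request needs, and builds the AccessResource object that the later permission check uses.

    PlainAccessResource accessResource = new PlainAccessResource();
    if (remoteAddr != null && remoteAddr.contains(":")) {
        accessResource.setWhiteRemoteAddress(remoteAddr.split(":")[0]);
    } else {
        accessResource.setWhiteRemoteAddress(remoteAddr);
    }

Step 1: Create a PlainAccessResource and extract the remote IP address from the remote address.

    if (request.getExtFields() == null) {
        throw new AclException("request's extFields value is null");
    }
    accessResource.setRequestCode(request.getCode());
    accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY));
    accessResource.setSignature(request.getExtFields().get(SessionCredentials.SIGNATURE));
    accessResource.setSecretToken(request.getExtFields().get(SessionCredentials.SECURITY_TOKEN));
    try {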

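        switch (request.getCode()) {
            case RequestCode.SEND_MESSAGE:
                accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB);
                break;
            case RequestCode.SEND_MESSAGE_V2:
                accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB);
                break;
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB);
                accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB);
                break;
            case RequestCode.PULL_MESSAGE:
                accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
                break;
            case RequestCode.QUERY_MESSAGE:
                accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
                break;
            case RequestCode.HEART_BEAT:
                HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
                for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
                    accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB);
                    for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) {
                        accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB);
                    }
                }
                break;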

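            case RequestCode.UNREGISTER_CLIENT:
                final UnregisterClientRequestHeader unregisterClientRequestHeader =
                    (UnregisterClientRequestHeader) request.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
                accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB);
                break;
            case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
                final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader =
                    (GetConsumerListByGroupRequestHeader) request.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
                accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB);
                break;
            case RequestCode.UPDATE_CONSUMER_OFFSET:
                final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader =
                    (UpdateConsumerOffsetRequestHeader) request.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
                accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB);
                accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB);
                break;
            default:
                break;
        }
    } catch (Throwable t) {
        throw new AclException(t.getMessage(), t);
    }

Step 2: Based on the request code, record each resource the request accesses and the permission it needs on that resource. This is the input to the ACL permission check.

    // Content
    SortedMap<String, String> map = new TreeMap<String, String>();
    for (Map.Entry<String, String> entry : request.getExtFields().entrySet()) {
        if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) {
            map.put(entry.getKey(), entry.getValue());
        }
    }
    accessResource.setContent(AclUtils.combineRequestContent(request, map));
    return accessResource;

Step 3: Sort the request's extension fields, excluding the signature, and combine them with the request body into content. The server later verifies the signature against this content.

validate

    public void validate(AccessResource accessResource) {
        aclPlugEngine.validate((PlainAccessResource) accessResource);
    }

Validation is delegated to PlainPermissionLoader#validate, which throws an AclException if the check fails. To see how the configuration file is parsed and how validation works, we now turn to PlainPermissionLoader.

PlainPermissionLoader

    DEFAULT_PLAIN_ACL_FILE: the default ACL configuration file, conf/plain_acl.yml.
    fileName: the ACL configuration file name. It defaults to DEFAULT_PLAIN_ACL_FILE and can be changed with the system property -Drocketmq.acl.plain.file=fileName.
    Map<String, PlainAccessResource> plainAccessResourceMap: the parsed account permissions, keyed by accessKey.
    RemoteAddressStrategyFactory remoteAddressStrategyFactory: creates the strategies used to match remote IP addresses.
    boolean isWatchStart: whether the configuration file watcher has started.
    public PlainPermissionLoader(): the constructor.
    public void validate(PlainAccessResource plainAccessResource): validates a request's permissions.

    public PlainPermissionLoader() {
        load();
        watch();
    }

The constructor loads the configuration (load) and then starts watching the file for changes (watch).

load

    Map<String, PlainAccessResource> plainAccessResourceMap = new HashMap<>();
    List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>();
    String path = fileHome + File.separator + fileName;
    // plainAclConfData: the content of the yml file parsed into a JSONObject

Step 1: Initialize plainAccessResourceMap, the user-configured access resources (the permission container), and globalWhiteRemoteAddressStrategy, the global IP whitelist strategies. The configuration file path is fileHome + File.separator + fileName.

    JSONArray globalWhiteRemoteAddressesList = plainAclConfData.getJSONArray("globalWhiteRemoteAddresses");
    if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) {
        for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) {
            globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory.
                getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i)));
        }
    }

Step 2: Parse globalWhiteRemoteAddresses, the global whitelist. For each rule, remoteAddressStrategyFactory returns an access strategy. Its configuration rules are discussed below.

    JSONArray accounts = plainAclConfData.getJSONArray("accounts");
    if (accounts != null && !accounts.isEmpty()) {
        List<PlainAccessConfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class);
        for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) {
            PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig);
            plainAccessResourceMap.put(plainAccessResource.getAccessKey(), plainAccessResource);
        }
    }
    this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy;
    this.plainAccessResourceMap = plainAccessResourceMap;

Step 3: Parse accounts, the user-defined permissions. Each entry is read as a PlainAccessConfig and converted into a PlainAccessResource. For the meaning of these tags, see 《RocketMQ ACL使用指南》 (RocketMQ ACL user guide). The parsing itself is straightforward.

watch

    private void watch() {
        try {
            String watchFilePath = fileHome + fileName;
            FileWatchService fileWatchService = new FileWatchService(new String[] {watchFilePath}, new FileWatchService.Listener() {
                @Override
                public void onChanged(String path) {
                    log.info("The plain acl yml changed, reload the context");
                    load();
                }
            });
            fileWatchService.start();
            log.info("Succeed to start AclWatcherService");
            this.isWatchStart = true;
        } catch (Exception e) {
            log.error("Failed to start AclWatcherService", e);
        }
    }

File watching is implemented by FileWatchService. Every 500 ms it checks whether the watched file has changed by hashing its content:

    private String hash(String filePath) throws IOException, NoSuchAlgorithmException {
        Path path = Paths.get(filePath);
        md.update(Files.readAllBytes(path));
        byte[] hash = md.digest();
        return UtilAll.bytes2string(hash);
    }

Changes are detected by comparing the MD5 digest of the file content. This raises the question of why the loader does not simply record the file's last-modified time at startup and compare that instead.

validate

    // Check the global white remote address
    for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) {
        if (remoteAddressStrategy.match(plainAccessResource)) {
            return;
        }
    }

Step 1: If the client's IP matches the global whitelist, the request passes without further checks.

    if (plainAccessResource.getAccessKey() == null) {
        throw new AclException(String.format("No accessKey is configured"));
    }
    if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) {
        throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey()));
    }

Step 2: If the request carries no AccessKey, throw an exception. If the Broker has no ACL configuration for that AccessKey, throw an AclException.

    // Check the white addr for accesskey
    PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey());
    if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) {
        return;
    }

Step 3: If the remote address matches the whitelist configured for this user, the request passes.

    // Check the signature
    String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey());
    if (!signature.equals(plainAccessResource.getSignature())) {
        throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey()));
    }

Step 4: Verify the signature. The server recomputes it from the request content using the secretKey configured for this user. If the result differs from the signature the client sent, an exception is thrown.

    // Check perm of each resource
    checkPerm(plainAccessResource, ownedAccess);

Step 5: Check the permission on each resource.

    void checkPerm(PlainAccessResource needCheckedAccess, PlainAccessResource ownedAccess) {
        if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) {
            throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not",
                needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey()));
        }

Step 6: If the request needs a permission that only Admin users have and the current user is not an administrator, throw an exception. Certain commands can only be performed by the Admin role.

        Map<String, Byte> needCheckedPermMap = needCheckedAccess.getResourcePermMap();
        Map<String, Byte> ownedPermMap = ownedAccess.getResourcePermMap();
        if (needCheckedPermMap == null) {
            // If the needCheckedPermMap is null, then return
            return;
        }
        if (ownedPermMap == null && ownedAccess.isAdmin()) {
            // If the ownedPermMap is null and it is an admin user, then return
            return;
        }
        for (Map.Entry<String, Byte> needCheckedEntry : needCheckedPermMap.entrySet()) {
            String resource = needCheckedEntry.getKey();
            Byte neededPerm = needCheckedEntry.getValue();
            boolean isGroup = PlainAccessResource.isRetryTopic(resource);
            if (ownedPermMap == null || !ownedPermMap.containsKey(resource)) {
                // Check the default perm
                byte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() : ownedAccess.getDefaultTopicPerm();
                if (!Permission.checkPermission(neededPerm, ownedPerm)) {
                    throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
                }
                continue;
            }
            if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) {
                throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
            }
        }
    }

Step 7: Iterate over the permissions the request needs. If the user has no explicit permission configured for a resource, check the default permission instead: defaultGroupPerm for retry topics, which stand for consumer groups, and defaultTopicPerm otherwise. If any check fails, throw an AclException.

Client-side ACL

That covers the Broker side. On the client side, when a DefaultMQProducer is created, it registers an AclClientRPCHook, which runs before each request is sent to the server. AclClientRPCHook#doBeforeRequest:

    public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
        byte[] total = AclUtils.combineRequestContent(request,
            parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken()));   // @1
        String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());                         // @2
        request.addExtField(SIGNATURE, signature);                                                                  // @3
        request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());
        // The SecurityToken value is unnecessary, user can choose this one.
        if (sessionCredentials.getSecurityToken() != null) {
            request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
        }
    }

Code @1: Sort the request parameters and add the accessKey to them.
Code @2: Sign the sorted request parameters with the secret key configured by the user. The server recomputes the signature with the secret key it has configured for that accessKey and compares it with the Signature field. If they are equal, signature verification passes.
Code @3: Add Signature, AccessKey and, if present, SecurityToken to the request's extension fields.

This concludes the analysis of RocketMQ ACL.

2.3 Source analysis: RocketMQ message trace

Following the approach of 《RocketMQ消息軌跡-設計篇》 (RocketMQ message trace: design), this article analyzes the message trace source code from three angles. We start with a producer that enables message trace:

    public class TraceProducer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", true);   // @1
            producer.setNamesrvAddr("127.0.0.1:9876");   // example address, adjust to your environment
            producer.start();
            try {
                Message msg = new Message("TopicTest", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
            producer.shutdown();
        }
    }

The key point in this code is that message trace is switched on when the DefaultMQProducer is created: the second constructor argument is true. DefaultMQProducer offers these constructors for it:

    public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)
    public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)
    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic)

The last one does the actual work:

    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,
        final String customizedTraceTopic) {                                                    // @1
        this.producerGroup = producerGroup;
        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
        // if client open the message trace feature
        if (enableMsgTrace) {
            try {
                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);   // @2
                traceDispatcher = dispatcher;
                this.getDefaultMQProducerImpl().registerSendMessageHook(
                    new SendMessageTraceHookImpl(traceDispatcher));                              // @3
            } catch (Throwable e) {
                log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
            }
        }
    }

Code @1: The parameters are String producerGroup (the producer group), RPCHook rpcHook (a hook for the remoting layer), boolean enableMsgTrace (whether message trace is enabled), and String customizedTraceTopic (a custom trace topic).
Code @2: Build an AsyncTraceDispatcher. As the name suggests, it forwards trace data asynchronously. It is covered in detail later.
Code @3: Register a send hook, SendMessageTraceHookImpl, on DefaultMQProducerImpl and pass it the AsyncTraceDispatcher.

Figure: class diagram of the message trace implementation

TraceDispatcher: the trace data forwarder, whose default implementation is AsyncTraceDispatcher. Its main fields are:
    int queueSize: the length of the queue used for asynchronous forwarding, default 2048.
    int batchSize: the number of trace records sent per batch.
    int maxMsgSize: the maximum size of a trace message, 128K.
    DefaultMQProducer traceProducer: the producer that sends trace messages.
    ThreadPoolExecutor traceExecuter: the thread pool that sends trace data.
    AtomicLong discardCount: the number of discarded trace records.
    Thread worker: the worker thread.
    ArrayBlockingQueue<TraceContext> traceContextQueue: the buffer of trace contexts waiting to be sent, capacity 1024.
    ArrayBlockingQueue<Runnable> appenderQueue: the task queue of the sending thread pool.
SendMessageTraceHookImpl: the message send hook.

    public void sendMessageBefore(SendMessageContext context) {
        // if it is message trace data, then it doesn't need to be recorded
        if (context == null || context.getMessage().getTopic()
            .startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {    // @1
            return;
        }
        // build the context content of TuxeTraceContext
        TraceContext tuxeContext = new TraceContext();
        tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
        context.setMqTraceContext(tuxeContext);
        tuxeContext.setTraceType(TraceType.Pub);
        tuxeContext.setGroupName(context.getProducerGroup());                          // @2
        // build the data bean object of message trace
        TraceBean traceBean = new TraceBean();                                         // @3
        traceBean.setTopic(context.getMessage().getTopic());
        traceBean.setTags(context.getMessage().getTags());
        traceBean.setKeys(context.getMessage().getKeys());
        traceBean.setStoreHost(context.getBrokerAddr());
        tuxeContext.getTraceBeans().add(traceBean);
    }

Code @1: If the message's topic is the trace topic itself, return, because trace data is not traced.
Code @2: Create a TraceContext and set its trace type (TraceType.Pub) and the producer's group.
Code @3: Create a TraceBean that records the message's topic, tags, keys, and the Broker address.

    public void sendMessageAfter(SendMessageContext context) {
        // if it is message trace data, then it doesn't need to be recorded
        if (context == null || context.getMessage().getTopic()
            .startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())
            || context.getMqTraceContext() == null) {                                  // @1
            return;
        }
        if (context.getSendResult() == null) {
            return;
        }
        if (context.getSendResult().getRegionId() == null || !context.getSendResult().isTraceOn()) {
            return;
        }
        TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
        TraceBean traceBean = tuxeContext.getTraceBeans().get(0);                      // @2
        int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp())
            / tuxeContext.getTraceBeans().size());                                     // @3
        tuxeContext.setCostTime(costTime);
        if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
            tuxeContext.setSuccess(true);
        } else {
            tuxeContext.setSuccess(false);
        }
        tuxeContext.setRegionId(context.getSendResult().getRegionId());
        traceBean.setMsgId(context.getSendResult().getMsgId());
        traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
        traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);            // @4
        localDispatcher.append(tuxeContext);                                           // @5
    }

Code @1: Skip messages sent to the trace topic, as well as messages without a trace context. Also return when there is no send result or trace is switched off for it.
Code @2: Take the TraceBean created in sendMessageBefore.
Code @3: Compute how long the send took.
Code @4: Set costTime (the send duration), success (whether the send succeeded), regionId (the region of the Broker the message went to), msgId (the globally unique message ID), offsetMsgId (the message ID that encodes the physical offset), and storeTime. The client cannot know when the Broker actually stored the message, so storeTime is estimated as the client's send timestamp plus half the send duration.
Code @5: Forward the trace data to the Broker through TraceDispatcher. Its append method is:

    public boolean append(final Object ctx) {
        boolean result = traceContextQueue.offer((TraceContext) ctx);
        if (!result) {
            log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
        }
        return result;
    }

offer does not block. If the buffer is full, the trace record is dropped and the discard counter is incremented.

Next we turn to TraceDispatcher, which forwards the client's trace data to the Broker. Its default implementation is AsyncTraceDispatcher:

    public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException {
        // queueSize is greater than or equal to the n power of 2 of value
        this.queueSize = 2048;                                                         // @1
        this.batchSize = 100;
        this.maxMsgSize = 128000;
        this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
        this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
        if (!UtilAll.isBlank(traceTopicName)) {
            this.traceTopicName = traceTopicName;
        } else {
            this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
        }
        this.traceExecuter = new ThreadPoolExecutor(
            10,
            20,
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.appenderQueue,
            new ThreadFactoryImpl("MQTraceSendThread_"));
        traceProducer = getAndCreateTraceProducer(rpcHook);                            // @2
    }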
代碼@1:隊(duì)列長(zhǎng)度,默認(rèn)為2048BrokerBroker128kTPS發(fā)送過(guò)traceContextBrokerom用于接收消息軌跡的Topic,默認(rèn)為RMQ_SYS_TRANS_HALF_TOPICBroker1020,隊(duì)列堆積長(zhǎng)度2048,線程名稱(chēng):MQTraceSendThread_。、發(fā)送消息軌跡的Producer代碼@2:調(diào)用getAndCreateTraceProducer方法創(chuàng)建用于發(fā)送消息軌跡的getAndCreateTraceProducer詳解privateprivateDefaoducergetAndCreateTraceProducer(RPCHookrpcHook){DefauodcertraceProducerInstance=this.traceProducer;if(traceProducerInstance==null){//@1traceProducerInstance=newDefauoucepoo);//Themaxsizeofmessageis128KaceodceInanceeaeageiemagSe-10*1000);return代碼@1:如果還未建立發(fā)送者,則創(chuàng)建用于發(fā)送消息軌跡的消息發(fā)送者,其GroupName為:_INNER_TRACE_PRODUCER,消息發(fā)送超時(shí)時(shí)間5s,最大允許118K。publicpublicvoidstart(StringnameSrvAddr)throwsencepon{ifaed.copaeAdefae,true)){ //@1traceProducer.setInstanceName(TRACE_INSTANCE_NAME+"_"+nameSrvthis.worker=newThread(newAsyncRunnable(),"MQ-AsyncTraceDispatcher-Thread-"+dispatcherId); //@2開(kāi)始啟動(dòng),其調(diào)用的時(shí)機(jī)為啟動(dòng)efaultMQProducer時(shí),如果啟用跟蹤消息軌跡,代碼@1:如果用于發(fā)送消息軌跡的發(fā)送者沒(méi)有啟動(dòng),則設(shè)置nameserver地址,并代碼@2:?jiǎn)?dòng)一個(gè)線程,用于執(zhí)行AsyncRunnableclassAsyncRunnableclassAsyncRunnablempemesRunnable{privatebooleanstopped;publicvoidrun()while(!stopped)List<TraceContext>contexts=new////for(inti=0;i<batchSize;i++){TraceContextcontext=null;try{//gettracedataefromblockingQueue—context=traceContextQueue.poll(5,UN//}catch(InterruptedExceptione)if(context!=null){}elseAsyncAppenderRequestrequest=new}elseif(AsyncTraceDispatcher.this.stopped){this.stopped=true;代碼@1:構(gòu)建待提交消息跟蹤Bean,每次最多發(fā)送batchSize,默認(rèn)為100條。代碼@2:traceContextQueueTraceContext,設(shè)置超時(shí)時(shí)5sTraceContext5s。代碼@3AsyncAppenderRequest。publicpublicvoidsendTraceData(List<TraceContext>contextList)Map<String,List<TraceTransferBean>>transBeanMap=newMp<rLfor(TraceContextcontext:contextList){ ifcone.geacBean).mp)){//TopicvaluecorrespondingtooriginalmessageentitycontentStringtopic=context.getTraceBeans().get(0).getTopic(); 
//@2//Useoriginalmessageentity'stopicaskeyStringkey=topic;List<TraceTransferBean>transBeanList=transBeanMap.get(key);if(transBeanList==null){transBeanMap.put(key,transBeanList);TraceTransferBeantraceData=TraceDataEncoder.encoderFromContextBean //@3forapn<ng,List<TraceTransferBean>>entry: //代碼@2Topic。代碼@3:TraceContext代碼@4:將編碼后的數(shù)據(jù)發(fā)送到Broker服務(wù)器。casecasePub:TraceBeanbean=//appendthecontentofcontextandtraceBeantotransferBean'sTransDatajson2)SubBeforefor(TraceBeanbean:ctx.getTraceBeans()){for(TraceBeanbean:ctx.getTraceBeans()){}12casecaseSubAfter:for(TraceBeanbean:ctx.getTraceBeans()){其實(shí)現(xiàn)原理其實(shí)是一樣的,就是在消息消費(fèi)前后執(zhí)行特定的鉤子函數(shù),其實(shí)現(xiàn)類(lèi)為CnsmeesageTrceHokml紹了。ocktQBrker那消息軌跡的主題名如何指定?其路由信息又怎么分配才好呢?是每臺(tái)Broker上都創(chuàng)建ocktMQRocketMQ默認(rèn)的消息軌跡主題為:RMQ_SYS_TRACE_TOPIC,那該Topic需要if(this.brokerController.getBrokerConfig().isTraceTopicEnable()) //Stringtopic=TopicConfigtopicConfig=newTopicConfig(topic);//h.opcnfgabepupcnfg.gopcame),topicConfig);上述代碼出自opiconiMng的構(gòu)造函數(shù),在Broker啟動(dòng)的時(shí)候會(huì)創(chuàng)建ticCnfiMaagrtopic代碼@1:如果Broker開(kāi)啟了消息軌跡跟蹤(traceTopicEnable=true)時(shí),會(huì)自動(dòng)創(chuàng)建topic1。publicpublictQrod(nStringproducerGroup,RPCHookrpcHook,booleanenabegacefnalStringcuomedacepc)publicDefauuhonumefnalStringconsumerGroup,RPCHookllatMegQueeraegllatMegQueeratgbooleanenabegace,finalStringcuomedacepc)通過(guò)oeTeTp來(lái)指定消息軌跡TopicRocketMQTopic。RocktMQ消息軌跡的實(shí)現(xiàn)原理,下一2.42.4RocketMQraft 
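To make the dispatch path above concrete, here is a minimal Java sketch built on stated assumptions. It models three behaviors. First, a bounded buffer whose non-blocking append drops records when full, like append(). Second, a drain that collects at most batchSize records and stops at the first poll that times out, like AsyncRunnable. Third, grouping a batch by the original topic, like sendTraceData. TraceBufferSketch and TraceRecord are hypothetical stand-ins, not RocketMQ classes. Encoding and the actual send to the Broker are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class TraceBufferSketch {
    // Hypothetical stand-in for TraceContext: the original message topic plus an opaque payload.
    static final class TraceRecord {
        final String topic;
        final String payload;

        TraceRecord(String topic, String payload) {
            this.topic = topic;
            this.payload = payload;
        }
    }

    private final ArrayBlockingQueue<TraceRecord> queue;
    private final AtomicLong discardCount = new AtomicLong();

    public TraceBufferSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Non-blocking append: offer() returns false instead of waiting when the buffer is full,
    // so the record is dropped and counted.
    public boolean append(TraceRecord record) {
        boolean accepted = queue.offer(record);
        if (!accepted) {
            discardCount.incrementAndGet();
        }
        return accepted;
    }

    public long discardCount() {
        return discardCount.get();
    }

    // Collect at most batchSize records and stop at the first poll that times out.
    public List<TraceRecord> drainBatch(int batchSize, long pollTimeoutMs) {
        List<TraceRecord> batch = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            TraceRecord record;
            try {
                record = queue.poll(pollTimeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt status and stop collecting
                break;
            }
            if (record == null) {
                break;
            }
            batch.add(record);
        }
        return batch;
    }

    // Group a batch by original topic, keeping topics in first-seen order.
    public static Map<String, List<TraceRecord>> groupByTopic(List<TraceRecord> batch) {
        Map<String, List<TraceRecord>> byTopic = new LinkedHashMap<>();
        for (TraceRecord record : batch) {
            byTopic.computeIfAbsent(record.topic, k -> new ArrayList<>()).add(record);
        }
        return byTopic;
    }

    public static void main(String[] args) {
        TraceBufferSketch buffer = new TraceBufferSketch(3);
        buffer.append(new TraceRecord("TopicA", "m1"));
        buffer.append(new TraceRecord("TopicB", "m2"));
        buffer.append(new TraceRecord("TopicA", "m3"));
        boolean accepted = buffer.append(new TraceRecord("TopicA", "m4")); // buffer is full
        List<TraceRecord> batch = buffer.drainBatch(100, 5);
        // prints: false 1 [TopicA, TopicB]
        System.out.println(accepted + " " + buffer.discardCount() + " " + groupByTopic(batch).keySet());
    }
}
```

Using offer rather than put means the producer's send path never blocks on trace data. The price is that records are dropped under heavy load, which is why the dispatcher keeps a discard counter.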
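2.4 RocketMQ multi-replica prerequisite: a first look at the Raft protocol

This article introduces Leader election and log replication in Raft, as background for studying the DLedger (RocketMQ multi-replica) module.

1. Leader election

In the Raft protocol, a node is in one of three states (roles): Leader, Follower, or Candidate. Each node keeps an election timer whose timeout is a random value between 150 ms and 300 ms. When the timer expires, the node changes from Follower to Candidate.

In a three-node cluster, one node's timer normally expires first, and that node becomes a Candidate. A Candidate starts a round of voting. Because this is the first round, it sets the term to 1 and votes for itself first. For NodeA in the figure, this gives Term 1 and VoteCount 1. Once NodeA has the votes of a majority, it becomes Leader and starts sending periodic heartbeats to the other nodes in the cluster.

A Follower that receives no heartbeat from the Leader before its timer expires changes from Follower to Candidate. For example, if NodeA fails and NodeB's timer expires before NodeC's, NodeB becomes a Candidate and sends vote requests to the other nodes, as shown in the figure. NodeB sets its term to 2, votes for itself, and asks the others for their votes. NodeC grants its vote, and NodeB becomes the new Leader.

More than one node may also become a Candidate at the same time, or a node may become a Candidate before receiving another node's vote request. How is a Leader elected then? Suppose nodes C and D both become Candidates in the same term. When C receives D's vote request, and when D receives C's, each replies "no", because each has already voted for itself in this round. A and B then vote for different candidates, A for C and B for D, so C and D each end up with 2 votes, which is not a majority of the four-node cluster. Meanwhile the timers of A, B, C, and D keep counting down. When the next timer expires, that node, whether it is just becoming a Candidate or is already one, starts a new round with a larger term. The process repeats until one candidate wins a majority and becomes Leader.

These are the points about Raft elections that will guide our reading of DLedger:
    There are three states: Leader, Follower, and Candidate.
    Voting is triggered by the timer that Follower and Candidate nodes keep, which is set to a random value between 150 ms and 300 ms. When it expires, a Follower becomes a Candidate.
    Each time a Candidate starts a round of voting, its term increases by one.
    If node A is in term 3 and has already voted in term 3, it votes against B's term-3 request. A request with term 4 belongs to a new round.
    A node needs a majority of the votes to become Leader: 2 votes in a 3-node cluster, 3 votes in a 4-node cluster.

2. Log replication

The client sends a command, for example set 5, to the Leader. The Leader appends it to its log and replicates it to the Followers. Once a majority of them have acknowledged it (ACK), the entry can be committed.

Some questions remain open. If a Follower fails while the Leader is broadcasting a log entry, how is that handled? At what stage is a log entry committed? We will take these questions into the study of RocketMQ multi-replica, and after analyzing DLedger's implementation in the source, come back to Raft.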
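The election rules above can be sketched in a few lines of Java: a randomized timeout, one vote per term, and a strict majority to win. This is an illustration built on simplifying assumptions, not DLedger code. RaftElectionSketch and Node are hypothetical names, messages arrive instantly and in a fixed order, and the log up-to-date check that real Raft voters also apply is left out. main replays a four-node split vote like the one described above (the term numbers are chosen for illustration), followed by a new term in which one candidate wins.

```java
import java.util.Random;

public class RaftElectionSketch {
    static final int MIN_TIMEOUT_MS = 150;
    static final int MAX_TIMEOUT_MS = 300;

    // Randomized election timeout in [150, 300) ms.
    static int electionTimeoutMs(Random random) {
        return MIN_TIMEOUT_MS + random.nextInt(MAX_TIMEOUT_MS - MIN_TIMEOUT_MS);
    }

    // Strict majority: 2 of 3 nodes, 3 of 4 nodes.
    static boolean isMajority(int votes, int clusterSize) {
        return votes > clusterSize / 2;
    }

    // Voting state of one node: its current term and whom it voted for in that term (-1 = nobody).
    static final class Node {
        int term;
        int votedFor = -1;

        Node(int term) {
            this.term = term;
        }

        // At most one vote per term. A request with a larger term first moves this node to that term
        // and clears its vote. Log up-to-dateness checks are left out of this sketch.
        boolean grantVote(int candidateId, int candidateTerm) {
            if (candidateTerm < term) {
                return false;
            }
            if (candidateTerm > term) {
                term = candidateTerm;
                votedFor = -1;
            }
            if (votedFor == -1 || votedFor == candidateId) {
                votedFor = candidateId;
                return true;
            }
            return false;
        }
    }

    public static void main(String[] args) {
        // Nodes A=0, B=1, C=2, D=3, all in term 3. C and D time out together and both start term 4.
        Node[] nodes = {new Node(3), new Node(3), new Node(3), new Node(3)};
        int votesC = 0;
        int votesD = 0;
        if (nodes[2].grantVote(2, 4)) votesC++; // C votes for itself
        if (nodes[3].grantVote(3, 4)) votesD++; // D votes for itself
        if (nodes[0].grantVote(2, 4)) votesC++; // A hears from C first and votes for C
        if (nodes[1].grantVote(3, 4)) votesD++; // B hears from D first and votes for D
        if (nodes[0].grantVote(3, 4)) votesD++; // rejected: A already voted in term 4
        if (nodes[1].grantVote(2, 4)) votesC++; // rejected: B already voted in term 4
        if (nodes[3].grantVote(2, 4)) votesC++; // rejected: D voted for itself
        if (nodes[2].grantVote(3, 4)) votesD++; // rejected: C voted for itself
        System.out.println("term 4: C=" + votesC + " D=" + votesD
            + " leader? " + (isMajority(votesC, 4) || isMajority(votesD, 4)));
        // Split vote. The next timer to fire (say B's) starts term 5, and every node grants B its vote.
        int votesB = 0;
        for (Node node : nodes) {
            if (node.grantVote(1, 5)) votesB++;
        }
        System.out.println("term 5: B=" + votesB + " leader? " + isMajority(votesB, 4));
        System.out.println("sample timeout: " + electionTimeoutMs(new Random()) + " ms");
    }
}
```

The first two lines printed are "term 4: C=2 D=2 leader? false" and "term 5: B=4 leader? true". The last line varies from run to run because the timeout is random. The random timeout is what makes a repeated split vote unlikely: usually one timer fires clearly before the others.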
RocketMQLeader選主本文關(guān)鍵字:RocketMQ、多副本、DLedger、leader本文將按照《RocketMQraftRocketMQLeader的一些思考:3Follower(跟隨者)、Candidate(候選者),投票的觸發(fā)點(diǎn),F(xiàn)ollower、Candidate150ms-300msFollowerCandidate狀態(tài)的節(jié)點(diǎn),每發(fā)起一輪投票,Team加一。A3,并且已經(jīng)B3,則會(huì)投反對(duì)票,如果收到輪次4Leader32343一、DLedgerraftDLedgerompltblFturGettriRpoet(GttriqusrequesomlbFtAppdrpoappend(AppendEntryRequestrequest)ompltblFtMtdtRponmetadata(MetadataRequestreqDLedgerCmpltaleFtureVotRepne>vt(VteRqestruet)meabeuue<eaeaResonsheartBeat(HeartBeatRequestreLeaderomplblFtrlltrspopull(PullEntriesRequestrequesompltbFtrstryponpush(PushEntryRequestrequeHandlerDLedgerClientProtocolHandler、DLedgerProtocolHanderDLedgerServerNettyDLedLeader選舉實(shí)現(xiàn)器。DledgerServer,Dledger接下來(lái)將從DLedgerLeaderElector開(kāi)始剖析DLedger是如何實(shí)現(xiàn)Leader(raft)LeaderDLedgerLeaderElector類(lèi)圖RandomDLedgerConfigdLedgerConfigMemberStatememberStateDLedgerRpcServicerpcRPClglsteaerHartBetimelglstenHertBatTimelglstuccHartBeatimeinteartBatT允許最大的N個(gè)心跳周期內(nèi)未收到心跳包,狀態(tài)為Follower的節(jié)點(diǎn)只有超過(guò)mxHeartBeatLek*eartBeatTimentervalsCniatelgetTimeTReustVteolaneIcreaeTrmImmeiatelyintoI最小的發(fā)送投票間隔時(shí)間,默認(rèn)為300msint最大的發(fā)送投票的間隔,默認(rèn)為1000msList<RoleChangeHandler>注冊(cè)的節(jié)點(diǎn)狀態(tài)處理器,通過(guò)addRoleChangeHandler方法添加。longlastVoteCosttateaitainrtateaitainr通過(guò)DLedgerLeaderElector的startuppublicvoidstartup(){ //for(RoleChangeHandlerroleChangeHandler:roleChangeHandlers){ //@2代碼@1代碼@2:遍歷狀態(tài)改變監(jiān)聽(tīng)器并啟動(dòng)它,可通過(guò)DLedgerLeaderElector的addRoleChangeHandler方法增加狀態(tài)變化監(jiān)聽(tīng)器。publicpublicvoidrun()while(running.get()){try{}catch(Throwablet){if(logger!=null){logger.error("UnexpectedErrorinrunning{}",getName(),從上面來(lái)看,主要是循環(huán)調(diào)用doWork方法,接下來(lái)重點(diǎn)看其doWork的實(shí)現(xiàn):publicvoiddoWork(){try{publicvoiddoWork(){try{if(DLedgerLeaderElector.this.dLedgerConfig.isEnableLeaderElector()){ 
//@1//////}catch(Throwablet){DLedgerLeaderElector.logger.error("Errorinheartbeat",t);代碼@4:沒(méi)執(zhí)行一次選主,休息10msprivatevoidmantantate)throwsException{ifmembtate.Leade)){}elseifmembtate.Fooe)){}elsecandidate我們?cè)诶^續(xù)往下看之前,需要知道m(xù)emberState的初始值是什么?我們追溯到創(chuàng)建raftfollowerDLedger的實(shí)現(xiàn)從candidate開(kāi)始,一開(kāi)始,集群內(nèi)的所有節(jié)點(diǎn)都會(huì)嘗試發(fā)起投票,這樣第一輪要達(dá)成選舉antainSat方法。下面重點(diǎn)來(lái)分析其狀態(tài)的mantansCandatifSyst.rrnTieills(<xtTmToRqustVot&&!needIncreaseTermImmediately){longlongedgenem;longStep1mLeaderLeadersynchronized(memberState){if!memeae.andae)){if(lastParseResult==Vospos.Prest.ATT_VE_X||needInceaeemImedae){longprevTerm=memberState.currTerm();term=memberState.nextTerm();oge.no("{_[ICESE_TEMfrom{}to{}",mbrSat.gtSlId(prevTerm,term);lastParseResult=}elseterm=ledgerEndIndex=memberState.getLedgerEndIndex();edgendem=Step2:初始化team、ledgerEndIndex、ledgerEndTerm屬性,其實(shí)現(xiàn)關(guān)鍵點(diǎn)如下:如果上一次的投票結(jié)果不是WAIT_TO_VOTE_NEXT(ifneedIneaeemImmeae){nemeoRequeoe=geNemeoReueVe);needIneaeemImmdaey=false;tep3:如果eeIcreaseermmmeiately為treflse,privateprivatelonggetNextTimeToRequestVote()returnSystem.currentTimeMillis()+lastVoteCost+Voeta+random.neInmaVoeInevs-mnoeInea);300ms(1000-300)之間的隨機(jī)值。finalfinals<Copeblte<VoeponequorumVoteResponses=voteForQuoumResponses(term,edgendem,ledgerEndIndex);finalfinalAomcongknownMaxTermInGroup=newAocLon-1);finalAtomicIntegeraNum=newAtomicInteger(0);finalAomcnegervadNum=newAtomicInteger(0);finalAomcnegeracceptedNum=newAtomicInteger(0);finalAomcnegerbggeLdgeum=newAtomicInteger(0);finalAomcooenalreadyHasLeader=newStep5kwnaTermnGrupcceteNum準(zhǔn)備好,對(duì)端節(jié)點(diǎn)使用本次的輪次進(jìn)入Candidate狀態(tài)。發(fā)起投票的節(jié)點(diǎn)的lgrEdTrm小于對(duì)端節(jié)點(diǎn)的個(gè)數(shù)。是否已經(jīng)存在Leaderforfor(CompletableFture<VoteRsponse>future:quorumVoteResponses)Step5if(x.getVoteResult()!=VoteResponse.RESULT.UNKNOWN){Step6:如果投票結(jié)果不是UNKNOW,則有效投票數(shù)量增1synchronizedsynchronized(knownMaxTermInGroup){switch(x.getVoteResult()){caseACCEP
T:casecaseaeadHaLead.cmpeAndefae,true);caseREJECT_TERM_SMALL_THAN_LEDGER:caseREJECT_EXPIRED_VOTE_TERM:if(x.getTerm()>knownMaxTermInGroup.get())caseREJECT_EXPIRED_LEDGER_TERM:caseREJECT_SMALL_LEDGER_END_INDEX:caseStep7贊成票,acceptedNum加一,只有得到的贊成票超過(guò)集群節(jié)點(diǎn)數(shù)量的一半才能成為L(zhǎng)eaeralreadyHasLeadertrue,無(wú)拒絕票,如果自己維護(hù)的term小于遠(yuǎn)端維護(hù)的lderEdTrm則返回該結(jié)果,如teamteam,需要記錄對(duì)端最大的投票輪次,以便更新自己的投票輪termterm拒絕票,如果自己維護(hù)的deTeledgerTerm則返回該結(jié)果。igeregerum的值。lderTemlderTem相等,但是自己維護(hù)的dedgerEndIndex小于對(duì)端維護(hù)的值,返回該值,增加igeregerum計(jì)數(shù)器的值。team,則認(rèn)為對(duì)端還未準(zhǔn)備好投票,對(duì)端使用自CandidatetrytryvoteLatch.await(3000+random.nextInt(maxVoteIntervalMs),}catch(Throwableignore)Step8lastVoteCostlastVoteCost=DLegeU.eaedaVoeme);VoteResponse.ParseResultparseResult;if(knownMaxTermInGroup.get()>term)parseResult=VoRepoe.aeReu.AIOVONT;nemeoRequeoe=geNemeoReueVe);}elseif(alreadyHasLeader.get())parseResult=VoRepoe.aeReu.AIOVONT;etieRqett=tNtimoeutot(+heartBeatTimeIntervaMs*}elseif!memeae.uoumvdNumge)))parseResult=VoRepoe.aeReu.AIORVO;nemeoRequeoe=getNextTimeToRequestVote();}elseifmembae.uoumacepeNumge))){parseResult=VoteResponse.ParseResult.PASSED;}elseifmembae.uoumacepeNumge)+notReadyTermNum.get())){parseResult=VoRepoe.aeReu.RVOIDIALY;}elseif(membrState.isQuorum(acceptedNum.get()+biggerLedgerNm.gt()))parseResultparseResult=VoRepoe.aeReu.AIORVO;nemeoRequeoe=getNextTimeToRequestVote();}elseparseResult=VoRepoe.aeReu.AIOVONT;nemeoRequeoe=getNextTimeToRequestVote();Step9:根據(jù)收集的投票結(jié)果判斷是否能成為L(zhǎng)eader溫馨提示:在講解關(guān)鍵點(diǎn)之前,我們先定義先將(當(dāng)前時(shí)間戳+上次投票的開(kāi)銷(xiāo)+最小投票間隔(300ms(1000300)之間的隨機(jī)值)1Candidate1如果已經(jīng)存在eader,該節(jié)點(diǎn)重新進(jìn)入到Candiate,并重置定時(shí)器,該定時(shí)器的時(shí)間:1+eartBeatTimentervalMs*maHartBatLeak,ertBeatTimetervalsmaxHeartBeatLeak為允許最大丟失的心跳包,即如果FlowerLeader如果收到的有效票數(shù)未超過(guò)半數(shù),則重置計(jì)時(shí)器為“1個(gè)常規(guī)計(jì)時(shí)器”,然后等待重新投票,注意狀態(tài)為WAIT_TO_RVOTE如果得到的贊同票超過(guò)半數(shù),則成為L(zhǎng)eader為REVOTE_IMMEDIATELY。ledg
erEndIndexifif(parseResult==VoteResponse.ParseResult.PASSED)("[{}][VOTE_RESULT]hasbeenelectedtobetheleaderinterm{}",memeae.geefId,term);Step10:如果投票成功,則狀態(tài)機(jī)狀態(tài)設(shè)置為L(zhǎng)eader,然后狀態(tài)管理在驅(qū)動(dòng)狀態(tài)時(shí)會(huì)調(diào)用LegrLedrElctr#maitintatemaitaiAseaer方法。mantansLeade經(jīng)過(guò)iiCddt投票選舉后,被其他節(jié)點(diǎn)選舉成為領(lǐng)導(dǎo)后,會(huì)執(zhí)行該方法,其他節(jié)點(diǎn)的狀態(tài)還是Candidate,并在計(jì)時(shí)器過(guò)期后,又嘗試去發(fā)起選舉。接下來(lái)重LeaderprivatevoidmananALeae)throwsExceptionifDedertls.eapsed(astSndHatBaTie>arBaTieItrals{longterm;synchronized(memberState)if!memeae.Lead)) ////stopsendingterm=memberState.currTerm();leaderId=memberState.getLeaderId();aendeaBeame= //sendHeartbeats(term, //代碼@1:首先判斷上一次發(fā)送心跳的時(shí)間與當(dāng)前時(shí)間的差值是否大于心跳包發(fā)送間代碼@2:leader代碼@4:mantansFlwe方法Candidatefollower,followerprivateprivatevoidmananAFoe)ifDLedgeUs.eapedasLeadeeaBeaTme)>2*vsynchronized(memberState)ifmebrtateiFlloer(&&(DLedgerUtils.elapsed(lastLeaderHeartBeame)>maxHeartBeatLeak*heaBeameIeva)){("[{}][HeartBeatTimeOut]stadrataTi{}heartBtTienevlMs{}lastLeader={}",mbrStt.eSefd)newiestplasLadeaBeame),heaBeameInea,memberState.getLeaderId());如果maxHeartBeatLeak(默認(rèn)為3)個(gè)心跳包周期內(nèi)未收到心跳,則將狀態(tài)變更為節(jié)點(diǎn)的狀態(tài)為Candidate時(shí)會(huì)向集群內(nèi)的其他節(jié)點(diǎn)發(fā)起投票請(qǐng)求(個(gè)人覺(jué)得理解為拉票更好),向?qū)Ψ皆?xún)問(wèn)是否愿意選舉我為L(zhǎng)eader,對(duì)端節(jié)點(diǎn)會(huì)根據(jù)自己的情況對(duì)其投贊成票、拒絕票,如果是拒絕票,還會(huì)給出拒絕原因,具體由voteForQuorumResponses、handleVote這兩個(gè)方法來(lái)實(shí)現(xiàn),接下來(lái)我們分別對(duì)這兩個(gè)方法進(jìn)行詳細(xì)分析。vteForQuorumRespnsesprivateprivateList<CompletableFuture<VoteResponse>>voteForQuorumResponses(longm,longlongledgerEndIndex)throwsException{ 
//@1L<omeabeuue<oeepone>responses=newArrayList<>();for(Stringid:memberState.getPeerMap().keySet())VoteRequestvoteRequest=newVoteRequest();omeabeFue<oeRpone>voteResponse;voteResponse=handleVote(voteRequest,}elsevoteResponse=return////@3//@3////代碼@1:longlonglong代碼@3代碼4anleVte發(fā)送給集群內(nèi)的其他節(jié)點(diǎn),則通過(guò)網(wǎng)絡(luò)發(fā)送投票請(qǐng)求,對(duì)端節(jié)點(diǎn)調(diào)用各自的handleVote接下來(lái)重點(diǎn)關(guān)注handleVote方法,重點(diǎn)探討其投票處理邏輯。handleVote方法handleVote方法會(huì)并發(fā)被調(diào)用,因?yàn)榭赡芡瑫r(shí)收到多個(gè)節(jié)點(diǎn)的投票請(qǐng)求,故本synchronizedmemberState對(duì)象。ifif(!memberState.isPeerMember(request.getLeaderId()))logger.warn("[BUG][HandleVote]remoteId={}isanunknownmember",request.gereturnopabt.optdrwVosposeqt.mif(!self&&memeae.geefId.euaeque.geLadeI))){oe.a("B[HandleVote]selfId={}butremoteId={}",bSt.teI(),returnoplblur.opltdteeVoteResponse(request).term(memStep1BUG存在,否則是不會(huì)出現(xiàn)上if(request.getTerm()<memberState.currTerm()){//returnoplblur.opltdteeVoteResponse(request).term(memeeee}elseif(request.getTerm()==memberState.currTerm()){//@2if(memberState.currVoteFor()==null){//letit}elseif(membrState.currVoteFor().quals(request.etLeadrId()))//repeatjustletit}elseif(memberState.getLeaderId()!=null)returnCompletableFpletedFuture(newVoteResponse(request).tr(eetaeurem).tRul(teenR.CLAYH}elsereturnompeablutr.copltdFtr(eVoteResponse(reques}else 
////steppeddownbylargertermneedIneaeemImmdaey=true;//onlycanhandleVotewhenthetermisreturnoplblur.opltdteeVoteResponse(request).term(memStep2team如果發(fā)起投票節(jié)點(diǎn)的term小于當(dāng)前節(jié)點(diǎn)的如果發(fā)起投票節(jié)點(diǎn)的term等于當(dāng)前節(jié)點(diǎn)的如果兩者的term相等,說(shuō)明兩者都處在同一個(gè)投票輪次中,地位平等,接下來(lái)看該如果未投票、或已投票給請(qǐng)求節(jié)點(diǎn),則繼續(xù)后面的邏輯(step3)LeaderLeader如果發(fā)起投票節(jié)點(diǎn)的term大于當(dāng)前節(jié)點(diǎn)的termCandidateifif(request.getLedgerEndTerm()<memberState.getLedgerEndTerm())returnoplblur.opltdteeVoteResponse(request).term(memm.o}elseif(request.getLedgerEndTerm()==memberState.getLedgerEndTerm()&&request.getLedgerEndIndex()<memberState.getLedgerEndIndex()){returnoplblur.opltdteeVoteResponse(request).term(memif(request.getTerm()<memberState.getLedgerEndTerm())returnoplblur.opltdteeVoteResponse(request).term(memte3lgerndermlderEdTrm如果請(qǐng)求節(jié)點(diǎn)的lgernTerm小于當(dāng)前節(jié)點(diǎn)的lgernderm如果lgernderm相等,但是ledgerEndIndex比當(dāng)前節(jié)點(diǎn)小,則拒絕,原因如果請(qǐng)求的term小于lgernTerm以同樣的理由拒絕。returnreturnoletblFtuecmleeFtur(nVoteResponse(request).term(membeStep4Step1ompeabluue<Hatetsposefuture=dLedgerRpcService.heartBeat(heafuue.heomeeeaBeaRepnex,Throwableex)->{try{if(ex!=null){throwex;switch(DLedgerResponseCode.valueOf(x.getCode())){caseSUCCESS:casecasenconLeadecopaendefae,true);caseTERM_NOT_READY:if||memeae.uoumucNumge)+notReadyNum.get())){}catch(Throwablet)logger.error("Parseheartbeatresponsefailed",}finallyifaNum.e)==memeae.eee)){Step2主節(jié)點(diǎn)的投票termbbeaLah.aaheaBemeIneva,meUn.ILIOND);ifmembae.uoumuccum.e))){ //@1auccHeaBeame=}}("[{}]Parseheartbeatresponsesincost={}term={}lu={={}notReadyNum={}inconsistLea
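Two pieces of logic carry most of DLedger's election: the vote-handling rules (handleVote, Steps 2 to 4) and the tally decision (maintainAsCandidate, Step 9). The Java sketch below restates both as plain functions so they can be checked in isolation. It is a simplified model, not DLedger's code. VoteRulesSketch, Voter, and the method signatures are hypothetical, and the enum constants reuse the result names from the text. Membership checks, role changes, timers, and locking are omitted. isQuorum assumes the strict-majority rule num >= peerSize / 2 + 1.

```java
public class VoteRulesSketch {
    enum Result {
        ACCEPT, REJECT_EXPIRED_VOTE_TERM, REJECT_ALREADY_VOTED, REJECT_ALREADY__HAS_LEADER,
        REJECT_TERM_NOT_READY, REJECT_EXPIRED_LEDGER_TERM, REJECT_SMALL_LEDGER_END_INDEX,
        REJECT_TERM_SMALL_THAN_LEDGER
    }

    enum ParseResult { PASSED, REVOTE_IMMEDIATELY, WAIT_TO_REVOTE, WAIT_TO_VOTE_NEXT }

    // Assumed strict-majority rule.
    static boolean isQuorum(int num, int peerSize) {
        return num >= peerSize / 2 + 1;
    }

    // Simplified voter state: membership checks, role changes and locking are omitted.
    static final class Voter {
        final long currTerm;
        String currVoteFor;   // null: has not voted in currTerm
        String leaderId;      // null: no known Leader
        final long ledgerEndTerm;
        final long ledgerEndIndex;

        Voter(long currTerm, long ledgerEndTerm, long ledgerEndIndex) {
            this.currTerm = currTerm;
            this.ledgerEndTerm = ledgerEndTerm;
            this.ledgerEndIndex = ledgerEndIndex;
        }

        // handleVote Steps 2 to 4 as described above.
        Result handleVote(String candidateId, long term, long reqLedgerEndTerm, long reqLedgerEndIndex) {
            // Step 2: compare terms
            if (term < currTerm) {
                return Result.REJECT_EXPIRED_VOTE_TERM;
            }
            if (term > currTerm) {
                // DLedger steps down to Candidate and raises its term later; this sketch only reports it.
                return Result.REJECT_TERM_NOT_READY;
            }
            if (currVoteFor != null && !currVoteFor.equals(candidateId)) {
                return leaderId != null ? Result.REJECT_ALREADY__HAS_LEADER : Result.REJECT_ALREADY_VOTED;
            }
            // Step 3: the candidate's log must be at least as up to date as this node's
            if (reqLedgerEndTerm < ledgerEndTerm) {
                return Result.REJECT_EXPIRED_LEDGER_TERM;
            }
            if (reqLedgerEndTerm == ledgerEndTerm && reqLedgerEndIndex < ledgerEndIndex) {
                return Result.REJECT_SMALL_LEDGER_END_INDEX;
            }
            if (term < ledgerEndTerm) {
                return Result.REJECT_TERM_SMALL_THAN_LEDGER;
            }
            // Step 4: grant the vote
            currVoteFor = candidateId;
            return Result.ACCEPT;
        }
    }

    // maintainAsCandidate Step 9: same decision order, without the timer updates.
    static ParseResult parse(long term, long knownMaxTermInGroup, boolean alreadyHasLeader, int validNum,
                             int acceptedNum, int notReadyTermNum, int biggerLedgerNum, int peerSize) {
        if (knownMaxTermInGroup > term || alreadyHasLeader) {
            return ParseResult.WAIT_TO_VOTE_NEXT;
        }
        if (!isQuorum(validNum, peerSize)) {
            return ParseResult.WAIT_TO_REVOTE;
        }
        if (isQuorum(acceptedNum, peerSize)) {
            return ParseResult.PASSED;
        }
        if (isQuorum(acceptedNum + notReadyTermNum, peerSize)) {
            return ParseResult.REVOTE_IMMEDIATELY;
        }
        if (isQuorum(acceptedNum + biggerLedgerNum, peerSize)) {
            return ParseResult.WAIT_TO_REVOTE;
        }
        return ParseResult.WAIT_TO_VOTE_NEXT;
    }

    public static void main(String[] args) {
        Voter voter = new Voter(4, 3, 10); // term 4; last log entry at index 10, written in term 3
        System.out.println(voter.handleVote("n1", 3, 3, 10)); // REJECT_EXPIRED_VOTE_TERM
        System.out.println(voter.handleVote("n1", 4, 3, 9));  // REJECT_SMALL_LEDGER_END_INDEX
        System.out.println(voter.handleVote("n1", 4, 3, 10)); // ACCEPT
        System.out.println(voter.handleVote("n2", 4, 3, 12)); // REJECT_ALREADY_VOTED
        System.out.println(parse(4, -1, false, 3, 2, 0, 0, 3)); // PASSED
    }
}
```

Separating the per-request rule from the tally shows why a candidate can be rejected for two different reasons. A term conflict is resolved by moving to a newer term, while a log-freshness conflict (biggerLedgerNum) means waiting so that a node with a more complete log can win.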
