四vip課程分布式框架專題32api源碼分析rocketmq原理解析_第1頁
四vip課程分布式框架專題32api源碼分析rocketmq原理解析_第2頁
四vip課程分布式框架專題32api源碼分析rocketmq原理解析_第3頁
四vip課程分布式框架專題32api源碼分析rocketmq原理解析_第4頁
四vip課程分布式框架專題32api源碼分析rocketmq原理解析_第5頁
已閱讀5頁,還剩51頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)

文檔簡介

前 第一章: 第二章 三:長輪 六:pull消息消 七 第三章: 三 四:HA&master HA異步 :Broker 第四章: 一:Namesrv功 二:Namesrv啟動流程 三 一: Server 公用抽象 invokeSyncImpl同步調(diào)用實 invokeAsyncImpl異步調(diào)用實 invokeOnewayImpl單向請 mand接收請求處 mand接收響應(yīng)處 二:NettyRemotingServerRemoting服務(wù)端實 前此文檔是從學(xué)習(xí)mq源碼過程中的筆記中整理出來的,由于時間及能力原因,理 mq有所幫助。mq是阿里基于開源思一款產(chǎn)品,代碼托管于上,要想學(xué)好用mq請從 MQ獲取最的文檔、問題解答、原理介第一章一:Producer啟動流Producerproducer信息(procduergroup)定時發(fā)送到,brokerAddrTable集合中列出的broker上去Producer發(fā)送消息只發(fā)送到master的broker機器,在通過broker的主從機制拷貝二 如何發(fā)送消Broker1的隊列為queue0,queue1Broker3當(dāng)然一般情況下的brokerbroker1_queue0,broker1_queue1,> queue都會通過sendWhichQueuequeue的輪詢lastBrokerNamequeue發(fā)送失敗,這次選擇應(yīng)該避開同一個queue發(fā)送失敗后,重試幾次retryTimesWhenSendFailed=-- -- //代表發(fā)送消息到達(dá)的 向指定broker的指定topic的指定queue發(fā)送消息發(fā)送失敗(1)重試次數(shù)不到兩次(2)3000(毫秒),換 發(fā)送普通消順序消息發(fā)mq能夠保證消息嚴(yán)格順序,但是mqproducer保證順序消息按順序發(fā)送到同一個queue中,比如流程(1)下單(2)支付(3)支付成功,這三個消息需要根據(jù)特定規(guī)則將這個三個消息按順序發(fā)送到一個queue一般消息是通過輪詢所有隊列發(fā)送的,順序消息可以根據(jù)業(yè)務(wù)比如說訂單號,messageQueueList分布式事物消一階段,向broker發(fā)送一條prepared的消息,返回消息的offsetLocalTransactionExecuter二階段,處理完本地事物務(wù)得到事物狀態(tài),根據(jù)offset查找到commitLog中的prepared消息,設(shè)置消息狀態(tài)commitType或者rollbackType,讓后將信息添加到commitLog中,其實二階段生成了兩條消息三:Broker落地消普通消息落Broker根據(jù)producer請求的 ode.SEND_MESSAGE選擇對應(yīng)的處理分布式事物消息落commitLog20位開始的八位記錄是的消息在邏輯隊列中的queueoffset,但是針對事物消息為preparedType和rollbackType的的是事物狀態(tài)ConsumeQueuepreparedType和rollbackType的消息不會將請求分發(fā)到ConsumeQueue中去,即不處理,所以不會被消息transactionstabletableprepared消息記,通過TransactionStateService服務(wù)將消息加到事務(wù)狀態(tài)的表格tranStateTable的文件中;如果是commitType和rollbackType消息,修改事物狀態(tài)表格tranStateTable中的消息狀態(tài)。記錄TransactionRedoLog日志:記錄了commitLogOffset,msgSize,preapredTransactionOffset,storeTimestamp。事物狀態(tài)表是有MapedFileQueue將多個文件組成續(xù)的隊列,它的單元是定長為24個字節(jié)的數(shù)據(jù),tranStateTableOffset可以認(rèn)為是事物狀態(tài)消息的個數(shù),索引偏移量,它的值是定時回查線程會定時掃描(默認(rèn)每分鐘)每個事務(wù)狀態(tài)的表格文件,遍歷事prepared狀態(tài)的如果消息小于事務(wù)回查至少間隔時間(默認(rèn)是一分鐘)跳出終根據(jù)group隨機選擇一臺producer建一個Runnable任務(wù)異步執(zhí)行producer的回調(diào)接口,處理回調(diào),在調(diào)endTransactionOneway向broker發(fā)送請求更新事物消息的最終狀態(tài)事物消息的()tranRedoLog文件的消息DispatchRequest,更新TransactionState記錄TransactionRedo刪除事物狀態(tài)表重建 將上面過濾出的prepared消息,添加到事物狀態(tài)表文件redologcommitlog來做的第二拉模式拉取消息,consumer做負(fù)載均衡并通過長輪詢向broker拉消息。 mq的一 啟動流消費端Start向mq Factory本消費者啟動端通信定時清理下線的borker定時持久化Consumer消費進(jìn)度(廣播到本地,集群到Broker)啟動拉消息服務(wù)namesrv更新topic路由信息喚醒Rebalance服務(wù)線程二:消費端負(fù)載均消費端會通過RebalanceService線程,10topic下的所有隊列負(fù)載消費端遍歷自己的所有topic,依次調(diào)rebalanceByTopic選擇一臺broker獲取基于group的所有消費端(有心跳向所有broker客戶端信息)選擇隊列分配策略實例AllocateMessageQueueStrategy執(zhí)行分配算法獲取隊列集合Set<MessageQueue>mqSet將所有queue排好序類似于記錄3)按照機房來配置隊列獲取指定機房的queue,置ProcessQueue的droped屬性為true構(gòu)建這個隊列的ProcessQueue注:ProcessQueue正在被消費的隊列,長輪詢拉取到消息都會先到ProcessQueue的TreeMap<Long,的processqueue才能被執(zhí)行消費rollback:將消費在msgTreeMapTemp中的消息,放回msgTreeMap重新消費mq的消息是由consumer端主動到broker拉取的consumerbroker發(fā)送拉消息PullMessageServiceLinkedBlockingQueue<PullRequest>中的PullRequest到broker拉取消息2000,從內(nèi)存中獲取 //TODO這個值跟pullRequest.getNextOffset區(qū)構(gòu)建 pull接口用到的消息中放入隊列最大最小offset,方便應(yīng)用來感知消息堆積度pullInterval0(拉消息間隔,如果為了降低拉取速度,0的值pullInterval0立刻在執(zhí)行拉四 消息—并發(fā)消費消通過長輪詢拉取到消息后會提交到消息服務(wù)ConsumeMessageConcurrentlyService,ConsumeMessageConcurrentlyServic的submitConsumeRequest方法構(gòu)建ConsumeRequest消費端consumer構(gòu)建一個消費消息任務(wù)ConsumeRequest消費一批消息的個數(shù)是可配置的consumeMessageBat axSize=1,默認(rèn)批量個數(shù)為一個ConsumeRequest任務(wù)run給消息設(shè)置消費失敗時候的retrytopic,當(dāng)消息發(fā)送失敗的時候發(fā)送到

ackIndex來標(biāo)記成ackIndex設(shè)置為-秒鐘以后重試重新消費消息,在走一次上面的消費流程。offsetTable的更新,后面有定時任務(wù)定時更新到broker上去五:push消費-順序消費消構(gòu)建一個線程池來接收消費請求ConsumeRequest構(gòu)建一個單線程的本地線程,ConsumeRequest,用來執(zhí)行,Map<brokernameSet<MessageQueue>>brokernamebrokermaster機器地址brokerNameSet<MessageQueue>發(fā)送broker請求鎖定這些隊列。brokerbrokerqueuequeue被某個BrokerMessageQueue,設(shè)置對應(yīng)的正在處理隊列ProccessQueuelocked屬性為true沒有鎖定設(shè)置為false通過長輪詢拉取到消息后會提交到消息服務(wù)ConsumeMessageOrderlyService,交到線程池。ConsumeRequest是由ProcessQueue和Messagequeue組成。messagequeueconsumer內(nèi),同一個隊列串行truelock放到本地線程稍后鎖定在消費。如果locktrue且沒有過期,開始消費消息計算任務(wù)執(zhí)行的時間如果大于一分鐘且線程數(shù)小于隊列數(shù)情況下,將processqueue,proccessqueue獲取批次消息,processqueue.takeMessags(batchSize從msgTreeMap mit方新到broker上去六:pull消息消列七 利用DefaultMQPushConsumerImpl 獲取ProcessQueueTable<MessageQueue,ProcessQueue>的keyset的messagequeue去獲取offsetTable中的messagequeue的值,在update的時候如果沒有對應(yīng)的Messagequeue會構(gòu)建,但是也會rebalance的時候?qū)]有分配到的messagequeue刪除offsetbrokerUnregiser客戶端 第三章一:brker的啟BrokernamesrvTopic在broker文件上的json格式 }Namesrv接收Broker的topic信息,namesrv只存內(nèi)存,但是broker有任務(wù)定時推 Mq通過ConfigMananger來管理配置加載以及持久{"%RETRY%group_name":{ //100讀權(quán)限,10寫權(quán) 6是110讀寫權(quán) }}}加載消費進(jìn)度偏移量{"%RETRYgroup_name@group_name":{0:0重試隊列消費group_name":{//隊列 //隊列 }}}{" "groupName":"group_name",}}}二:消mq的消息的是由consumequeue和commitLog配合完成consumequeue消息的邏輯隊列,相當(dāng)于字典的 文件commitLog上的位置,文件地址:${user.home}\store\consumequeue\${ consumequeue中單元是一個20字節(jié)定長的數(shù)據(jù),是順序?qū)戫樞蜃xcommitLogOffset是指這條消息在commitLog成一個ConsumeQueue,TopicAQueue=1組成一個另一個ConsumeQueue. //消息體BODY 這個標(biāo)志 //QUEUEOFFSETconsumequeue的偏移量,可以代表這個隊列中消息的個數(shù),要通過這個值查找到consumequeue中數(shù)據(jù),QUEUEOFFSET*20才是偏移地址+8+4+8+8 +8+8+8//RECONSUMETIMES消息被某個訂閱組重新消費了幾次(立計數(shù)),topic名字為%retry%groupNamequeueId=0的隊列 //4個字節(jié)存放消息體大小值,后bodylength大小 //topictopic + //2個字節(jié)(short)存放屬性值大小,后存放MapedFilePageCache文件封裝,操作物理文件在內(nèi)存中的映射以及將內(nèi)存數(shù)據(jù)持久化到物理文件中,代碼中寫死了要求os系統(tǒng)的頁大小為4k,消息刷盤根據(jù)參數(shù)FileChannelfileChannel=newRandomAccessFile(file,fileSize:filename:filename是消息在此文件的中初始偏移量,排好序后組成了續(xù)的消息隊getLastMapedFile獲取,此函數(shù)如果集合中一個也沒有創(chuàng)建一個,如果最后一個寫滿了也創(chuàng)一個MapedFile文件地址,通過預(yù)分配服務(wù)AllocateMapedFileService異步預(yù)創(chuàng)建下一個MapedFile1G的文mitWhereDefaultMessageStore消息層實<1>從mapedFileQueue獲取的映射文 <5>三LoadLoadcommit遍歷出${user.homestore\${commitlog}commitLog文件,按文件名(文件名就是文件的初始偏移量MapedFile對象,在MapedFileQueue中用集合list把這些MapedFile文件組成一個邏輯上連續(xù)的隊列Loadconsume遍歷${user.homestore\consumequeue下的所有文件夾(topic就是一個文件}\queueId,文件來構(gòu)建ConsueQueue對象DefaultMessageStore中結(jié)構(gòu)Map<topic,Map<queueId,ConsumequeueMapedFileQueuemapedFile組成一個邏輯上連續(xù)的隊加載${user.home}\store\checkpoint這個文件了3個long類型的值來記錄模型最終一致的時間點,這個3個long的值為physicMsgTimestamp為commitLog最后刷盤的時間indexMsgTimestamp為索引最終刷盤時間abort,linuxviabort文件存在,系統(tǒng)認(rèn)為是異?;譃槭裁凑f先正?;謴?fù)異常恢復(fù)在哪呢?broker是異常啟動時候異常恢復(fù)commitLog時會重新構(gòu)建請到DispatessageService服務(wù),來重新生成ConsumeQueue數(shù)據(jù),索引以及事物消息的redolog,確,現(xiàn)在文件寫到哪了(wrotePositionFlush到了什么位置(committedPosition)?ConsumeQueuemapedFiles集合中,從倒數(shù)第三個文件開始恢復(fù)(為什元是20字節(jié)的定長數(shù)據(jù),所以是依次分別取了Offsetlong類型了commitLog的數(shù)據(jù)偏移量 int類型了在commitLog上消息大小tagcodetag的哈希值目前 mq判斷的consumequeue數(shù)據(jù)是否有效的方式為判斷offset>=0&&size>0設(shè)置 所在文件 的 commitedPosition消息大小值msgSize大于 文件錯誤恢復(fù)結(jié)束等于0 當(dāng)${user.home}\store\abort文件存在,代表異?;謴?fù)${user.homestore\checkpoint獲取最終一致的時間點說明checkpoint在此mapedfile文件中從checkpoint所在mapedFile開始恢復(fù)數(shù)據(jù),它的整體過程跟正?;謴?fù)(1)消息后派送到分發(fā)消息服務(wù)Dispat 建ConsumeQueue以及索引 四:HA&masterbroker啟動的時候BrokerControllerslavemaster地址更新,沒有配置所有broker會想namesrv,從namesrv獲取haServerAddr,然后更新到HA當(dāng)HA 的Mas ddress不為空的時(因為brokermaster和slave都構(gòu)建了HA 會主動連接master獲取SocketChannelMasterSlave請求的端口,默認(rèn)為服務(wù)端口+1接收slave上傳的offsetlong類型intposthis.byteBufferRead.position()(this.byteBufferRead.position(8)//沒有理解意圖longreadOffset=this.byteBufferRead.getLong(pos-8); 同步topic的配置信息 Master通過AcceptSocketServiceslave的連接,每個masterslave連接都會構(gòu)建一個HAConnectionmasterslave部署結(jié)構(gòu)的會有多個HAConnection實例,MasterHAConnectionslaveWriteSocketService對象和Slave反饋服務(wù)線程對象ReadSocketServiceslaveRequestOffsetslave同步完數(shù)據(jù)都會向masterack表示下次同步的數(shù)據(jù)的offset。slaveslaveRequestOffset=0mastercommitLog文件開始同步(如果要把master上的所有commitLog文件同步到slave的話,把masterOffset值賦為minOffset)socket<PhyOffset><BodySize>Bodyslave通過HA 通知前端線程,如果是同步的話通知是否成功Slave通過 masterslavecommitlog的數(shù)據(jù),masterslave寫入數(shù)據(jù)的格式是Slave初始化DefaultMessageStore時候會構(gòu)建ReputMessageService服務(wù)線程并在啟動服務(wù)的start方法中被啟動HA同步,當(dāng)msg寫入master的commitlog文件后,判斷maser的角色如果是同步雙SYNC_MASTER,等待master同步到slave在返回結(jié)果HA異步五:刷盤策,六:索引服索引結(jié)索引文件由索引文件頭IndexHeader,Slot: 8位long類型,索引文件構(gòu)建第一個索引的消息落在broker的時間 8位long類型,索引文件構(gòu)建最后一個索引消息落broker時間 8位long類型,索引文件構(gòu)建第一個索引的消息commitLog偏移量 8位long類型,索引文件構(gòu)建最后一個索引消息commitLog偏移量 4位int類型,構(gòu)建索引占用的槽位數(shù)(這個值貌似沒有具體作用) 4位int類型,索引文件中構(gòu)建的索引個數(shù)計算消息的對應(yīng)的slotPos=Math.abs(keyHash)%hashSlotNum消息在IndexFile中的偏移量absSlotPos=IndexHeader.INDEX_HEADER_SIZE+slotPos*4位int值,的是key的hash8位long值的是消息在commitlog的物理偏移量4位int 4位int :索引服IndexService文件indexFile的路徑queryOffsettopickey時間跨度來查詢消息:構(gòu)建索引服分發(fā)消息索引服務(wù)將消息位置分發(fā)到ConsumeQueue中后,加入IndexService構(gòu)建索引key(topic+"#"+key)計算槽位在indexfile的具體偏移量位置:Broker (comsumer,producer)之間的心跳 ChannelInfo的時間戳,來表 , HouseKeeService線程定時清除不活動的連接 :Broker與namesrv之間的(1)broker的topic信構(gòu)建或者更新第四章接收broker的請求broker路由信息(包括master和slave) 的請求根據(jù)某個topic獲取所有到broker的路由信息二 啟動流程三它的多個slave id表示是master還是slaveid=0為 大于0為, BrokerLiveInfo代表一個活的broker由最后更新時間,一個channel,數(shù)據(jù)版本和Ha地址組成四:Namesrv與broker間的心也是topic信息到namesrvNamesrvControllerRouteInfoMangerscanNotActiveBroker方法來定時清理不活動的broker(默認(rèn)兩分鐘沒有向namesrv發(fā)送心跳更新BrokerLiveInfo時間戳的BrokerLiveInfo的時間戳,如果過期關(guān)閉channel連接第五Remoting通信層一: Server 公用抽象過opaque從緩存查找對應(yīng)的ResponseFuture對象invokeSyncImpl同步調(diào)用應(yīng)過來,就用不到緩存中的ResponseFuturel)invokeAsyncImpl異步調(diào)用限默認(rèn)2048個發(fā)送成功 dRequestOK(false),信號量通過once釋放, 單向請mand接收請求根據(jù)請求code查找對應(yīng)的處理器線程池pair,沒有用默認(rèn)的 若不是o

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論