詳解RocketMQ中的消費者啟動與消費流程分析_第1頁
詳解RocketMQ中的消費者啟動與消費流程分析_第2頁
詳解RocketMQ中的消費者啟動與消費流程分析_第3頁
詳解RocketMQ中的消費者啟動與消費流程分析_第4頁
詳解RocketMQ中的消費者啟動與消費流程分析_第5頁
已閱讀5頁,還剩21頁未讀 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領

文檔簡介

第詳解RocketMQ中的消費者啟動與消費流程分析目錄一、簡介1.1RocketMQ簡介1.2工作流程二、消費者啟動流程2.1實例化消費者2.2設置NameServer和訂閱topic過程2.2.1添加tag2.2.2發(fā)送心跳至Broker2.2.3上傳過濾器類至FilterServer2.3注冊回調(diào)實現(xiàn)類2.4消費者啟動三、pull/push模式消費3.1pull模式-DefaultMQPullConsumer3.2push模式-DefaultMQPushConsumer3.3小結(jié)四、順序消息4.1實現(xiàn)MQ順序消息發(fā)送存在問題4.2實現(xiàn)MQ順序消息關鍵點五、消息ack機制5.1消息消費失敗處理5.2消息重投帶來問題六、總結(jié)

一、簡介

1.1RocketMQ簡介

RocketMQ是由阿里巴巴開源的分布式消息中間件,支持順序消息、定時消息、自定義過濾器、負載均衡、pull/push消息等功能。RocketMQ主要由Producer、Broker、Consumer、NameServer四部分組成,其中Producer負責生產(chǎn)消息,Consumer負責消費消息,Broker負責存儲消息。NameServer充當名字路由服務,整體架構(gòu)圖如下所示:

**Producer:**負責生產(chǎn)消息,一般由業(yè)務系統(tǒng)生產(chǎn)消息,可通過集群方式部署。RocketMQ提供多種發(fā)送方式,同步發(fā)送、異步發(fā)送、順序發(fā)送、單向發(fā)送。同步和異步方式均需要Broker返回確認信息,單向發(fā)送不需要。**Consumer:**負責消費消息,一般是后臺系統(tǒng)負責異步消費,可通過集群方式部署。一個消息消費者會從Broker服務器拉取消息、并將其提供給應用程序。提供pull/push兩者消費模式。**BrokerServer:**負責存儲消息、轉(zhuǎn)發(fā)消息。RocketMQ系統(tǒng)中負責接收從生產(chǎn)者發(fā)送來的消息并存儲、同時為消費者的拉取請求作準備,存儲消息相關的元數(shù)據(jù),包括消費者組、消費進度偏移和主題和隊列消息等。**NameServer:**名字服務,充當路由消息的提供者。生產(chǎn)者或消費者能夠通過名字服務查找各主題相應的BrokerIP列表。多個NameServer實例組成集群,相互獨立,沒有信息交換。

本文基于ApacheRocketMQ最新版本主要講述RocketMQ的消費者機制,分析其啟動流程、pull/push機制,消息ack機制以及定時消息和順序消息的不同。

1.2工作流程

(1)啟動NameServer。

NameServer起來后監(jiān)聽端口,等待Broker、Producer、Consumer連上來,相當于一個路由控制中心。

(2)啟動Broker。

跟所有的NameServer保持長連接,定時發(fā)送心跳包。心跳包中包含當前Broker信息(IP+端口等)以及存儲所有Topic信息。注冊成功后,NameServer集群中就有Topic跟Broker的映射關系。

(3)創(chuàng)建Topic。

創(chuàng)建Topic時需要指定該Topic要存儲在哪些Broker上,也可以在發(fā)送消息時自動創(chuàng)建Topic。

(4)Producer發(fā)送消息。

啟動時先跟NameServer集群中的其中一臺建立長連接,并從NameServer中獲取當前發(fā)送的Topic存在哪些Broker上,輪詢從隊列列表中選擇一個隊列,然后與隊列所在的Broker建立長連接從而向Broker發(fā)消息。

(5)Consumer消費消息。

跟其中一臺NameServer建立長連接,獲取當前訂閱Topic存在哪些Broker上,然后直接跟Broker建立連接通道,開始消費消息。

二、消費者啟動流程

官方給出的消費者實現(xiàn)代碼如下所示:

publicclassConsumer{

publicstaticvoidmain(String[]args)throwsInterruptedException,MQClientException{

//實例化消費者

DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("TestConsumer");

//設置NameServer的地址

consumer.setNamesrvAddr("localhost:9876");

//訂閱一個Topic,以及Tag來過濾需要消費的消息

consumer.subscribe("Test","*");

//注冊回調(diào)實現(xiàn)類來處理從broker拉取回來的消息

consumer.registerMessageListener(newMessageListenerConcurrently(){

@Override

publicConsumeConcurrentlyStatusconsumeMessage(ListMessageExtmsgs,ConsumeConcurrentlyContextcontext){

System.out.printf("%sReceiveNewMessages:%s%n",Thread.currentThread().getName(),msgs);

//標記該消息已經(jīng)被成功消費

returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;

//啟動消費者實例

consumer.start();

System.out.printf("ConsumerStarted.%n");

}

下面讓我們來分析消費者在啟動中每一階段中做了什么吧,letsgo.

2.1實例化消費者

第一步主要是實例化消費者,這里采取默認的Push消費者模式,構(gòu)造器中參數(shù)為對應的消費者分組,指定同一分組可以消費同一類型的消息,如果沒有指定,將會采取默認的分組模式,這里實例化了一個DefaultMQPushConsumerImpl對象,它是后面消費功能的主要實現(xiàn)類。

//實例化消費者

DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("TestConsumer");

主要通過DefaultMQPushConsumer實例化DefaultMQPushConsumerImpl,它是主要的消費功能實現(xiàn)類。

publicDefaultMQPushConsumer(finalStringconsumerGroup,RPCHookrpcHook,

AllocateMessageQueueStrategyallocateMessageQueueStrategy){

this.consumerGroup=consumerGroup;

this.allocateMessageQueueStrategy=allocateMessageQueueStrategy;

defaultMQPushConsumerImpl=newDefaultMQPushConsumerImpl(this,rpcHook);

}

2.2設置NameServer和訂閱topic過程

//設置NameServer的地址

consumer.setNamesrvAddr("localhost:9876");

//訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息

consumer.subscribe("Test","*");

2.2.1添加tag

設置NameServer地址后,這個地址為你名字服務集群的地址,類似于zookeeper集群地址,樣例給出的是單機本地地址,搭建集群后,可以設置為集群地址,接下來我們需要訂閱一個主題topic下的消息,設置對應的topic,可以進行分類,通過設置不同的tag來實現(xiàn),但目前只支持||進行連接,如:tag1||tag2||tag3。歸根在于構(gòu)造訂閱數(shù)據(jù)時,源碼通過||進行了字符串的分割,如下所示:

publicstaticSubscriptionDatabuildSubscriptionData(finalStringconsumerGroup,Stringtopic,

StringsubString)throwsException{

SubscriptionDatasubscriptionData=newSubscriptionData();

subscriptionData.setTopic(topic);

subscriptionData.setSubString(subString);

if(null==subString||subString.equals(SubscriptionData.SUB_ALL)||subString.length()==0){

subscriptionData.setSubString(SubscriptionData.SUB_ALL);

}else{

String[]tags=subString.split("\\|\\|");

if(tags.length0){

for(Stringtag:tags){

if(tag.length()0){

StringtrimString=tag.trim();

if(trimString.length()0){

subscriptionData.getTagsSet().add(trimString);

subscriptionData.getCodeSet().add(trimString.hashCode());

}else{

thrownewException("subStringspliterror");

returnsubscriptionData;

}

2.2.2發(fā)送心跳至Broker

前面構(gòu)造好訂閱主題和分類后,將其放入了一個ConcurrentMap中,并調(diào)用sendHeartbeatToAllBrokerWithLock()方法,進行心跳檢測和上傳過濾器類至broker集群(生產(chǎn)者啟動過程也會進行此步驟)。如下所示:

publicvoidsendHeartbeatToAllBrokerWithLock(){

if(this.lockHeartbeat.tryLock()){

try{

this.sendHeartbeatToAllBroker();

this.uploadFilterClassSource();

}catch(finalExceptione){

log.error("sendHeartbeatToAllBrokerexception",e);

}finally{

this.lockHeartbeat.unlock();

}else{

log.warn("lockheartBeat,butfailed.");

}

首先會對broker集群進行心跳檢測,在此過程中會施加鎖,它會執(zhí)行sendHeartbeatToAllBroker方法,構(gòu)建心跳數(shù)據(jù)heartbeatData,然后遍歷消費和生產(chǎn)者table,將消費者和生產(chǎn)者信息加入到heartbeatData中,當都存在消費者和生產(chǎn)者的情況下,會遍歷brokerAddrTable,往每個broker地址發(fā)送心跳,相當于往對應地址發(fā)送一次http請求,用于探測當前broker是否存活。

this.mQClientAPIImpl.sendHearbeat(addr,heartbeatData,3000);

2.2.3上傳過濾器類至FilterServer

之后會執(zhí)行uploadFilterClassSource()方法,只有push模式才會有此過程,在此模式下,它會循環(huán)遍歷訂閱數(shù)據(jù)SubscriptionData,如果此訂閱數(shù)據(jù)使用了類模式過濾,會調(diào)uploadFilterClassToAllFilterServer()方法:上傳用戶自定義的過濾消息實現(xiàn)類至過濾器服務器。

privatevoiduploadFilterClassSource(){

IteratorEntryString,MQConsumerInnerit=this.consumerTable.entrySet().iterator();

while(it.hasNext()){

EntryString,MQConsumerInnernext=it.next();

MQConsumerInnerconsumer=next.getValue();

if(ConsumeType.CONSUME_PASSIVELY==consumer.consumeType()){

SetSubscriptionDatasubscriptions=consumer.subscriptions();

for(SubscriptionDatasub:subscriptions){

if(sub.isClassFilterMode()sub.getFilterClassSource()!=null){

finalStringconsumerGroup=consumer.groupName();

finalStringclassName=sub.getSubString();

finalStringtopic=sub.getTopic();

finalStringfilterClassSource=sub.getFilterClassSource();

try{

this.uploadFilterClassToAllFilterServer(consumerGroup,className,topic,filterClassSource);

}catch(Exceptione){

log.error("uploadFilterClassToAllFilterServerException",e);

過濾器類的作用:消費端可以上傳一個Class類文件到FilterServer,Consumer從FilterServer拉取消息時,F(xiàn)ilterServer會把請求轉(zhuǎn)發(fā)給Broker,F(xiàn)ilterServer收取到Broker消息后,根據(jù)上傳的過濾類中的邏輯做過濾操作,過濾完成后再把消息給到Consumer,用戶可以自定義過濾消息的實現(xiàn)類。

2.3注冊回調(diào)實現(xiàn)類

接下來就是代碼中的注冊回調(diào)實現(xiàn)類了,當然,如果你是pull模式的話就不需要實現(xiàn)它了,push模式需要定義,兩者區(qū)別后面會講到,它主要用于從broker實時獲取消息,這里有兩種消費上下文類型,用于不同的消費類型。

**ConsumeConcurrentlyContext:**延時類消息上下文,用于延時消息,即定時消息,默認不延遲,可以設置延遲等級,每個等級對應固定時間刻度,RocketMQ中不能自定義延遲時間,延遲等級從1開始,對應的時間間隔如下所示:

1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h

**ConsumeOrderlyContext:**順序類消息上下文,控制發(fā)送消息的順序,生產(chǎn)者設置分片路由規(guī)則后,相同key只落到指定queue上,消費過程中會對順序消息所在的queue加鎖,保證消息的有序性。

2.4消費者啟動

我們先來看下消費者啟動的過程,如下所示:

**(1)this.checkConfig():**首先是檢測消費配置項,包括消費分組group、消息模型(集群、廣播)、訂閱數(shù)據(jù)、消息監(jiān)聽器等是否存在,如果不存在的話,會拋出異常。

**(2)copySubscription():**構(gòu)建主題訂閱信息SubscriptionData并加入到RebalanceImpl負載均衡方法的訂閱信息中。

**(3)getAndCreateMQClientInstance():**初始化MQ客戶端實例。

**(4)offsetStore.load():**根據(jù)不同消息模式創(chuàng)建消費進度offsetStore并加載:BROADCASTING-廣播模式,同一個消費group中的consumer都消費一次,CLUSTERING-集群模式,默認方式,只被消費一次。

switch(this.defaultMQPushConsumer.getMessageModel()){

caseBROADCASTING:

this.offsetStore=newLocalFileOffsetStore(this.mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup());

break;

caseCLUSTERING:

this.offsetStore=newRemoteBrokerOffsetStore(this.mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup());

break;

default:

break;

可以通過setMessageModel方式設置不同模式;廣播模式下同消費組的消費者相互獨立,消費進度在本地單獨進行存儲;集群模式下,同一條消息只會被同一個消費組消費一次,消費進度會參與到負載均衡中,消費進度是共享在整個消費組中的。

**(5)consumeMessageService.start():**根據(jù)不同消息監(jiān)聽類型實例化并啟動。這里有延時消息和順序消息。

這里主要講下順序消息,RocketMQ也幫我們實現(xiàn)了,在啟動時,如果是集群模式并是順序類型,它會啟動定時任務,定時向broker發(fā)送批量鎖,鎖住當前順序消費發(fā)往的消息隊列,順序消息因為生產(chǎn)者生產(chǎn)消息時指定了分片策略和消息上下文,只會發(fā)往一個消費隊列。

定時任務發(fā)送批量鎖,鎖住當前順序消息隊列。

publicvoidstart(){

if(MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())){

this.scheduledExecutorService.scheduleAtFixedRate(newRunnable(){

@Override

publicvoidrun(){

ConsumeMessageOrderlyService.this.lockMQPeriodically();

},1000*1,ProcessQueue.REBALANCE_LOCK_INTERVAL,TimeUnit.MILLISECONDS);

發(fā)送鎖住隊列的消息至broker,broker端返回鎖住成功的隊列集合lockOKMQSet,順序消息具體實現(xiàn)可查看后面第四節(jié)。

**(6)mQClientFactory.registerConsumer():**MQClientInstance注冊消費者,并啟動MQClientInstance,沒有注冊成功會結(jié)束消費服務。

**(7)mQClientFactory.start():**最后會啟動如下服務:遠程客戶端、定時任務、pull消息服務、負載均衡服務、push消息服務,然后將狀態(tài)改為運行中。

switch(this.serviceState){

caseCREATE_JUST:

this.serviceState=ServiceState.START_FAILED;

//Ifnotspecified,lookingaddressfromnameserver

if(null==this.clientConfig.getNamesrvAddr()){

this.mQClientAPIImpl.fetchNameServerAddr();

//Startrequest-responsechannel

this.mQClientAPIImpl.start();

//Startvariousscheduletasks

this.startScheduledTask();

//Startpullservice

this.pullMessageService.start();

//Startrebalanceservice

this.rebalanceService.start();

//Startpushservice

this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

("theclientfactory[{}]startOK",this.clientId);

this.serviceState=ServiceState.RUNNING;

break;

caseRUNNING:

break;

caseSHUTDOWN_ALREADY:

break;

caseSTART_FAILED:

thrownewMQClientException("TheFactoryobject["+this.getClientId()+"]hasbeencreatedbefore,andfailed.",null);

default:

break;

全部啟動完畢后,整個消費者也就啟動好了,接下來就可以對生產(chǎn)者發(fā)送過來的消息進行消費了,那么是如何進行消息消費的呢?不同的消息模式有何區(qū)別呢?

三、pull/push模式消費

3.1pull模式-DefaultMQPullConsumer

**pull拉取式消費:**應用通常主動調(diào)用Consumer的拉消息方法從Broker服務器拉消息、主動權(quán)由應用程序控制,可以指定消費的位移,【偽代碼】如下所示:

DefaultMQPullConsumerconsumer=newDefaultMQPullConsumer("TestConsumer");

//設置NameServer的地址

consumer.setNamesrvAddr("localhost:9876");

//啟動消費者實例

consumer.start();

//獲取主題下所有的消息隊列,這里根據(jù)主題從nameserver獲取的

SetMessageQueuemqs=consumer.fetchSubscribeMessageQueues("Test");

for(MessageQueuequeue:mqs){

//獲取當前隊列的消費位移,指定消費進度offset,fromstore:從broker中獲取還是本地獲取,true-broker

longoffset=consumer.fetchConsumeOffset(queue,true);

PullResultpullResult=null;

while(offsetpullResult.getMaxOffset()){

//第二個參數(shù)為tag,獲取指定topic下的tag

//第三個參數(shù)表示從哪個位移下開始消費消息

//第四個參數(shù)表示一次最大拉取多少個消息

try{

pullResult=consumer.pullBlockIfNotFound(queue,"*",offset,32);

}catch(Exceptione){

e.printStackTrace();

System.out.println("pull拉取消息失敗");

//代碼省略,記錄消息位移

offset=pullResult.getNextBeginOffset();

//代碼省略,這里為消費消息

可以看到我們是主動拉取topic對應下的消息隊列,然后遍歷它們,獲取當前消費進度并進行消費。

3.2push模式-DefaultMQPushConsumer

該模式下Broker收到數(shù)據(jù)后會主動推送給消費端,該消費模式一般實時性較高,現(xiàn)在一般推薦使用該方式,具體示例可以觀看第一章開頭的官方demo。

它也是通過實現(xiàn)pull方式來實現(xiàn)的,首先,前面2.4消費者啟動之后,最后會啟動拉取消息服務pullMessageService和負載均衡rebalanceService服務,它們啟動后會一直有線程進行消費。

caseCREATE_JUST:

//......

//Startpullservice

this.pullMessageService.start();

//Startrebalanceservice

this.rebalanceService.start();

//.......

this.serviceState=ServiceState.RUNNING;

break;

caseRUNNING:

這里面調(diào)用doRebalance()方法,進行負載均衡,默認每20s做一次,會輪詢所有訂閱該實例的topic。

publicclassRebalanceServiceextendsServiceThread{

//初始化,省略....

@Override

publicvoidrun(){

(this.getServiceName()+"servicestarted");

while(!this.isStopped()){

this.waitForRunning(waitInterval);

//做負載均衡

this.mqClientFactory.doRebalance();

(this.getServiceName()+"serviceend");

@Override

publicStringgetServiceName(){

returnRebalanceService.class.getSimpleName();

然后根據(jù)每個topic,以及它是否順序消息模式來做rebalance。

具體做法就是先對Topic下的消息消費隊列、消費者Id進行排序,然后用消息隊列的平均分配算法,計算出待拉取的消息隊列,將分配到的消息隊列集合與processQueueTable做一個過濾比對,新隊列不包含或已過期,則進行移除。

publicvoiddoRebalance(finalbooleanisOrder){

MapString,SubscriptionDatasubTable=this.getSubscriptionInner();

if(subTable!=null){

for(finalMap.EntryString,SubscriptionDataentry:subTable.entrySet()){

finalStringtopic=entry.getKey();

try{

/根據(jù)/每個topic,以及它是否順序消息模式來做rebalance

this.rebalanceByTopic(topic,isOrder);

}catch(Throwablee){

if(!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)){

log.warn("rebalanceByTopicException",e);

this.truncateMessageQueueNotMyTopic();

rebalanceByTopic中廣播和集群模式都會執(zhí)行updateProcessQueueTableInRebalance()方法,最后會分發(fā)請求dispatchPullRequest,通過executePullRequestImmediately()方法將pull請求放入pull請求隊列pullRequestQueue中,注意,pull模式下分發(fā)請求方法dispatchPullRequest()實際實現(xiàn)是一個空方法,這里兩者很大不同,push模式實現(xiàn)如下:

@Override

publicvoiddispatchPullRequest(ListPullRequestpullRequestList){

for(PullRequestpullRequest:pullRequestList){

this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);

("doRebalance,{},addanewpullrequest{}",consumerGroup,pullRequest);

然后再PullMessageService中,因為前面consumer啟動成功了,PullMessageService線程會實時去取pullRequestQueue中的pull請求。

@Override

publicvoidrun(){

(this.getServiceName()+"servicestarted");

while(!this.isStopped()){

try{

PullRequestpullRequest=this.pullRequestQueue.take();

if(pullRequest!=null){

this.pullMessage(pullRequest);

}catch(InterruptedExceptione){

}catch(Exceptione){

log.error("PullMessageServiceRunMethodexception",e);

(this.getServiceName()+"serviceend");

取出來的pull請求又會經(jīng)由DefaultMQPushConsumerImpl的消息監(jiān)聽類,調(diào)用pullMessage()方法。

privatevoidpullMessage(finalPullRequestpullRequest){

finalMQConsumerInnerconsumer=this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());

if(consumer!=null){

DefaultMQPushConsumerImplimpl=(DefaultMQPushConsumerImpl)consumer;

impl.pullMessage(pullRequest);

}else{

log.warn("NomatchedconsumerforthePullRequest{},dropit",pullRequest);

pullMessage()中pullKernelImpl()有一個Pullback方法用于執(zhí)行消息的回調(diào),它會通過submitConsumeRequest()這個方法來處理消息,總而言之就是通過線程回調(diào)的方式讓push模式下的監(jiān)聽器能夠感知到。

//Pull回調(diào)

PullCallbackpullCallback=newPullCallback(){

@Override

publicvoidonSuccess(PullResultpullResult){

if(pullResult!=null){

pullResult=DefaultMQPushConsumerImpl.this.pullAPIWcessPullResult(pullRequest.getMessageQueue(),pullResult,

subscriptionData);

switch(pullResult.getPullStatus()){

caseFOUND:

//省略...消費位移更新

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(

pullResult.getMsgFoundList(),

processQueue,

pullRequest.getMessageQueue(),

dispathToConsume);

這個方法對應的不同消費模式有著不同實現(xiàn),但都是會構(gòu)建一個消費請求ConsumeRequest,里面有一個run()方法,構(gòu)建完畢后,會把它放入到listener監(jiān)聽器中。

//監(jiān)聽消息

status=listener.consumeMessage(Collections.unmodifiableList(msgs),context);

還記得前面我們樣例給出的注冊監(jiān)聽器回調(diào)處理方法嗎?

我們可以點擊上面的consumeMessage方法,查看它在源碼中的實現(xiàn)位置,發(fā)現(xiàn)它就回到了我們前面的2.3注冊回調(diào)實現(xiàn)類里面了,整個流程是不是通順了呢?這個監(jiān)聽器中就會收到push的消息,拉取出來進行業(yè)務消費邏輯,下面是我們自己定義的消息回調(diào)處理方法。

//注冊回調(diào)實現(xiàn)類來處理從broker拉取回來的消息

consumer.registerMessageListener(newMessageListenerConcurrently(){

@Override

publicConsumeConcurrentlyStatusconsumeMessage(ListMessageExtmsgs,ConsumeConcurrentlyContextcontext){

System.out.printf("%sReceiveNewMessages:%s%n",Thread.currentThread().getName(),msgs);

//標記該消息已經(jīng)被成功消費

returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;

3.3小結(jié)

push模式相比較于pull模式不同的是,做負載均衡時,pullRequest請求會放入pullRequestQueue,然后PullMessageService線程會實時去取出這個請求,將消息存入ProcessQueue,通過線程回調(diào)的方式讓push模式下的監(jiān)聽器能夠感知到,這樣消息從分發(fā)請求到接收都是實時的,而pull模式是消費端主動去拉取指定消息的,需要指定消費進度。

對于我們開發(fā)者來說,選取哪種模式實現(xiàn)我們的業(yè)務邏輯比較合適呢?別急,先讓我們總結(jié)下他們的特點:

共同點:

兩者底層實際一樣,push模式也是基于pull模式來實現(xiàn)的。

pull模式需要我們通過程序主動通過consumer向broker拉消息,而消息的push模式則只需要我們提供一個listener監(jiān)聽,實時獲取消息。

優(yōu)點:

push模式采用長輪詢阻塞的方式獲取消息,實時性非常高;

push模式rocketMQ處理了獲取消息的細節(jié),使用起來比較簡單方便;

pull模式可以指定消費進度,想消費多少就消費多少,靈活性大。

缺點:

push模式當消費者能力遠遠低于生產(chǎn)者能力的時候,會產(chǎn)生一定的消費者消息堆積;

pull模式實時性很低,頻率不好設置;

拉取消息的間隔不好設置,太短則產(chǎn)生很多無效Pull請求的RPC開銷,影響MQ整體的網(wǎng)絡性能,太長則實時性差。

適用場景:

對于服務端生產(chǎn)消息數(shù)據(jù)比較大時,而消費端處理比較復雜,消費能力相對較低時,這種情況就適用pull模式;

對于數(shù)據(jù)實時性要求高的場景,就比較適用與push模式。

現(xiàn)在的你是否明確業(yè)務中該使用哪種模式了呢?

四、順序消息

4.1實現(xiàn)MQ順序消息發(fā)送存在問題

(1)一般消息發(fā)送會采取輪詢方式把消息發(fā)送到不同的queue(分區(qū)隊列);而消費消息的時候從多個queue上拉取消息,broker之間是無感知的,這種情況發(fā)送和消費是不能保證順序。

(2)異步方式發(fā)送消息時,發(fā)送的時候不是按著一條一條順序發(fā)送的,保證不了消息到達Broker的時間也是按照發(fā)送的順序來的。

消息發(fā)送到存儲,最后到消費要經(jīng)歷這么多步驟,我們該如何在業(yè)務中使用順序消息呢?讓咱們來一步步拆解下吧。

4.2實現(xiàn)MQ順序消息關鍵點

既然分散到多個broker上無法追蹤順序,那么可以控制發(fā)送的順序消息只依次發(fā)送到同一個queue中,消費的時候只從這個queue上依次拉取,則就保證了順序。在發(fā)送時設置分片路由規(guī)則,讓相同key的消息只落到指定queue上,然后消費過程中對順序消息所在的queue加鎖,保證消息的有序性,讓這個queue上的消息就按照FIFO順序來進行消費。因此我們滿足以下三個條件是否就可以呢?

**1)消息順序發(fā)送:**多線程發(fā)送的消息無法保證有序性,因此,需要業(yè)務方在發(fā)送時,針對同一個業(yè)務編號(如同一筆訂單)的消息需要保證在一個線程內(nèi)順序發(fā)送,在上一個消息發(fā)送成功后,在進行下一個消息的發(fā)送。對應到mq中,消息發(fā)送方法就得使用同步發(fā)送,異步發(fā)送無法保證順序性。

//采用的同步發(fā)送方式,在一個線程內(nèi)順序發(fā)送,異步發(fā)送方式為:producer.send(msg,newSendCallback(){...})

SendResultsendResult=producer.send(msg,newMessageQueueSelector(){//…}

**2)消息順序存儲:**MQ的topic下會存在多個queue,要保證消息的順序存儲,同一個業(yè)務編號的消息需要被發(fā)送到一個queue中。對應到mq中,需要使用MessageQueueSelector來選擇要發(fā)送的queue。即可以對業(yè)務編號設置路由規(guī)則,像根據(jù)隊列數(shù)量對業(yè)務字段hash取余,將消息發(fā)送到一個queue中。

//使用"%"操作,使得訂單id取余后相同的數(shù)據(jù)路由到同一個queue中,也可以自定義路由規(guī)則

longindex=id%mqs.size();

returnmqs.get((int)index);

3)消息順序消費:要保證消息順序消費,同一個queue就只能被一個消費者所消費,因此對broker中消費隊列加鎖是無法避免的。同一時刻,一個消費隊列只能被一個消費者消費,消費者內(nèi)部,也只能有一個消費線程來消費該隊列。這里RocketMQ已經(jīng)為我們實現(xiàn)好了。

ListPullRequestpullRequestList=newArrayListPullRequest

for(MessageQueuemq:mqSet){

if(!cessQueueTable.containsKey(mq)){

if(isOrder!this.lock(mq)){

log.warn("doRebalance,{},addanewmqfailed,{},becauselockfailed",consumerGroup,mq);

continue;

//....省略

消費者重新負載,并且分配完消費隊列后,需要向mq服務器發(fā)起消息拉取請求,代碼實現(xiàn)在RebalanceImpl#updateProcessQueueTableInRebalance()中,針對順序消息的消息拉取,mq做了以上判斷,即消費客戶端先向broker端發(fā)起對messageQueue的加鎖請求,只有加鎖成功時才創(chuàng)建pullRequest進行消息拉取,這里的pullRequest就是前面pull和push模式消息體,而updateProcessQueueTableInRebalance這個方法也是在前面消費者啟動過程中有講到過哦。

具體加鎖邏輯如下:

publicbooleanlock(finalMessageQueuemq){

FindBrokerResultfindBrokerResult=this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),MixAll.MASTER_ID,true);

if(findBrokerResult!=null){

LockBatchRequestBodyrequestBody=newLockBatchRequestBody();

requestBody.se

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論