2023在Spring生態(tài)中玩轉(zhuǎn)RockMQ_第1頁
2023在Spring生態(tài)中玩轉(zhuǎn)RockMQ_第2頁
2023在Spring生態(tài)中玩轉(zhuǎn)RockMQ_第3頁
2023在Spring生態(tài)中玩轉(zhuǎn)RockMQ_第4頁
2023在Spring生態(tài)中玩轉(zhuǎn)RockMQ_第5頁
已閱讀5頁,還剩81頁未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡介

在spring生態(tài)中玩轉(zhuǎn)TOC\o"1-1"\h\z\u開篇:在Spring生態(tài)中玩轉(zhuǎn) RocketMQSpring最初的故事:羅美琪和春波特的故事 RocketMQ-Spring畢業(yè)兩周年,為什么能成為Spring生態(tài)中最受歡迎的messaging實(shí) 方法一:使用rocketmq-spring-boot-starter來配置、發(fā)送和消費(fèi)RocketMQ消 方法二:SpringCloudStream體系及原理介紹:spring-cloud-stream-binder- 方法三:SpringCloudBus消息總線介 開篇:在Spring開篇:在Spring生態(tài)中玩轉(zhuǎn) 開篇:在Spring開篇:在Spring生態(tài)中玩轉(zhuǎn)ApacheRocketMQ11RocketMQSpringMessaing在Spring生態(tài)中使用RocketMQ到底有多少種方式?他們各自適用于什么場景?各spring6ApacheRocketMQrocketmq-spring畢業(yè)后的兩年,是如何成為Spring生態(tài)中最受歡迎的messaging實(shí)現(xiàn)的?最后將通過圖文和實(shí)操地方式帶來給位開發(fā)者玩轉(zhuǎn)在Spring生態(tài)中使用RocketMQ一、RocketMQ與Spring架,SpringMessagingSpringCloudStreamSpringBoot二、SpringSpringMessagingSpringFramework4SpringJmsTemplate的簡單的使用JMS步接收消息的一整套完整的基礎(chǔ)架構(gòu),SpringAMQPSpringBoot準(zhǔn),對消息發(fā)送端和消息接收端的模式進(jìn)行規(guī)定,比如消息Messaging對應(yīng)的模型就包括一個(gè)消息體Payload和消息頭HeaderBeanSpringBoot動(dòng)的POJO)SpringBootApacheRocketMQ生態(tài)中,RocketMQ-Spring-Boot-Starter(下文簡稱RocketMQ-Spring)就是一個(gè)支持SpringMessagingAPI標(biāo)準(zhǔn)的項(xiàng)目。該項(xiàng)目把RocketMQ的客戶端使用SpringBoot的方式進(jìn)行了封裝,可以讓用戶通過簡單的annotationSpringMessagingAPI持?jǐn)U展出RocketMQAPIRocketMQ-Spring期,RocketMQSpringRocketMQ-Spring引出一段羅美琪(RocketMQ)和春波特(SpringBoot)故事的佳話[1Spring布道師JoshLong向國外同學(xué)介紹如何使用RocketMQSpring收發(fā)消息[2]。RocketMQ-SpringSpring-KafkaSpring-AMQP(注:兩者均由Spring)SpringMessaging三、SpringCloudSpringCloudStreamSpringIntegrationSpringCloudStream@Input)和輸編程,而不需要關(guān)心運(yùn)行時(shí)具體的Binder在運(yùn)行時(shí),SpringCloudStream能夠自動(dòng)探測并使用在classpath下找到的構(gòu)建時(shí)包含進(jìn)不同的Binder。在更加復(fù)雜的使用場景中,也可以在應(yīng)用中打包多個(gè)BinderBinder,甚至在運(yùn)行時(shí)為不同的通道使用不同的Binder。Binder抽象使得SpringCloudStream應(yīng)用可以靈活的連接到中間件,加之SpringCloudStreamSpringBootSpringBoot(包括應(yīng)用啟動(dòng)參數(shù)、環(huán)境destination(例如,RocketMQtopicRabbitMQexchange)。SpringCloudStream屏蔽了底層消息中間件的實(shí)現(xiàn)細(xì)節(jié),希望以統(tǒng)一的一套APIBinderSpringRabbitbinderKafkaBinder。SpringCloudAlibabaRocketMQBinder[3],其主要實(shí)現(xiàn)原理是把發(fā)送消息最終代理給了RocketMQ-Spring的RocketMQTemplate,在消費(fèi)端則內(nèi)部會(huì)啟動(dòng)RocketMQ-SpringConsumerContainer來接收消息。以此為基礎(chǔ),SpringCloudAlibabaSpringCloudBusRocketMQRocketMQSpringCloud體系內(nèi)的消息總線,來連接分布式系統(tǒng)的所有節(jié)點(diǎn)。通過SpringCloudStreamRocketMQBinder,RocketMQSpringCloudSpringCloudDataFlow、SpringCloudFuntionRocketMQSpringServerless(SpringCloudStreamRocketMQBinderSpringCloudBusRocketMQSpringCloud最活躍的實(shí)現(xiàn)。四、如何在Spring生態(tài)中選擇RocketMQSpringRocketMQSpringRocketMQ-Spring、SpringCloudStreamRocketMQBinder、SpringCloudBusRocketMQ、SpringDataFlowSpringCloudFunction。它們RocketMQ-生態(tài)用到RocketMQ并且支持SpringMessagingAPIRocketMQJavaSDK適合在SpringBoot中使用RocketMQ望能用到RocketMQjavaSpring和自動(dòng)配置簡化SpringCloudStreamRocketMQ屏蔽底層MQ實(shí)現(xiàn)細(xì)節(jié),上層SpringCloudStream的API是統(tǒng)一的。如果想從Kafka切到RocketMQ,直接改個(gè)配置即可。SpringCloudSpringCloudDataFlow,這上面的流計(jì)算都是SpringCloudStream;SpringCloudBus消息總線內(nèi)部也是用的SpringCloudStre在代碼層面能完全屏蔽底層消息且希望能項(xiàng)目能SpringCloud生態(tài)(SpringCloudDataFlow、SpringCloudFuntcion3.SpringCloudStreamSpringCloudBusRocketMQ將RocketMQ作為事件的“傳輸器”,通過發(fā)Spring希望用RocketMQ做消息總線的用置中心客戶端刷DataFlow以Source/Processor/SinkRocketMQ作為流處理過程中的中間存儲(chǔ)SpringJavaFunctionServerlessRocketMQ作為業(yè)務(wù)消息的首選,在消息和流處理領(lǐng)域被廣泛應(yīng)用。而微服務(wù)生態(tài)Spring框架也是業(yè)務(wù)開發(fā)中最被,兩者的完美契合使得RocketMQSpringMessaing實(shí)現(xiàn)中最受歡迎的消息實(shí)現(xiàn)。書的后半部分講給各位開發(fā)者詳細(xì)講述在SpringRocketMQ RocketMQSpring最初的故事:羅美琪和春波特的故事RocketMQSpringRocketMQSpring最初的故事:羅美琪和春波特的故事 RocketMQSpring最初的故事:羅美琪rocketmq-spring6ApacheRocketMQBoot的方式進(jìn)行了封裝,可以讓用戶通過簡單的annotationSpringMessagingAPISpring社區(qū)的原創(chuàng)人員對我們的代碼進(jìn)行了ReviewslackSpringSpringBootReviewSpringBootRocketMQSpring(SpringBoot)的故事。故事的開始是這樣的,羅美琪美眉有一套RocketMQ的客戶端代碼,負(fù)責(zé)發(fā)送消息SpringBootSpringRocketMQSpringSpringBean,并且相關(guān)屬性要能夠根據(jù)配置文件的配置自動(dòng)設(shè)置,命名它為:RocketMQTemplate,同時(shí)讓它封裝發(fā)@Resourceprivate@ResourceprivateRocketMQTemplateSendResultsendResult=rocketMQTemplate.syncSend(xxxTopic,"Hello,@Service@RocketMQMessageListener(topic=@Service@RocketMQMessageListener(topic="xxx",consumerGroup="xxx_consumer")publicclassStringConsumerimplementsRocketMQListener<String>{@OverridepublicvoidonMessage(Stringmessage){@RocketMQMessageListener定義消息消費(fèi)的配置參數(shù)(如:消費(fèi)的topic,化,詳見ListenerContainerConfiguration類及其實(shí)現(xiàn)SmartInitializingSingletonAutoConfiguration@ConfigurationRocketMQ需要的SpringBean,如上面所提到的RocketMQTemplate和能夠處理消費(fèi)回調(diào)Listener的容器,每個(gè)Listener對應(yīng)一個(gè)容器SpringBean來啟動(dòng)MQPushConsumer,ListenerRocketMQAutoConfiguration.java(編者注:這個(gè)是最終發(fā)布的類,沒有review)上面定義的Configuration類,它本身并不會(huì)“自動(dòng)”配置,需要由META-INF/層用戶不需要關(guān)心自動(dòng)配置類的細(xì)節(jié)和開關(guān),只要classpathMETA-INFConfigurationConfiguration@EnableConfiguraitonPropertiesConfigurationProperties,可參考RocketMQProperties.java上層用戶可以根據(jù)這個(gè)類里定義的屬性來配置相關(guān)的屬性文件(即META-INF/perties或META-INF/application.yaml)RocketMQSpringBootstarterReview,接/spring-projects/spring-boot/wiki/Building-On-Spring-https://docs.spring.io/spring-boot/docs/current/reference/html/boot--developing-auto-然后解釋道:"在SpringBoot中包含兩個(gè)概念:auto-configuration和POMsauto-configurationSpringBean。它放在用戶的CLASSPATH中結(jié)合在CLASSPATH中的其它依賴就可以提供相關(guān)的功能;mavenprojectPOM文件,不需要包含任classesresources.|---rocketmq-spring-boot- |---rocketmq-spring- auto-configuraiton|---rocketmq-spring- starterpom.xml|---rocketmq-spring- starter"很好,這樣的模塊結(jié)構(gòu)就清晰多了",春波特小哥哥點(diǎn)頭,"但是這個(gè)AutoConfiguration文件里的一些標(biāo)簽的用法并不正確,幫你注釋一下,另外,考慮到Spring8SpringBoot1.X將不再提供支持,所以建議實(shí)現(xiàn)直接支持SpringBoot2.X"@Order~~春波特:OrderruntimeBeanpublicclassRocketMQAutoConfiguration@ConditionalOnClass(DefaultMQProducer.class)~~春波特:要用(nameclasspathroup~~春波特:nameServername-server@Order(1)~~春波特:刪掉 publicDefaultMQProduceriesrocketMQProperties)@ConditionalOnMissingBean(name="rocketMQMessageObjectMapper")~~春波特:議與具體的實(shí)例名綁定,設(shè)計(jì)的意圖是使用系統(tǒng)中已經(jīng)存在的ObjectMapper,如果沒有,則在這里實(shí)publicObjectMapperrocketMQMessageObjectMapper()returnnew@Bean(destroyMethod=@ConditionalOnMissingBean(name="rocketMQTemplate")~~春波特:與上面一樣@Order(2~~春波特:publicRocketMQTemplaterocketMQTemplate(DefaultMQProducer@Autowired(required= ~~春波特:@Qualifier("rocketMQMessageObjectMapper~~春波特:ObjectMapperobjectMapper)RocketMQTemplaterocketMQTemplate=newif(Objects.nonNull(objectMapper))@Bean(name=@Role(BeanDefinition.ROLE_INFRASTRUCTURE)~~春波特:這個(gè)orBeanPostProcessor,(TransactionBeanpublicRocketMQTransactionAnnotationProcessorreturnnew@Configuration~~春波特Configuration@Import在主Configuration@ConditionalOnProperty(prefixspring.rocketmqvaluenameServer~~春波特ame-InitializingBean@Resource~~春波特:annotation,fieldinjectionprivateStandardEnvironment@Autowired(requiredfalse)~~春波特@Qualifier("rocketMQMessageObjectMapper")ObjectMapperobjectMapper){春波特:@Qualifier性名的松散規(guī)則(relaxedrules)。Iftheydon'twebasicallyregisterthepost-processoratthesame"time"asalltheotherbeansinthatclassandthecontractofBPPisthatitmustberegisteredveryearlyon.ThismaynotmakeadifferenceforthisparticularclassbutflaggingitasstaticasthesideeffecttomakeclearyourBPPimplementationisnotsupposedtodragotherbeansviadependencypublicpublicclassListenerContainerConfigurationimplementsApplicationContextAware,SmartprivateObjectMapperobjectMapper=newObjectMapper();~~春波特:性能上考慮,不要RocketMQListenerrocketMQListener=(RocketMQListener)bean;nerannotation=validate(annotation);~~春波特:BeanSpring4.x以考慮使用Spring5.0GenericApplicationContext.registerBean,通過supplierew來構(gòu)造Bean實(shí)例[beanBuilder.addPropertyValue(PROP_NAMESERVER,StringcontainerBeanName=String.format("%s_%s",lass.getName(),DefaultRocketMQListenerContainercontainer=eDefaultRocketMQListenerContainer.class);~~春波特:你這里的啟動(dòng)方法是通過if(!container.isStarted())try}catch(Exceptione)log.error("startedlog.error("startedcontainerfailed.{}",container,thrownew注[3]:使用GenericApplicationContext.registerBeanpublicfinal<T>voidClass<T>beanClass,Supplier<T>supplier,BeanDefinitionCustomizer…SpringBootSpringAssertJavaassertSpringBootAssert:importimportprivateApplicationContextRunnerrunner=new@Test(expected=NoSuchBeanDefinitionException.class)publicrun((context)->{processor注解處理器,這樣它能夠生成輔助元數(shù)據(jù)文件,加快啟動(dòng)時(shí)間。詳情見-autoconfigure/**…*)和單行//...)員變量,方法或者代碼邏輯應(yīng)該提供多行注釋;有些簡單的代碼邏輯注釋也可以使用單;在變量和方法命名時(shí)盡量用詞準(zhǔn)確,并且盡量不要使用縮寫,如:sendMsgTimeout,sendMessageTimeout;包名supportssupport。Lombokconstructor,IDELombokSpringLombokIDE如果一個(gè)包目錄下沒有任何class,建議要去掉這個(gè)包目錄。例如org.apache.rocketmq.spring.starter在spring目錄下沒有具體的class應(yīng)該去掉這層目錄(編者注:我們最終把package改為org.apache.rocketmq.spring,Enumorg.apache.rocketmq.spring.enums下,這個(gè)包命名Enumenumspackageprivate:TransactionHandlerstatic+final一個(gè)類的static方法不要結(jié)合final,除非這個(gè)這個(gè)類本身是final并且聲明private使用@Autowared,@Resource4]注[4]:下面的截圖是有FieldInjection羅美琪根據(jù)上述的要求調(diào)整了代碼,使代碼質(zhì)量有了很大的提高,并且總結(jié)了SpringBootautoconfigurationBean@Conditionalsetter方法來設(shè)置變量,避免使用FieldInjection方式;多個(gè)ConfigurationBean可以使用@ImportSpring20提供的AutoConfigruation注意一些細(xì)節(jié):staticBeanPostProcessor;Lifecycle約束條件,信心滿滿地提交了最終的代碼,又可以邀請RocketMQ社區(qū)的小伙伴們一起RocketMQspringSpringrocketmq-spring-starterSpringInitializr,讓用戶可以直接在/bootstrap.html網(wǎng)站上RocketMQ-Spring畢業(yè)兩周年,為什么能成為RocketMQ-Spring畢業(yè)兩周年,為什么能成為Spring生態(tài)中最受歡迎的messaging實(shí) RocketMQ-SpringRocketMQ-SpringSpring生態(tài)中最受歡迎的messagingRocketMQ-Spring畢業(yè)兩周年,為什么能成為Spring生態(tài)中最受歡迎的messaging實(shí)現(xiàn)2019年1月,孵化6個(gè)月的RocketMQ-Spring作為ApacheRocketMQ的子項(xiàng)目正式畢業(yè),發(fā)布了第一個(gè)Release版本2.0.1。該項(xiàng)目是把RocketMQ的客戶端使用SpringBoot的方式進(jìn)行了封裝,可以讓用戶通過簡單的annotation和標(biāo)準(zhǔn)的SpringMessagingAPI編寫代碼來進(jìn)行消息的發(fā)送和消費(fèi)。當(dāng)時(shí)RocketMQ社區(qū)同學(xué)請Spring社區(qū)的同學(xué)對RocketMQSpring代碼進(jìn)行review,引出一段羅美琪(RocketMQ)和春波特(SpringBoot)的故事。時(shí)隔兩年,RocketMQ-Spring正式發(fā)布2.2.0。在這期間,RocketMQ-Spring迭代了數(shù)個(gè)版本,以RocketMQ-Spring為基礎(chǔ)實(shí)現(xiàn)的SpringCloudStreamRocketMQBinder、SpringCloudBusRocketMQ登上了Spring的官網(wǎng),Spring布道師baeldung向國外同學(xué)介紹如何使用RocketMQ-Spring,越來越多國內(nèi)外的同學(xué)開始使用RocketMQ-Spring收發(fā)消息,RocketMQ-Spring倉庫的star數(shù)也在短短兩年時(shí)間內(nèi)超越了Spring-Kafka和Spring-AMQP(注:兩者均由Spring社區(qū)維護(hù)),成為ApacheRocketMQ最受歡迎的生態(tài)項(xiàng)目之一。RocketMQ-Spring的受歡迎一方面得益于支持豐富業(yè)務(wù)場景的RocketMQ與微服務(wù)生態(tài)Spring的完美契合,另一方面也與RocketMQ-Spring本身嚴(yán)格遵循SpringMessagingAPISpringMessagingAPISpringMessagingAPI對消息發(fā)送端和消息接收端的模式進(jìn)行規(guī)定,不同的消息中間件提供商可以在這個(gè)模式下提供自己的Spring實(shí)現(xiàn):在消息發(fā)送端需要實(shí)現(xiàn)的是一個(gè)XXXTemplate形式的JavaBean,結(jié)合SpringBoot的自動(dòng)接口(實(shí)現(xiàn)方式通常會(huì)使用一個(gè)注解來聲明一個(gè)消息驅(qū)動(dòng)的POJO),提供回調(diào)方法來SpringBoot的自動(dòng)化選項(xiàng)和一些定制化的屬RocketMQ-SpringSpringMessagingAPIRocketMQ自身的功能特點(diǎn)提供了相應(yīng)的API。在消息的發(fā)送端,RocketMQ-Spring通過實(shí)現(xiàn)RocketMQTemplate完成消息的發(fā)送。如下圖所示,RocketMQTemplate繼承AbstractMessageSendingTemplate抽象類,來支持SpringMessagingAPI標(biāo)準(zhǔn)的消doSenddoSendsyncSend,由DefaultMQProducer實(shí)現(xiàn)。除SpringMessagingAPI規(guī)范中的方法,RocketMQTemplate還實(shí)現(xiàn)了RocketMQ于原生客戶端需要自己去構(gòu)建RocketMQMessage(比如將對象序列化成byte數(shù)組放入Message對象),RocketMQTemplate可以直接將對象、字符串或者byte數(shù)組作為參數(shù)發(fā)送出去(對象序列化操作由RocketMQ-Spring內(nèi)置完成),在消費(fèi)端約定好對應(yīng)的Schema即可正常收發(fā)。RocketMQTemplateRocketMQTemplateSendvoidasyncSend(Stringdestination,Message<?>message,SendCallbacksendCallback)voidasyncSend(Stringdestination,Message<?>message,SendCallbacksendCallback)在消費(fèi)端,需要實(shí)現(xiàn)一個(gè)包含@RocketMQMessageListener注解的類(需要實(shí)現(xiàn)RocketMQListener接口,并實(shí)現(xiàn)onMessage方法,在注解中進(jìn)行topic、consumerGroup等屬性配置),這個(gè)Listener會(huì)一對一的被放置到DefaultRocketMQListenerContainer容器對象中,容器對象會(huì)根據(jù)消費(fèi)的方式(并發(fā)或順序),將RocketMQListener封裝到具體的RocketMQ內(nèi)部的并發(fā)或者順序接口實(shí)現(xiàn)。在容器中創(chuàng)建RocketMQDefaultPushConsumer對象,啟動(dòng)并監(jiān)聽定制的Topic消息,完成約定Schema對象的轉(zhuǎn)換,回調(diào)到Listener的onMessage方法。@RocketMQMessageListener(topic="${demo.rocketmq.topic}",consumerGroup=merimplementsRocketMQListener<String>publicvoidonMessage(Stringmessage)System.out.printf("-------StringConsumerreceived:%s\n",除此Push接口之外,在最新的2.2.0版本中,RocketMQ-Spring實(shí)現(xiàn)了RocketMQLitePullConsumer。通過在配置文件中進(jìn)行consumer的配置,利用RocketMQTemplate的Recevie方法即可主動(dòng)Pull消息。配置文件配置文件resource/perties:PullConsumerwhile(!isStop)RocketMQSpring消息類型支持方面與RocketMQ原生客戶端完全對齊,包括同步/異步/one-way、順序、延遲、批量、事務(wù)以及Request-Reply消息。在這里,主要介紹較為特殊的事務(wù)消息和request-reply消息。RocketMQ的事務(wù)消息不同于SpringMessaging中的事務(wù)消息,依然采用RocketMQ原生事務(wù)消息的方案。如下所示,發(fā)送事務(wù)消息時(shí)需要實(shí)現(xiàn)一個(gè)包含@RocketMQTransactionListener注解的類,并實(shí)現(xiàn)executeLocalTransaction和checkLocalTransaction方法,從而來完成執(zhí)行本地事務(wù)以及檢查本地事務(wù)執(zhí)行結(jié)果。////BuildaSpringMessageforsendingintransactionMessagemsg=//mustbesamewiththe@RocketMQTransactionListener'smemberfield'transName'//DefinetransactionlistenerwiththeannotationpublicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemsg,ObjectreturnRocketMQLocalTransactionState.UNKNOWN;publicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg)//...checktransactionstatusandreturnbollback,commitorunknownreturnRocketMQLocalTransactionState.COMMIT;2.1.0RocketMQ-Spring中每一個(gè)group對應(yīng)一個(gè)TransactionProducer,而在新版本中改為每一個(gè)RocketMQTemplate對應(yīng)一個(gè)TransationProducer,從而解決了并發(fā)使用多個(gè)事務(wù)消息的問題。當(dāng)用戶需要在單進(jìn)程使用多個(gè)事務(wù)消息時(shí),可以使用ExtRocketMQTemplate來完成(一般情況下,推薦一個(gè)進(jìn)程使用一個(gè)RocketMQTemplate,ExtRocketMQTemplate可以使用在同進(jìn)程中需要使用多個(gè)Producer/LitePullConsumer的場景,可以為ExtRocketMQTemplate指定與標(biāo)準(zhǔn)模版RocketMQTemplate不同的nameserver、group等配置),并在對應(yīng)的RocketMQTransactionListener注解中指定rocketMQTemplateBeanName為ExtRocketMQTemplate的BeanName。Request-Reply在2.1.0版本中,RocketMQ-Spring開始支持Request-Reply消息。Request-Reply消息指的是上游服務(wù)投遞消息后進(jìn)入等待被通知的狀態(tài),直到消費(fèi)端返回結(jié)果并返回給發(fā)送端。在RocketMQ-Spring中,發(fā)送端通過RocketMQTemplate的sendAndReceivce方法進(jìn)行發(fā)送,如下所示,主要有同步和異步兩種方式。異步方式中通過實(shí)現(xiàn)RocketMQLocalRequestCallback進(jìn)行回調(diào)。//異步發(fā)送request并且等待UserrocketMQTemplate.sendAndReceive("objectRequestTopic",newUser("requestUserName",(byte)9),newRocketMQLocalRequestCallback<User>(){@OverridepublicvoidonSuccess(Usermessage){@OverridepublicvoidonException(Throwablee)在消費(fèi)端,仍然需要實(shí)現(xiàn)一個(gè)包含@RocketMQMessageListener注解的類,但需RocketMQReplyListener<T,R(<T>接口),其中T表示接收值的類型,R表示返回值的類型,接口需要實(shí)現(xiàn)帶返回onMessageProducer。@RocketMQMessageListener(topic="stringRequestTopic",consumerGroup="stringRequepublicclassStringConsumerWithReplyStringimplementsRocketMQReplyListener<String,String>{publicStringonMessage(Stringmessage)return"replyRocketMQ-Spring遵循Spring約定大于配置(Conventionoverconfiguration)的理念,通過啟動(dòng)器(SpringBootStarter)的方式,在pom文件引入依賴(SpringBoot中集成所有RocketMQ客戶端的所有功能,通過簡單的注解使用即可完RocketMQ-SpringGithubWiki中有更加詳細(xì)的用法和常見問題解據(jù)統(tǒng)計(jì),從RocketMQ-Spring發(fā)布第一個(gè)正式版本以來,RocketMQ-Spring完成16個(gè)bug修復(fù),37個(gè)imporvement,其中包括事務(wù)消息重構(gòu),消息過濾、消息序列化、多實(shí)例RocketMQTemplate優(yōu)化等重要優(yōu)化,歡迎更多的小伙伴能參與到RocketMQ社區(qū)的建設(shè)中來,羅美琪(RocketMQ)和春波特(SpringBoot)的故事還 方法一:使用rocketmq-spring-boot-starter來配置、發(fā)送和消費(fèi)RocketMQRocketMQ作者簡介:遼天,阿里巴巴技術(shù)專家,ApacheRocketMQ內(nèi)核控,擁有多年分布注RocketMQ內(nèi)核優(yōu)化以及Messaging生態(tài)建設(shè)。JavaBeansXMLJava對象(PlainOldJavaObjects)的Spring技術(shù)應(yīng)運(yùn)而生,依賴注入(DependencyInjection),控制反轉(zhuǎn)(隨著pringAnnotaton)ML2014年4月1日,SpringBoot1.0.0正式發(fā)布,它基于“約定大于配置”(Conventionoveronfiguration)pring應(yīng)用,并能通過簡單地與各種啟動(dòng)器如spring-boot-web-starter)結(jié)合,讓應(yīng)用直接以命ApacheRocketMQBrokerPoducer)BoerConume)Boer為了利用SpringBootRocketMQ端,ApacheRocketMQspring-boot-starterRocketMQ順便在這里討論一下在Spring中關(guān)于消息的兩個(gè)主要的框架,即Springesagng和pringCoudtreamprngBootSpringSpringMessagingSpringFramework4SpringJmsTemplate的簡單的使用JMS步接收消息的一整套完整的基礎(chǔ)架構(gòu),SpringAMQPSpringBoot單純對于客戶端而言,SpringMessagingAPISpringXXXTemplateJavaBeanSpringBootXXXMessageListener(實(shí)現(xiàn)方式通常會(huì)使用一個(gè)注解來聲明一個(gè)POJO)SpringBootSpringMessaging及針對不同的消息產(chǎn)品的使用,推薦閱SpringMessagingRocketMQspring-boot-starterRocketMQAPISpringCloudSpringCloudStreamSpringIntegration該圖片引自springcloudSpringCloudStream@Input)和輸編程,而不需要關(guān)心運(yùn)行時(shí)具體的Binder綁定的消息中間件。在運(yùn)行時(shí),SpringCloudStreamclasspathBinder。Binder。在更加復(fù)雜的使用場景中,也可以在應(yīng)用中打包多個(gè)BinderBinderBinder。BinderSpringCloudStream應(yīng)用可以靈活的連接到中間件,加之SpringCloudStreamSpringBootSpringBoo(包括應(yīng)用啟動(dòng)參數(shù)、環(huán)境變量和application.yml或者perties文件),部署人員可以在運(yùn)行時(shí)動(dòng)destination(例如,KafkatopicRabbitMQexchange)。SpringCloudSteamRocketMQBinder,我們計(jì)PR或三、spring-boot-starterSpringspring-boot-starter封裝給開發(fā)者,讓開發(fā)者非常方便集成和使用,這里我們詳細(xì)的介紹一下RocketMQ(客戶端)的starterspring-boot-starterpom.xml<artifactId>spring-boot-starter-它分為兩個(gè)部分:A、SpringB、RocketMQRocketMQPropertiesBeanspringboot更具文中中所指定的自動(dòng)化配置類來自動(dòng)初始化相關(guān)的BeanComponentService,它的內(nèi)容如下:在RocketMQAutoConfiguration類的具體實(shí)現(xiàn)中,定義開放給用戶直接使用的Bean.包括:RocketMQPropertiesRocketMQTemplateListenerContainerConfigurationBean類,這個(gè)類要求:由@RocketMQMessageListener注解標(biāo)注;實(shí)現(xiàn)RocketMQListener最后具體的RocketMQSpringMessagingRocketMQTemplatePOJOSpringMessaging的發(fā)送模板兼容,在RocketMQTemplateAbstractMessageSendingTemplate抽象類,來支持相關(guān)的消息轉(zhuǎn)換和發(fā)送方法,這些方法最終會(huì)代理給doSend()方法doSend()RocoketMQ向和順序等方法直接添加到RoketMQTempalte中,這些方法直接代理調(diào)用到RocketMQProducerAPIRocketMQTemplate里加入了一個(gè)發(fā)送事務(wù)消息的方法sendMessageInTransactioProducerTransactionListener實(shí)現(xiàn)類,以便在發(fā)送消息后能夠?qū)ransactionListener在消費(fèi)端Spring-Boot應(yīng)用啟動(dòng)后,會(huì)掃描所有包含@RocketMQMessageListenerRocketMQListener接口,并實(shí)現(xiàn)onMessage()方法),這個(gè)ListenerDefaultRocketMQListenerContainer或順序)RocketMQListenerRocketMQRocketMQConsumerTopicListeneronMessage()方法。RocketMQ服務(wù)端的準(zhǔn)備NameServerRocketMQSpring-BootRocketMQ并啟動(dòng)。可以參考RocketMQ主站的快速開始來進(jìn)行操作。確保啟動(dòng)NameServer和Brokerbashbashbin/mqadminupdateTopic-cDefaultCluster-tstring-gitmvncleaninstallmvncleanmavenpom.xmlJavaJava如果需要了解更多的調(diào)用方式,如:異步發(fā)送,對象消息體,指定tag標(biāo)簽以及指定事 方法二:SpringCloudStream體系及PhotobyMedBadrChemmaouionSpringCloudStream在SpringCloud體系內(nèi)用于構(gòu)建高度可擴(kuò)展的基于事件驅(qū)動(dòng)的微服務(wù),其目的是為了簡化消息在SpringCloud應(yīng)用程序中的開發(fā)。SpringCloudStream(后面以SCS代替SpringCloudStream)本身內(nèi)容很多,而且它還有很多外部的依賴,想要熟悉SCS,必須要先了解SpringMessagingSpringIntegration這兩個(gè)項(xiàng)目,接下來,文章將從圍繞以下三點(diǎn)進(jìn)行展開:SpringSpring什么是SCS一、SpringSpringMessaging是SpringFramework中的一個(gè)模塊,其作用就是統(tǒng)一消息的 對應(yīng)的模型就包括一個(gè)消息體Payload和消息頭packagepackageorg.springframework.messaging;publicinterfaceMessage<T>{TgetPayload();MessageHeadersgetHeaders();消息通 用于接收消息,調(diào)用send方法可以將消息發(fā)送至publicinterfaceMessageChannel{longINDEFINITE_TIMEOUT=-defaultbooleansend(Message<?>message){returnsend(message,INDEFINITE_TIMEOUT);booleansend(Message<?>message,longSubscribableChannel由消息通道的子接口可訂閱的消息通 實(shí)現(xiàn),SubscribableChannel消息處理器所訂閱publicpublicinterfaceSubscribableChannelextendsMessageChannel{booleansubscribe(MessageHandlerhandler);booleanunsubscribe(MessageHandler 真正地消費(fèi)/處理消息voidhandleMessage(Message<?>message)throwsSpringMessaging消息接收參數(shù)及返回值處理:消息接收參數(shù)處理器@Payload@Headeresolver配 等注解使用;消息接收后的返回值處理@Payload@Headeresolver 二、SpringSpringIntegrationSpring編程模型的擴(kuò)展用來支持企業(yè)集成模式(EnterpriseIntegrationPatternsSpringMessagingMessaMessageRout、消息過濾Filter、消息轉(zhuǎn)

PublishSubscribeChannelExecutorChannelDirectChannelPublishSubscribeChannelExecutorChannelDirectChannel SubscribableChannelmessageChannel=newDirectChannel();//1messageChannel.subscribe(msg->{//2System.out.println("receive:SubscribableChannelmessageChannel=newDirectChannel();//1messageChannel.subscribe(msg->{//2System.out.println("receive:"+msg.getPayload()); 發(fā)送一條消息到這個(gè)消息通道,消息最終被消息通道里 最后控制臺(tái)打印出receivemsgfrom 的消息通 單播的分發(fā)器,只能選擇一個(gè)消息通道。那么如何選擇呢?內(nèi)部提供了 SubscribableChannelmessageChannel=newDirectChannel();messageChannel.subscribe(msg->{SubscribableChannelmessageChannel=newDirectChannel();messageChannel.subscribe(msg->{messageChannel.subscribe(msg->{System.out.println("receive2:"+msg.getPayload());messageChannel.send(MessageBuilder.withPayload("msgfrom receive1:receive1:msgfromreceive2:msgfrom SubscribableChannelmessageChannel=newPublishSubscribeChannel();messageChannel.subscribe(msg->{SubscribableChannelmessageChannel=newPublishSubscribeChannel();messageChannel.subscribe(msg->{System.out.println("receive1:"+messageChannel.subscribe(msg->{System.out.println("receive2:"+msg.getPayload());messageChannel.send(MessageBuilder.withPayload("msgfrom receive1:receive1:msgfromalibabareceive2:msgfromalibabareceive1:msgfromreceive2:msgfrom三、SpringCloudSCSSCS在SpringIntegration的基礎(chǔ)上進(jìn)行了封裝,提出了Binder,等概念/bindings,SCS與SpringBootActuator整合,提供/bindings,SCS與SpringBootExternalizedConfiguration整合提供 等外部化配置類SCSSCSSpringIntegrationSpringBootSpringCloudBus的基礎(chǔ)。它屏蔽了底層消息中間件的實(shí)現(xiàn)細(xì)節(jié),希望以統(tǒng)一的一套API來進(jìn)行消息的發(fā)送/消費(fèi),底層消息中間件的實(shí)現(xiàn)細(xì)節(jié)由各消息中間件的Binder完成。Binding2 RabbitBinder和KafkaBinder,SpringCloudAlibaba內(nèi)部已經(jīng)實(shí)現(xiàn)了RocketMQBinder。從圖中可以看出,Binding是連接應(yīng)用程序跟消息中間件的橋梁,用于消息的消費(fèi)和生產(chǎn)。我們來看一個(gè)最簡單的使用RocketMQBinder的例子,然后分析一下它的底publicclassSendAndReceiveApplication{SpringApplication.run(SendAndReceiveApplication.class,args);@Bean//publicCustomRunnercustomRunner(){returnnewCustomRunner();publicstaticclassCustomRunnerimplementsCommandLineRunner{intcount=5;for(intindex=1;index<=count;index++){@StreamListener(Sink.INPUT)//4publicvoidreceiveByStreamListener1(StringreceiveMsg){這段代碼很簡單,沒有涉及到RocketMQ相關(guān)的代碼,消息的發(fā)送和接收都是基于SCS體系完成的。如果想切換成RabbitMQ或Kafka,只需修改配置文件即可,代碼我們來分析下這段代碼的原理

是SCS內(nèi)部提供的。SCS

outputinput方法返回的MessageChannel output和input方法修飾注解對應(yīng)的value是配置文件中binding的namepublicinterfaceSource{StringOUTPUT="output";publicinterfaceSource{StringOUTPUT="output";StringINPUT="input";配置文件里bindings的name為output和input,對 spring.cloud.stream.bindings.input.group=test-構(gòu)造CommandLineRunner,程序啟動(dòng)的時(shí)候會(huì)執(zhí) Source里的output發(fā)送消息 消息通道之后會(huì)被 MessageHandler不同的消息中間件對應(yīng)的MessageHandlerSpringMessageMessagebroker; inputbindingnameinput不同的消息中間件對應(yīng)的方法會(huì)使用Consumer訂閱消息,訂閱到消息后內(nèi)部會(huì)把中間件對應(yīng)的Message模型轉(zhuǎn)換成SpringMessage;消息轉(zhuǎn)換之后會(huì)把SpringMessage發(fā)送至name為input對應(yīng) 訂閱了nameBinder實(shí)現(xiàn)以及MQ基本的訂閱發(fā)布功能):SCS章節(jié)的最后,我們來看一段SCSpublicvoidreceiveByHeader(Messagemsg){System.out.println("receivebyheaders['index']=='1':"+publicvoidreceivePerson(@PayloadPersonperson){@StreamListener(value=Sink.INPUT)@StreamListener(value=publicvoidreceiveHeaderAndMsg(@Header("index")Stringindex,Messagemsg){System.out.println("receivebyHeaderAndMsgbyStreamListener.content:"+msg);有沒有發(fā)現(xiàn)這段代碼跟SpringMVCController中接收請求的代碼很像?實(shí)際上SpringMessaging上圖是SCS體系相關(guān)類說明的總結(jié),關(guān)于SCS以及RocketMQBinder更多相RocketMQBinderDemos(Demos),包含了消息的聚合、分割、過濾;消息異常處理;消息標(biāo)簽、SQLSpringCloudBusSpringCloud作用,并逐步展開,分析SpringCloudAlibaba中的RocketMQBinder是如何實(shí)現(xiàn)SpringCloudStream標(biāo)準(zhǔn)的。SpringCloudAlibaba方法三:SpringCloud方法三:SpringCloudBus消息總線介 方法三:Spring方法三:SpringCloudBus方法三:SpringCloudBus消息總線介本期我們來了解下SpringCloud體系中的另外一個(gè)組件SpringCloudBus(建議先熟悉SpringCloudStream,不然無法理解SpringCloudBus內(nèi)部的代碼)。SpringCloudBus對自己的定位是SpringCloud體系內(nèi)的消息總線,使用messagebroker來連接分布式系統(tǒng)的所有節(jié)點(diǎn)。Bus官方的Reference文檔比較簡SpringCloudBus一、Bus實(shí)例演示在分析Bus的實(shí)現(xiàn)之前,我們先來看兩個(gè)使用SpringCloudBusBus的例子比較簡單,因?yàn)锽us的AutoConfiguration層都有了默認(rèn)的配置,只需要引入消息中間件對應(yīng)的SpringCloudStream以及SpringCloudBus依賴即可,之后所有啟動(dòng)的應(yīng)用都會(huì)使用同一個(gè)Topic進(jìn)行消息的接收和發(fā)送。BusDemogithub(/fangjian0423rocketmqbinderdemotreemasterrocketmqbusdemo)Demo會(huì)模擬啟動(dòng)5個(gè)節(jié)點(diǎn),只需要對其中任意的一個(gè)實(shí)例新增配置項(xiàng),所有節(jié)點(diǎn)都curlcurl-XGET所有節(jié)點(diǎn)返回的結(jié)果都是unknown,因?yàn)樗泄?jié)點(diǎn)的配置中沒 keyEnvironmentBusEndpointBus內(nèi)部提供 這個(gè)Endpoint通過EnvironmentBusEndpoint對應(yīng)的:/進(jìn)行配置項(xiàng)的新增(比如訪問node1'content-type: $curl-XPOST'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba'keyhangzhouvaluealibaba。這個(gè)配置項(xiàng)是通過Bus提供 DD畫的一張圖片,SpringCloudConfig配合Bus完成所有節(jié)比如在node1上指定destination為rocketmq-bus-node2(node2配置了為 獲取配置(由于在node1上發(fā)送消息,Bus也會(huì)對發(fā)送方的節(jié)node1進(jìn)行配置修改):$$curl-XPOST'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hang可以看到,只有node1和node2修改了配置,其余的3二、Bus的實(shí)現(xiàn)1BusBusRemoteApplicationEventSpringEnvironmentChangeRemoteApplicationEvent:一個(gè)Map類型的數(shù)據(jù)并更新到Spring上下文 中的事件。文AckRemoteApplicationEvent:遠(yuǎn)程確認(rèn)事件。Bus UnknownRemoteApplicationEvent:遠(yuǎn)程未知事件。Bus內(nèi)部消息體進(jìn)行轉(zhuǎn)換遠(yuǎn)-Bus內(nèi)部還存在一個(gè) -Trace 進(jìn)行操作,比如 implementsApplicationListener<EnvironmentChangeRemoteApplicationEvent>{

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論