【移動應用開發(fā)技術】SpringBoot整合RocketMQ遇到的坑怎么解決_第1頁
【移動應用開發(fā)技術】SpringBoot整合RocketMQ遇到的坑怎么解決_第2頁
【移動應用開發(fā)技術】SpringBoot整合RocketMQ遇到的坑怎么解決_第3頁
【移動應用開發(fā)技術】SpringBoot整合RocketMQ遇到的坑怎么解決_第4頁
【移動應用開發(fā)技術】SpringBoot整合RocketMQ遇到的坑怎么解決_第5頁
免費預覽已結束,剩余1頁可下載查看

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

【移動應用開發(fā)技術】SpringBoot整合RocketMQ遇到的坑怎么解決

本篇內容主要講解“SpringBoot整合RocketMQ遇到的坑怎么解決”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓在下來帶大家學習“SpringBoot整合RocketMQ遇到的坑怎么解決”吧!在實現(xiàn)RocketMQ消費時,一般會用到@RocketMQMessageListener注解定義Group、Topic以及selectorExpression(數(shù)據(jù)過濾、選擇的規(guī)則)為了能支持動態(tài)篩選數(shù)據(jù),一般都會使用表達式,然后通過apollo或者cloudconfig進行動態(tài)切換。

<!--

RocketMq

Spring

Boot

Starter-->

<dependency>

<groupId>org.apache.rocketmq</groupId>

<artifactId>rocketmq-spring-boot-starter</artifactId>

<version>2.0.4</version>

</dependency>@RocketMQMessageListener(consumerGroup

=

"${rocketmq.group}",topic

="${rocketmq.topic}",selectorExpression

=

"${rocketmq.selectorExpression}")

public

class

Consumer

implements

RocketMQListener<String>

{

@Override

public

void

onMessage(String

s)

{

System.out.println("消費到的數(shù)據(jù)為:"+s);

}

}RocketMQMessageListener整個注解默認selectorExpression為*,表示接收當前Topic下的所有數(shù)據(jù),如果我們想對tags進行動態(tài)配置,在使用${rocketmq.selectorExpression}表達式時會發(fā)現(xiàn)所有數(shù)據(jù)全被過濾了,跟蹤源碼(ListenerContainerConfiguration.java)發(fā)現(xiàn)在創(chuàng)建listener時selectorExpression的數(shù)據(jù)在通environment環(huán)境變量中獲取對應的數(shù)據(jù)后又被覆蓋了,導致整個過濾條件被變更為表達式。@Override

public

void

afterSingletonsInstantiated()

{

//

獲取所有所有使用了RocketMQMessageListener注解的bean

Map<String,

Object>

beans

=

this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);

if

(Objects.nonNull(beans))

{

//

循環(huán)注冊容器

beans.forEach(this::registerContainer);

}

}

private

void

registerContainer(String

beanName,

Object

bean)

{

Class<?>

clazz

=

AopProxyUtils.ultimateTargetClass(bean);

//

校驗當前bean是否實現(xiàn)了RocketMQListener接口

if

(!RocketMQListener.class.isAssignableFrom(bean.getClass()))

{

throw

new

IllegalStateException(clazz

+

"

is

not

instance

of

"

+

RocketMQListener.class.getName());

}

//

獲取bean上的annotation

RocketMQMessageListener

annotation

=

clazz.getAnnotation(RocketMQMessageListener.class);

//

解析group及topic,可支持表達式

String

consumerGroup

=

this.environment.resolvePlaceholders(annotation.consumerGroup());

String

topic

=

this.environment.resolvePlaceholders(annotation.topic());

boolean

listenerEnabled

=

(boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup,

Collections.EMPTY_MAP)

.getOrDefault(topic,

true);

if

(!listenerEnabled)

{

log.debug(

"Consumer

Listener

(group:{},topic:{})

is

not

enabled

by

configuration,

will

ignore

initialization.",

consumerGroup,

topic);

return;

}

validate(annotation);

String

containerBeanName

=

String.format("%s_%s",

DefaultRocketMQListenerContainer.class.getName(),

counter.incrementAndGet());

GenericApplicationContext

genericApplicationContext

=

(GenericApplicationContext)applicationContext;

//

注冊bean的,調用createRocketMQListenerContainer

genericApplicationContext.registerBean(containerBeanName,

DefaultRocketMQListenerContainer.class,

()

->

createRocketMQListenerContainer(containerBeanName,

bean,

annotation));

DefaultRocketMQListenerContainer

container

=

genericApplicationContext.getBean(containerBeanName,

DefaultRocketMQListenerContainer.class);

if

(!container.isRunning())

{

try

{

container.start();

}

catch

(Exception

e)

{

log.error("Started

container

failed.

{}",

container,

e);

throw

new

RuntimeException(e);

}

}

("Register

the

listener

to

container,

listenerBeanName:{},

containerBeanName:{}",

beanName,

containerBeanName);

}

private

DefaultRocketMQListenerContainer

createRocketMQListenerContainer(String

name,

Object

bean,

RocketMQMessageListener

annotation)

{

DefaultRocketMQListenerContainer

container

=

new

DefaultRocketMQListenerContainer();

container.setRocketMQMessageListener(annotation);

String

nameServer

=

environment.resolvePlaceholders(Server());

nameServer

=

StringUtils.isEmpty(nameServer)

?

rocketMQProperties.getNameServer()

:

nameServer;

String

accessChannel

=

environment.resolvePlaceholders(annotation.accessChannel());

container.setNameServer(nameServer);

if

(!StringUtils.isEmpty(accessChannel))

{

container.setAccessChannel(AccessChannel.valueOf(accessChannel));

}

container.setTopic(environment.resolvePlaceholders(annotation.topic()));

//

此處已經(jīng)根據(jù)表達式將數(shù)據(jù)取出

String

tags

=

environment.resolvePlaceholders(annotation.selectorExpression());

if

(!StringUtils.isEmpty(tags))

{

container.setSelectorExpression(tags);

}

container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));

//

此處將SelectorExpression的數(shù)據(jù)覆蓋成了表達式

container.setRocketMQMessageListener(annotation);

container.setRocketMQListener((RocketMQListener)bean);

container.setObjectMapper(objectMapper);

container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());

container.setName(name);

//

REVIEW

ME,

use

the

same

clientId

or

multiple?

return

container;

}因為ListenerContainerConfiguration類是實現(xiàn)了SmartInitializingSingleton接口的afterSingletonsInstantiated方法,我們可以通過反射對selectorExpression的數(shù)據(jù)在ListenerContainerConfiguration進行初始化前進行解析并賦值回去。/**

*

在springboot初始化后,RocketMQ容器初始化前利用反射動態(tài)改變數(shù)據(jù)

**/

@Configuration

public

class

ChangeSelectorExpressionBeforeMQInit

implements

InitializingBean

{

@Autowired

private

ApplicationContext

applicationContext;

@Autowired

private

StandardEnvironment

environment;

@Override

public

void

afterPropertiesSet()

throws

Exception

{

Map<String,Object>

beans

=applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);

for

(Object

bean

:

beans.values()){

Class<?>

clazz

=

AopProxyUtils.ultimateTargetClass(bean);

if

(!RocketMQListener.class.isAssignableFrom(bean.getClass()))

{

continue;

}

RocketMQMessageListener

annotation

=

clazz.getAnnotation(RocketMQMess

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論