下載本文檔
版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
【移動應用開發(fā)技術】SpringBoot整合RocketMQ遇到的坑怎么解決
本篇內容主要講解“SpringBoot整合RocketMQ遇到的坑怎么解決”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓在下來帶大家學習“SpringBoot整合RocketMQ遇到的坑怎么解決”吧!在實現(xiàn)RocketMQ消費時,一般會用到@RocketMQMessageListener注解定義Group、Topic以及selectorExpression(數(shù)據(jù)過濾、選擇的規(guī)則)為了能支持動態(tài)篩選數(shù)據(jù),一般都會使用表達式,然后通過apollo或者cloudconfig進行動態(tài)切換。
<!--
RocketMq
Spring
Boot
Starter-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>@RocketMQMessageListener(consumerGroup
=
"${rocketmq.group}",topic
="${rocketmq.topic}",selectorExpression
=
"${rocketmq.selectorExpression}")
public
class
Consumer
implements
RocketMQListener<String>
{
@Override
public
void
onMessage(String
s)
{
System.out.println("消費到的數(shù)據(jù)為:"+s);
}
}RocketMQMessageListener整個注解默認selectorExpression為*,表示接收當前Topic下的所有數(shù)據(jù),如果我們想對tags進行動態(tài)配置,在使用${rocketmq.selectorExpression}表達式時會發(fā)現(xiàn)所有數(shù)據(jù)全被過濾了,跟蹤源碼(ListenerContainerConfiguration.java)發(fā)現(xiàn)在創(chuàng)建listener時selectorExpression的數(shù)據(jù)在通environment環(huán)境變量中獲取對應的數(shù)據(jù)后又被覆蓋了,導致整個過濾條件被變更為表達式。@Override
public
void
afterSingletonsInstantiated()
{
//
獲取所有所有使用了RocketMQMessageListener注解的bean
Map<String,
Object>
beans
=
this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
if
(Objects.nonNull(beans))
{
//
循環(huán)注冊容器
beans.forEach(this::registerContainer);
}
}
private
void
registerContainer(String
beanName,
Object
bean)
{
Class<?>
clazz
=
AopProxyUtils.ultimateTargetClass(bean);
//
校驗當前bean是否實現(xiàn)了RocketMQListener接口
if
(!RocketMQListener.class.isAssignableFrom(bean.getClass()))
{
throw
new
IllegalStateException(clazz
+
"
is
not
instance
of
"
+
RocketMQListener.class.getName());
}
//
獲取bean上的annotation
RocketMQMessageListener
annotation
=
clazz.getAnnotation(RocketMQMessageListener.class);
//
解析group及topic,可支持表達式
String
consumerGroup
=
this.environment.resolvePlaceholders(annotation.consumerGroup());
String
topic
=
this.environment.resolvePlaceholders(annotation.topic());
boolean
listenerEnabled
=
(boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup,
Collections.EMPTY_MAP)
.getOrDefault(topic,
true);
if
(!listenerEnabled)
{
log.debug(
"Consumer
Listener
(group:{},topic:{})
is
not
enabled
by
configuration,
will
ignore
initialization.",
consumerGroup,
topic);
return;
}
validate(annotation);
String
containerBeanName
=
String.format("%s_%s",
DefaultRocketMQListenerContainer.class.getName(),
counter.incrementAndGet());
GenericApplicationContext
genericApplicationContext
=
(GenericApplicationContext)applicationContext;
//
注冊bean的,調用createRocketMQListenerContainer
genericApplicationContext.registerBean(containerBeanName,
DefaultRocketMQListenerContainer.class,
()
->
createRocketMQListenerContainer(containerBeanName,
bean,
annotation));
DefaultRocketMQListenerContainer
container
=
genericApplicationContext.getBean(containerBeanName,
DefaultRocketMQListenerContainer.class);
if
(!container.isRunning())
{
try
{
container.start();
}
catch
(Exception
e)
{
log.error("Started
container
failed.
{}",
container,
e);
throw
new
RuntimeException(e);
}
}
("Register
the
listener
to
container,
listenerBeanName:{},
containerBeanName:{}",
beanName,
containerBeanName);
}
private
DefaultRocketMQListenerContainer
createRocketMQListenerContainer(String
name,
Object
bean,
RocketMQMessageListener
annotation)
{
DefaultRocketMQListenerContainer
container
=
new
DefaultRocketMQListenerContainer();
container.setRocketMQMessageListener(annotation);
String
nameServer
=
environment.resolvePlaceholders(Server());
nameServer
=
StringUtils.isEmpty(nameServer)
?
rocketMQProperties.getNameServer()
:
nameServer;
String
accessChannel
=
environment.resolvePlaceholders(annotation.accessChannel());
container.setNameServer(nameServer);
if
(!StringUtils.isEmpty(accessChannel))
{
container.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
container.setTopic(environment.resolvePlaceholders(annotation.topic()));
//
此處已經(jīng)根據(jù)表達式將數(shù)據(jù)取出
String
tags
=
environment.resolvePlaceholders(annotation.selectorExpression());
if
(!StringUtils.isEmpty(tags))
{
container.setSelectorExpression(tags);
}
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
//
此處將SelectorExpression的數(shù)據(jù)覆蓋成了表達式
container.setRocketMQMessageListener(annotation);
container.setRocketMQListener((RocketMQListener)bean);
container.setObjectMapper(objectMapper);
container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
container.setName(name);
//
REVIEW
ME,
use
the
same
clientId
or
multiple?
return
container;
}因為ListenerContainerConfiguration類是實現(xiàn)了SmartInitializingSingleton接口的afterSingletonsInstantiated方法,我們可以通過反射對selectorExpression的數(shù)據(jù)在ListenerContainerConfiguration進行初始化前進行解析并賦值回去。/**
*
在springboot初始化后,RocketMQ容器初始化前利用反射動態(tài)改變數(shù)據(jù)
**/
@Configuration
public
class
ChangeSelectorExpressionBeforeMQInit
implements
InitializingBean
{
@Autowired
private
ApplicationContext
applicationContext;
@Autowired
private
StandardEnvironment
environment;
@Override
public
void
afterPropertiesSet()
throws
Exception
{
Map<String,Object>
beans
=applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
for
(Object
bean
:
beans.values()){
Class<?>
clazz
=
AopProxyUtils.ultimateTargetClass(bean);
if
(!RocketMQListener.class.isAssignableFrom(bean.getClass()))
{
continue;
}
RocketMQMessageListener
annotation
=
clazz.getAnnotation(RocketMQMess
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經(jīng)權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025年行政行為法律文書電子簽名服務合同2篇
- 2025年銷售人員勞動合同示范文本2篇
- 2025年鮮蛋品牌孵化與市場推廣合作協(xié)議3篇
- 鄭州經(jīng)貿(mào)學院《教師教育綜合》2023-2024學年第一學期期末試卷
- 鄭州工業(yè)安全職業(yè)學院《建筑一般構造》2023-2024學年第一學期期末試卷
- 2025年度餐館員工食品安全責任合同范本3篇
- 鄭州電力高等??茖W?!度宋锂嫞?)》2023-2024學年第一學期期末試卷
- 個人健身教練合同:2024版專業(yè)輔導合同書版B版
- 2025年食堂節(jié)能環(huán)保設施改造承包協(xié)議9篇
- 鄭州財稅金融職業(yè)學院《運輸技術實務》2023-2024學年第一學期期末試卷
- 檢驗員績效考核
- 農(nóng)藥合成研發(fā)項目流程
- 機電安裝工程安全管理
- 2024年上海市第二十七屆初中物理競賽初賽試題及答案
- 信息技術部年終述職報告總結
- 理光投影機pj k360功能介紹
- 六年級數(shù)學上冊100道口算題(全冊完整版)
- 八年級數(shù)學下冊《第十九章 一次函數(shù)》單元檢測卷帶答案-人教版
- 帕薩特B5維修手冊及帕薩特B5全車電路圖
- 小學五年級解方程應用題6
- 年月江西省南昌市某綜合樓工程造價指標及
評論
0/150
提交評論