版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
1、消息隊(duì)列TCP 接入(專業(yè))消息隊(duì)列/TCP消息隊(duì)列/TCP PAGE 31 PAGE 31TCP 接入(專業(yè))Java SDK環(huán)境準(zhǔn)備運(yùn)行本節(jié)描述的 Java 代碼之前,請(qǐng)按以下說(shuō)明準(zhǔn)備好環(huán)境。通過(guò)下面兩種方式可以引入依賴(任選一種):- Maven 方式引入依賴:com.aliyun.openservicesons-client1.2.4下載依賴 Jar 包: 下載鏈接代碼里涉及到的 Topic, Producer ID, Consumer ID,需要到 MQ 控制臺(tái)上創(chuàng)建。 MessageTag 可以完全由應(yīng)用自定義,具體創(chuàng)建過(guò)程可參考 申請(qǐng)MQ資源。使用MQ服務(wù)的應(yīng)用程序需要部署在EC
2、S上。發(fā)送普通消息(三種方式)簡(jiǎn)介(Oneway)發(fā)送。本文介紹了每種實(shí)現(xiàn) 的原理、使用場(chǎng)景以及三種實(shí)現(xiàn)的異同,同時(shí)提供了代碼示例以供參考??煽客桨l(fā)送原理:同步發(fā)送是指消息發(fā)送方發(fā)出數(shù)據(jù)后,會(huì)在收到接收方發(fā)回響應(yīng)之后才發(fā)下一個(gè)數(shù)據(jù)包的通訊 方式。應(yīng)用場(chǎng)景:此種方式應(yīng)用場(chǎng)景非常廣泛,例如重要通知郵件、報(bào)名短信通知、營(yíng)銷短信系統(tǒng)等。 可靠異步發(fā)送原理:異步發(fā)送是指發(fā)送方發(fā)出數(shù)據(jù)后,不等接收方發(fā)回響應(yīng),接著發(fā)送下個(gè)數(shù)據(jù)包的通訊方式。MQ 的異步發(fā)送,需要用戶實(shí)現(xiàn)異步發(fā)送回調(diào)接口(SendCallback),在執(zhí)行消息的異步發(fā)送時(shí),應(yīng)用不需要等待服務(wù)器響應(yīng)即可直接返回,通過(guò)回調(diào)接口接收務(wù)器響應(yīng),并對(duì)
3、服務(wù)器的響應(yīng)結(jié)果進(jìn)行處 理。應(yīng)用場(chǎng)景:異步發(fā)送一般用于鏈路耗時(shí)較長(zhǎng),對(duì) RT 響應(yīng)時(shí)間較為敏感的業(yè)務(wù)場(chǎng)景,例如用戶視頻上傳后通知啟動(dòng)轉(zhuǎn)碼服務(wù),轉(zhuǎn)碼完成后通知推送轉(zhuǎn)碼結(jié)果等。單向(Oneway)發(fā)送原理:?jiǎn)蜗颍∣neway)發(fā)送特點(diǎn)為只負(fù)責(zé)發(fā)送消息,不等待服務(wù)器回應(yīng)且沒(méi)有回調(diào)函數(shù)觸發(fā),即只 發(fā)送請(qǐng)求不等待應(yīng)答。此方式發(fā)送消息的過(guò)程耗時(shí)非常短,一般在微秒級(jí)別。發(fā)送TPS發(fā)送結(jié)果反饋可靠性同步發(fā)送發(fā)送TPS發(fā)送結(jié)果反饋可靠性同步發(fā)送快有不丟失異步發(fā)送快有不丟失單向發(fā)送最快無(wú)可能丟失示例代碼同步發(fā)送public class ProducerTest public static void main(St
4、ring args) Properties properties = new Properties();properties.put(PropertyKeyConst.ProducerId, XXX);/ 您 在 控 制 臺(tái) 創(chuàng) 建 的 Producer ID properties.put(PropertyKeyConst.AccessKey,XXX);/ AccessKey 阿里云身份驗(yàn)證,在阿里云服務(wù)器管理控制臺(tái)創(chuàng)建properties.put(PropertyKeyConst.SecretKey, XXX);/ SecretKey 阿里云身份驗(yàn)證,在阿里云服務(wù)器管理控制臺(tái)創(chuàng)建proper
5、ties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, 3000);/設(shè)置發(fā)送超時(shí)時(shí)間,單位毫秒Producer producer = ONSFactory.createProducer(properties);/ 在發(fā)送消息前,必須調(diào)用start方法來(lái)啟動(dòng)Producer,只需調(diào)用一次即可。producer.start();/循環(huán)發(fā)送消息for (int i = 0; i 100; i+) Message msg = new Message( / Message Topic TopicTestMQ,/ Message Tag 可理解為G
6、mail中的標(biāo)簽,對(duì)消息進(jìn)行再歸類,方便Consumer指定過(guò)濾條件在MQ服務(wù)器過(guò)濾TagA,/ Message Body 可以是任何二進(jìn)制形式的數(shù)據(jù), MQ不做任何干預(yù),/ 需要Producer與Consumer協(xié)商好一致的序列化和反序列化方式Hello MQ.getBytes();/ 設(shè)置代表消息的業(yè)務(wù)關(guān)鍵屬性,請(qǐng)盡可能全局唯一。/ 以方便您在無(wú)法正常收到消息情況下,可通過(guò)阿里云服務(wù)器管理控制臺(tái)查詢消息并補(bǔ)發(fā)。/ 注意:不設(shè)置也不會(huì)影響消息正常收發(fā)msg.setKey(ORDERID_ + i);msg.setKey(ORDERID_ + i);/ 同步發(fā)送消息,只要不拋異常就是成功Sen
7、dResult sendResult = producer.send(msg); System.out.println(sendResult);/ 在應(yīng)用退出前,銷毀Producer對(duì)象/ 注意:如果不銷毀也沒(méi)有問(wèn)題producer.shutdown();public static void main(String args) Properties properties = new Properties();properties.put(PropertyKeyConst.AccessKey, DEMO_AK);/ AccessKey 阿里云身份驗(yàn)證,在阿里云服務(wù)器管理控制臺(tái)創(chuàng)public st
8、atic void main(String args) Properties properties = new Properties();properties.put(PropertyKeyConst.AccessKey, DEMO_AK);/ AccessKey 阿里云身份驗(yàn)證,在阿里云服務(wù)器管理控制臺(tái)創(chuàng)建properties.put(PropertyKeyConst.SecretKey, DEMO_SK);/ SecretKey 阿里云身份驗(yàn)證,在阿里云服務(wù)器管理控制臺(tái)創(chuàng)建properties.put(PropertyKeyConst.ProducerId, DEMO_PID);/ 您 在
9、 控 制 臺(tái) 創(chuàng) 建 的 Producer ID properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, 3000);/設(shè)置發(fā)送超時(shí)時(shí)間,單位毫秒Producer producer = ONSFactory.createProducer(properties);/ 在發(fā)送消息前,必須調(diào)用start方法來(lái)啟動(dòng)Producer,只需調(diào)用一次即可。producer.start();Message msg = new Message(/ Message Topic TopicTestMQ,/ Message Tag,可理解為Gmail
10、中的標(biāo)簽,對(duì)消息進(jìn)行再歸類,方便Consumer指定過(guò)濾條件在MQ服務(wù)器過(guò)濾TagA,/ Message Body,任何二進(jìn)制形式的數(shù)據(jù),MQ不做任何干預(yù),需要Producer與Consumer協(xié)商好一致的序列化和反序列化方式Hello MQ.getBytes();/ 設(shè)置代表消息的業(yè)務(wù)關(guān)鍵屬性,請(qǐng)盡可能全局唯一。以方便您在無(wú)法正常收到消息情況下,可通過(guò)MQ控制臺(tái)查詢消息并補(bǔ)發(fā)。/ 注意:不設(shè)置也不會(huì)影響消息正常收發(fā)msg.setKey(ORDERID_100);/ 異步發(fā)送消息, 發(fā)送結(jié)果通過(guò)callback返回給客戶端。producer.sendAsync(msg, new SendCal
11、lback() Overridepublic void onSuccess(final SendResult sendResult) / 消費(fèi)發(fā)送成功System.out.println(sendmessagesuccess.topic=+sendResult.getTopic()+,msgId=+ sendResult.getMessageId();Overridepublic void onException(OnExceptionContext context) / 消息發(fā)送失敗System.out.println(send message failed. topic= + contex
12、t.getTopic() + , msgId= + context.getMessageId(););/ 在callback返回之前即可取得msgId。System.out.println(send message async. topic= + msg.getTopic() + , msgId= + msg.getMsgID();/ 在應(yīng)用退出前,銷毀Producer對(duì)象。注意:如果不銷毀也沒(méi)有問(wèn)題producer.shutdown();public static void main(String args) Properties properties = new Properties();
13、properties.put(PropertyKeyConst.AccessKey, DEMO_AK);/ AccessKey 阿里云身份驗(yàn)證,在阿里云服務(wù)器管理控制臺(tái)創(chuàng)public static void main(String args) Properties properties = new Properties();properties.put(PropertyKeyConst.AccessKey, DEMO_AK);/ AccessKey 阿里云身份驗(yàn)證,在阿里云服務(wù)器管理控制臺(tái)創(chuàng)建properties.put(PropertyKeyConst.SecretKey, DEMO_SK)
14、;/ SecretKey 阿里云身份驗(yàn)證,在阿里云服務(wù)器管理控制臺(tái)創(chuàng)建properties.put(PropertyKeyConst.ProducerId, DEMO_PID);/ 您 在 控 制 臺(tái) 創(chuàng) 建 的 Producer ID properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, 3000);/設(shè)置發(fā)送超時(shí)時(shí)間,單位毫秒Producer producer = ONSFactory.createProducer(properties);/ 在發(fā)送消息前,必須調(diào)用start方法來(lái)啟動(dòng)Producer,只需調(diào)用一次即可。
15、producer.start();/循環(huán)發(fā)送消息for (int i = 0; i 100; i+) Message msg = new Message(/ Message Topic TopicTestMQ,/ Message Tag,/ 可理解為Gmail中的標(biāo)簽,對(duì)消息進(jìn)行再歸類,方便Consumer指定過(guò)濾條件在MQ服務(wù)器過(guò)濾TagA,/ Message Body/ 任何二進(jìn)制形式的數(shù)據(jù),MQ不做任何干預(yù),需要Producer與Consumer協(xié)商好一致的序列化和反序列化方式Hello MQ.getBytes();/ 設(shè)置代表消息的業(yè)務(wù)關(guān)鍵屬性,請(qǐng)盡可能全局唯一。/ 以方便您在無(wú)法正常
16、收到消息情況下,可通過(guò)阿里云服務(wù)器管理控制臺(tái)查詢消息并補(bǔ)發(fā)。/ 注意:不設(shè)置也不會(huì)影響消息正常收發(fā)msg.setKey(ORDERID_ + i);/ oneway發(fā)送消息,只要不拋異常就是成功producer.sendOneway(msg);/ 在應(yīng)用退出前,銷毀Producer對(duì)象/ 注意:如果不銷毀也沒(méi)有問(wèn)題 producer.shutdown();發(fā)送分布式事務(wù)消息目前支持的域包括公網(wǎng)測(cè)試、華東1、華北2、華東2、華南1。MQ事務(wù)消息交互流程如下:發(fā)送事務(wù)消息包含以下兩個(gè)步驟:發(fā)送半消息及執(zhí)行本地事務(wù)package com.alibaba.webx.TryHsf.app1;import
17、 com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.SendResult;import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter; import com.aliyun.openservices.ons.api.transaction.TransactionProducer; imp
18、ort com.aliyun.openservices.ons.api.transaction.TransactionStatus;import java.util.Properties;import java.util.concurrent.TimeUnit;public class TransactionProducerClient private final static Logger log = ClientLogger.getLog(); / 用戶需要設(shè)置自己的log, 記錄日志便于排查問(wèn)題public static void main(String args) throws Int
19、erruptedException final BusinessService businessServicenew BusinessService()/Service Properties properties = new Properties(); properties.put(PropertyKeyConst.ProducerId/Producer IDproperties.put(PropertyKeyConst.AccessKey, ); / 阿里云身份驗(yàn)證,在阿里云服務(wù)器管理控制臺(tái)創(chuàng)建properties.put(PropertyKeyConst.SecretKey/ONSFact
20、ory.createTransactionProducer(properties,new LocalTransactionCheckerImpl(); producer.start();Message msg = new Message(Topic, TagA, Hello MQ transaction=.getBytes();/ 輸入您在控制臺(tái)創(chuàng)建的TopicSendResult sendResult = producer.send(msg, new LocalTransactionExecuter() Overridepublic TransactionStatus execute(Mes
21、sage msg, Object arg) / 消息ID(有可能消息體一樣,但消息ID不一樣, 當(dāng)前消息ID在控制臺(tái)無(wú)法查詢) String msgId = msg.getMsgID();/crc32,MD5 long crc32IdHashUtil.crc32Code(msg.getBody();/ 消息ID和crc32id主要是用來(lái)防止消息重復(fù)/ 如果業(yè)務(wù)本身是冪等的, 可以忽略, 否則需要利用msgId或crc32Id來(lái)做冪等/ 如果要求消息絕對(duì)不重復(fù), 推薦做法是對(duì)消息體body使用crc32或md5來(lái)防止重復(fù)消息Object businessServiceArgs = new Obj
22、ect();TransactionStatus transactionStatus = TransactionStatus.Unknow;try try boolean isCommit = businessService.execbusinessService(businessServiceArgs); if (isCommit)/ 本地事務(wù)成功、提交消息transactionStatus = TransactionStatus.CommitTransaction; else / 本地事務(wù)失敗、回滾消息transactionStatus = TransactionStatus.Rollbac
23、kTransaction; catch (Exception e) log.error(Message Id:, msgId, e);System.out.println(msg.getMsgID();log.warn(Message Id:transactionStatus:, msgId, transactionS(); return transactionStatus;, null);/ demo example 防止進(jìn)程退出(實(shí)際使用不需要這樣) TimeUnit.MILLISECONDS.sleep(Integer.MAX_VALUE);提交事務(wù)消息狀態(tài)當(dāng)本地事務(wù)執(zhí)行完成(執(zhí)行成功或
24、執(zhí)行失?。?,需要通知服務(wù)器當(dāng)前消息的事務(wù)狀態(tài)。通知方式有以 下兩種:執(zhí)行本地事務(wù)完成后提交執(zhí)行本地事務(wù)一直沒(méi)提交狀態(tài),等待服務(wù)器回查消息的事務(wù)狀態(tài)事務(wù)狀態(tài)有以下三種:TransactionStatus.CommitTransactionTransactionStatus.RollbackTransactionimport ducer.LocalTransactionState;publicclassimport ducer.LocalTransactionState;publicclassLocalTransactionCheckerImplimplementsLocalTransaction
25、Checker private final static Logger log =ClientLogger.getLog();final BusinessService businessService = new BusinessService();Overridepublic TransactionStatus check(Message msg) /消息ID(有可能消息體一樣,但消息ID不一樣, 當(dāng)前消息屬于Half 消息,所以消息ID在控制臺(tái)無(wú)法查詢) String msgId = msg.getMsgID();/消息體內(nèi)容進(jìn)行crc32, 也可以使用其它的方法如MD5 long crc
26、32Id = HashUtil.crc32Code(msg.getBody();/消息ID、消息本 crc32Id主要是用來(lái)防止消息重復(fù)/如果業(yè)務(wù)本身是冪等的, 可以忽略, 否則需要利用msgId或crc32Id來(lái)做冪等/如果要求消息絕對(duì)不重復(fù), 推薦做法是對(duì)消息體使用crc32或md5來(lái)防止重復(fù)消息./業(yè)務(wù)自己的參數(shù)對(duì)象, 這里只是一個(gè)示例, 實(shí)際需要用戶根據(jù)情況來(lái)處理Object businessServiceArgs = new Object();Object businessServiceArgs = new Object();TransactionStatus transaction
27、Status = TransactionStatus.Unknow; try boolean isCommit = businessService.checkbusinessService(businessServiceArgs); if (isCommit) /本地事務(wù)已成功、提交消息transactionStatus = TransactionStatus.CommitTransaction; else /本地事務(wù)已失敗、回滾消息transactionStatus = TransactionStatus.RollbackTransaction; catch (Exception e) lo
28、g.error(Message Id:, msgId, e);log.warn(Message Id:transactionStatus:, msgId, transactionS(); return transactionStatus;import java.util.zip.CRC32; public class HashUtil import java.util.zip.CRC32; public class HashUtil public static long crc32Code(byte bytes) CRC32 crc32 = new CRC32();crc32.update(b
29、ytes); return crc32.getValue();事務(wù)回查機(jī)制說(shuō)明發(fā)送事務(wù)消息為什么必須要實(shí)現(xiàn)回查Check機(jī)制?當(dāng)步驟(1)中Half消息發(fā)送完成,但本地事務(wù)返回狀態(tài)為TransactionStatus.Unknow時(shí),或者應(yīng) 用退出導(dǎo)致本地事務(wù)未提交任何狀態(tài)時(shí),從MQ Broker的角度看,這條Half狀態(tài)的消息的狀態(tài)是未知的,因此MQBroker會(huì)定期要求發(fā)送方能Check該Half狀態(tài)消息,并上報(bào)其最終狀態(tài)。Check被回調(diào)時(shí),業(yè)務(wù)邏輯都需要做些什么?MQ事務(wù)消息的check方法里面,應(yīng)該寫一些檢查事務(wù)一致性的邏輯。MQ發(fā)送事務(wù)消息時(shí)需要實(shí)現(xiàn)LocalTransaction
30、Checker接口,用來(lái)處理MQ Broker主動(dòng)發(fā)起的本地事務(wù)狀態(tài)回查請(qǐng)求;因此在事務(wù)消息的Check方法中,需要完成兩件事情:檢查該Half消息對(duì)應(yīng)的本地事務(wù)的狀態(tài)(commitedorrollback)向MQBroker提交該Half消息本地事務(wù)的狀態(tài)發(fā)送延時(shí)消息目前支持的域包括公網(wǎng)測(cè)試、華東1、華北2、華東2、華南1。MQ 客戶端請(qǐng)使用最新版本1.2.2。延時(shí)消息用于指定消息發(fā)送到MQ服務(wù)器端后,延時(shí)一段時(shí)間才被投遞到客戶端進(jìn)行消費(fèi)(例如3秒后才被消費(fèi)),適用于解決一些消息生產(chǎn)和消費(fèi)有時(shí)間窗口要求的場(chǎng)景,或者通過(guò)消息觸發(fā)延遲任務(wù)的場(chǎng)景,類似于延遲 隊(duì)列。代碼示例public class
31、 ProducerDelayTest public static void main(String args) Properties properties = new Properties(); properties.put(PropertyKeyConst.ProducerIdXXX);/Producer IDproperties.put(PropertyKeyConst.AccessKey, XXX);/ AccessKey 阿里云身份驗(yàn)證,在阿里云服務(wù)器管理控制臺(tái)創(chuàng)建properties.put(PropertyKeyConst.SecretKey, XXX);/ SecretKey 阿
32、里云身份驗(yàn)證,在阿里云服務(wù)器管理控制臺(tái)創(chuàng)建Producer producer = ONSFactory.createProducer(properties);/ 在發(fā)送消息前,必須調(diào)用 start 方法來(lái)啟動(dòng) Producer,只需調(diào)用一次即可。producer.start();Message msg = new Message( / Message Topic Topic,/ Message Tag, 可理解為Gmail中的標(biāo)簽,對(duì)消息進(jìn)行再歸類,方便Consumer指定過(guò)濾條件在MQ服務(wù)器過(guò)濾tag,/ Message BodyMQProducerConsumerHello MQ.getB
33、ytes();/ 設(shè)置代表消息的業(yè)務(wù)關(guān)鍵屬性,請(qǐng)盡可能全局唯一。/ 以方便您在無(wú)法正常收到消息情況下,可通過(guò) MQ 控制臺(tái)查詢消息并補(bǔ)發(fā)。/ 注意:不設(shè)置也不會(huì)影響消息正常收發(fā)msg.setKey(ORDERID_100);/ 延時(shí)時(shí)間單位為毫秒(ms),指定一個(gè)時(shí)刻,在這個(gè)時(shí)刻之后才能被消費(fèi),這個(gè)例子表示 3秒 后才能被消費(fèi)long delayTime = 3000;msg.setStartDeliverTime(System.currentTimeMillis() + delayTime);/ 發(fā)送消息,只要不拋異常就是成功SendResult sendResult = producer.
34、send(msg); System.out.println(Message Id: + sendResult.getMessageId();/ 在應(yīng)用退出前,銷毀Producer對(duì)象/ 注意:如果不銷毀也沒(méi)有問(wèn)題 producer.shutdown();發(fā)送定時(shí)消息目前支持的域包括公網(wǎng)測(cè)試、華東1、華北2、華東2、華南1。定時(shí)消息可以做到在指定時(shí)間戳之后才可被消費(fèi)者消費(fèi),用于解決一些消息生產(chǎn)和消費(fèi)有時(shí)間窗口要求的場(chǎng)景,或者通過(guò)消息觸發(fā)定時(shí)任務(wù)的場(chǎng)景。public class ProducerDelayTest public static void main(String args) Prope
35、rties properties = new Properties();public class ProducerDelayTest public static void main(String args) Properties properties = new Properties();properties.put(PropertyKeyConst.ProducerId, XXX);/ 您 在 MQ 控 制 臺(tái) 創(chuàng) 建 的 Producer ID properties.put(PropertyKeyConst.AccessKey, XXX);/ 阿里云身份驗(yàn)證,在阿里云服務(wù)器管理控制臺(tái)創(chuàng)建p
36、roperties.put(PropertyKeyConst.SecretKey, XXX);/ 阿里云身份驗(yàn)證,在阿里云服務(wù)器管理控制臺(tái)創(chuàng)建Producer producer = ONSFactory.createProducer(properties);/ 在發(fā)送消息前,必須調(diào)用 start 方法來(lái)啟動(dòng) Producer,只需調(diào)用一次即可。producer.start();Message msg = new Message( / Message Topic Topic,/ Message TagGmailConsumer指定過(guò)濾條件在MQ服務(wù)器過(guò)濾tag,/ Message BodyMQP
37、roducerConsumerHello MQ.getBytes();/ 設(shè)置代表消息的業(yè)務(wù)關(guān)鍵屬性,請(qǐng)盡可能全局唯一/ 以方便您在無(wú)法正常收到消息情況下,可通過(guò) MQ 控制臺(tái)查詢消息并補(bǔ)發(fā)。/ 注意:不設(shè)置也不會(huì)影響消息正常收發(fā)msg.setKey(ORDERID_100);/* 定時(shí)消息投遞,設(shè)置投遞的具體時(shí)間戳,單位毫秒例如2016-03-07 16:21:00投遞*/longtimeStamp=newSimpleDateFormat(yyyy-MM-ddHH:mm:ss).parse(2016-03-0716:21:00).getTime(); msg.setStartDeliverT
38、ime(timeStamp);/ 發(fā)送消息,只要不拋異常就是成功SendResult sendResult = producer.send(msg); System.out.println(Message Id: + sendResult.getMessageId();/ 在應(yīng)用退出前,銷毀 Producer 對(duì)象/ 注意:如果不銷毀也沒(méi)有問(wèn)題 producer.shutdown();集群方式訂閱消息public class ConsumerTest public static void main(String args) Properties properties = new Propert
39、ies();public class ConsumerTest public static void main(String args) Properties properties = new Properties();properties.put(PropertyKeyConst.ConsumerId, XXX);/ 您 在 控 制 臺(tái) 創(chuàng) 建 的 Producer ID properties.put(PropertyKeyConst.AccessKey, XXX);/ AccessKey 阿里云身份驗(yàn)證,在阿里云服務(wù)器管理控制臺(tái)創(chuàng)建properties.put(PropertyKeyCon
40、st.SecretKey, XXX);/ SecretKey 阿里云身份驗(yàn)證,在阿里云服務(wù)器管理控制臺(tái)創(chuàng)建Consumer consumer = ONSFactory.createConsumer(properties);consumer.subscribe(TopicTestMQ, *, new MessageListener() publicActionconsume(Messagemessage,ConsumeContextcontext)System.out.println(Receive: + message); return Action.CommitMessage;System.
41、out.println(Receive: + message); return Action.CommitMessage;);consumer.start(); System.out.println(Consumer Started);廣播方式訂閱消息public class ConsumerTest public static void main(String args) Properties properties = new Properties();public class ConsumerTest public static void main(String args) Propert
42、ies properties = new Properties();properties.put(PropertyKeyConst.ConsumerId, XXX);/ 您 在 控 制 臺(tái) 創(chuàng) 建 的 Consumer ID properties.put(PropertyKeyConst.AccessKey, XXX);/ 阿里云身份驗(yàn)證,在阿里云服務(wù)器管理控制臺(tái)創(chuàng)建properties.put(PropertyKeyConst.SecretKey, XXX);/ 阿里云身份驗(yàn)證,在阿里云服務(wù)器管理控制臺(tái)創(chuàng)建properties.put(PropertyKeyConst.MessageMode
43、l,PropertyValueConst.Broadcasting);Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe(TopicTestMQ, *, new MessageListener() publicActionconsume(Messagemessage,ConsumeContextcontext) System.out.println(Receive: +message);return Action.CommitMessage;);consumer.start(); Syste
44、m.out.println(Consumer Started);Spring 集成SpringMQSpringSpringSpring生產(chǎn)者與 Spring 集成beans PID_DEMO XXXXXXpackage demo;import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.Producer; package demo;import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservi
45、ces.ons.api.Producer; import com.aliyun.openservices.ons.api.SendResult;import com.aliyun.openservices.ons.api.exception.ONSClientException; import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class ProduceWithSpring
46、public static void main(String args) /* 生產(chǎn)者Bean配置在producer.xml中,可通過(guò)ApplicationContext獲取或者直接注入到其他類(比如具體的Controller)中.*/ApplicationContext context = new ClassPathXmlApplicationContext(producer.xml);Producer producer = (Producer) context.getBean(producer);/循環(huán)發(fā)送消息for (int i = 0; i 100; i+) Message msg =
47、 new Message( / Message Topic TopicTestMQ,/ Message Tag 可理解為Gmail中的標(biāo)簽,對(duì)消息進(jìn)行再歸類,方便Consumer指定過(guò)濾條件在MQ服務(wù)器過(guò)濾TagA,/ Message Body 可以是任何二進(jìn)制形式的數(shù)據(jù), MQ不做任何干預(yù)/ 需要Producer與Consumer協(xié)商好一致的序列化和反序列化方式Hello MQ.getBytes();/ 設(shè)置代表消息的業(yè)務(wù)關(guān)鍵屬性,請(qǐng)盡可能全局唯一/ 以方便您在無(wú)法正常收到消息情況下,可通過(guò)MQ 控制臺(tái)查詢消息并補(bǔ)發(fā)/ 注意:不設(shè)置也不會(huì)影響消息正常收發(fā)msg.setKey(ORDERID_
48、100);/ 發(fā)送消息,只要不拋異常就是成功try SendResult sendResult = producer.send(msg); assert sendResult != null;System.out.println(send success: + sendResult.getMessageId();catch (ONSClientException e) System.out.println(發(fā)送失敗);事務(wù)消息生產(chǎn)者與 Spring 集成package demo;import com.aliyun.openservices.ons.api.Message;import com.
49、aliyun.openservices.ons.api.transaction.LocalTransactionChecker; import com.aliyun.openservices.ons.api.transaction.TransactionStatus;public class DemoLocalTransactionChecker implements LocalTransactionCheckerpackage demo;import com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.
50、ons.api.transaction.LocalTransactionChecker; import com.aliyun.openservices.ons.api.transaction.TransactionStatus;public class DemoLocalTransactionChecker implements LocalTransactionCheckerreturn TransactionStatus.CommitTransaction; /根據(jù)本地事務(wù)狀態(tài)檢查結(jié)果返回不同的TransactionStatusbeans PID_DEMO AKDEMOSKDEMO3.通過(guò)已
51、經(jīng)與 Spring 集成好的生產(chǎn)者生產(chǎn)事務(wù)消息。package demo;package demo;import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.SendResult;import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter; import com.aliyun.openservi
52、ces.ons.api.transaction.TransactionProducer; import com.aliyun.openservices.ons.api.transaction.TransactionStatus;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class ProduceTransMsgWithSpring public static void
53、main(String args) /*事務(wù)消息生產(chǎn)者Bean配置在transactionProducer.xml中,可通過(guò)ApplicationContext獲取或者直接注入到其他類(比如具體的Controller)中.請(qǐng)結(jié)合例子發(fā)送事務(wù)消息*/ApplicationContext context = new ClassPathXmlApplicationContext(transactionProducer.xml);TransactionProducer transactionProducer = (TransactionProducer) context.getBean(transac
54、tionProducer); Message msg = new Message(XXX, TagA, Hello MQ transaction=.getBytes();SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() Overridepublic TransactionStatus execute(Message msg, Object arg) System.out.println(執(zhí)行本地事務(wù));return TransactionStatus.CommitTransa
55、ction; /根據(jù)本地事務(wù)執(zhí)行結(jié)果來(lái)返回不同的TransactionStatus, null);消費(fèi)者與 Spring 集成package demo;import com.aliyun.openservices.ons.api.Action;package demo;import com.aliyun.openservices.ons.api.Action;import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Message;import com.aliyun
56、.openservices.ons.api.MessageListener;public class DemoMessageListener implements MessageListener publicActionconsume(Messagemessage,ConsumeContextcontext) System.out.println(Receive: +message.getMsgID();try /do something.return Action.CommitMessage;catch (Exception e) /消費(fèi)失敗return Action.ReconsumeLa
57、ter;beans CID_DEMO AKDEMOSKDEMO!-將消費(fèi)者線程數(shù)固定為50個(gè).50-3.運(yùn)行已經(jīng)與 Spring 集成好的消費(fèi)者,如下所示。package demo;package demo;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class ConsumeWithSpring public static void main(String args)
58、/* 消費(fèi)者Bean配置在consumer.xml中,可通過(guò)ApplicationContext獲取或者直接注入到其他類(比如具體的Controller)中.* 消費(fèi)者Bean配置在consumer.xml中,可通過(guò)ApplicationContext獲取或者直接注入到其他類(比如具體的Controller)中.*/ApplicationContextcontext=newClassPathXmlApplicationContext(consumer.xml); System.out.println(ConsumerStarted);C/C+ SDK環(huán)境準(zhǔn)備運(yùn)行本節(jié)描述的 CPP 代碼請(qǐng)準(zhǔn)備好
59、以下環(huán)境。通過(guò)下面方式引入依賴。CPPWindowsLinuxSDK,SDKcppWindows cpp 下載鏈接TopicProducer IDConsumer ID,MQMessage Tag申請(qǐng)MQ資源。使用 MQ 服務(wù)的應(yīng)用程序需要部署在阿里云 ECS 上。MQ 發(fā)送普通消息#include ONSFactory.h#include ONSClientException.h#include ONSFactory.h#include ONSClientException.husing namespace ons; int main()/創(chuàng)建producer和發(fā)送消息所必需的信息;ONSF
60、actoryProperty factoryInfo;ONSFactoryProperty factoryInfo;factoryInfo.setFactoryProperty(ONSFactoryProperty:ProducerId, XXX);/ 您 在 控 制 臺(tái) 創(chuàng) 建 的 Producer ID factoryInfo.setFactoryProperty(ONSFactoryProperty:PublishTopics,XXX );/ 消息內(nèi)容factoryInfo.setFactoryProperty(ONSFactoryProperty:MsgContent, XXX);/消
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 居民健康檔案管理培訓(xùn)
- 數(shù)控車削加工技術(shù) 課件 項(xiàng)目四 數(shù)控車削仿真加工
- 四川省成都市西藏中學(xué)2024-2025高一(1-5班)10月月考?xì)v史試卷 - 副本
- 黑龍江省綏化市海倫市第三中學(xué)2023-2024學(xué)年九年級(jí)上學(xué)期期中考試化學(xué)試卷(含解析)
- T-ZFDSA 01-2024 當(dāng)歸生姜羊肉湯制作標(biāo)準(zhǔn)
- 江蘇省泰州市姜堰區(qū)2024-2025學(xué)年七年級(jí)上學(xué)期11月期中考試數(shù)學(xué)試題(無(wú)答案)
- 算法工程師面試真題單選題100道及答案解析
- 人教版PEP(2024)三年級(jí)上冊(cè)《Unit 6 Useful numbers》Part A第2課時(shí)-教學(xué)課件
- 日常生活活動(dòng)能力訓(xùn)練版
- 圪柳溝安全生產(chǎn)責(zé)任制
- 2024-2025學(xué)年七年級(jí)上學(xué)期數(shù)學(xué)期中模擬試卷(蘇科版2024)(含答案解析)
- 年處理10000輛報(bào)廢新能源汽車拆解再生利用項(xiàng)目可行性研究報(bào)告-模板
- 供應(yīng)商送貨要求規(guī)范
- 教師績(jī)效考核綜合評(píng)價(jià)表.doc
- 鐵路工程預(yù)算定額工程量計(jì)算規(guī)則使用說(shuō)明
- 投標(biāo)書標(biāo)準(zhǔn)格式
- 殘疾人的心理輔導(dǎo)方案計(jì)劃
- 副校長(zhǎng)年度考核評(píng)語(yǔ)
- 民航飛機(jī)維修措施與成本分析
- 結(jié)構(gòu)件防腐蝕生產(chǎn)質(zhì)量控制要求DKBA04000050B匯編
- 新華書店施工組織設(shè)計(jì)(完整版)
評(píng)論
0/150
提交評(píng)論