activeMQ,JMS學習資料.doc_第1頁
activeMQ,JMS學習資料.doc_第2頁
activeMQ,JMS學習資料.doc_第3頁
activeMQ,JMS學習資料.doc_第4頁
activeMQ,JMS學習資料.doc_第5頁
已閱讀5頁,還剩22頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

1 JMSJMS源于企業(yè)應用對于消息中間件的需求,使應用程序可以通過消息進行異步處理而互不影響。Sun公司和它的合作伙伴設計的JMS API定義了一組公共的應用程序接口和相應語法,使得Java程序能夠和其他消息組件進行通信。1.1 JMS的基本構件1.1.1 連接工廠連接工廠是客戶用來創(chuàng)建連接的對象,例如ActiveMQ提供的ActiveMQConnectionFactory。1.1.2 連接JMS Connection封裝了客戶與JMS提供者之間的一個虛擬的連接。1.1.3 會話JMS Session是生產和消費消息的一個單線程上下文。會話用于創(chuàng)建消息生產者(producer)、消息消費者(consumer)和消息(message)等。會話提供了一個事務性的上下文,在這個上下文中,一組發(fā)送和接收被組合到了一個原子操作中。1.1.4 目的地目的地是客戶用來指定它生產的消息的目標和它消費的消息的來源的對象。JMS1.0.2規(guī)范中定義了兩種消息傳遞域:點對點(PTP)消息傳遞域和發(fā)布/訂閱消息傳遞域。點對點消息傳遞域的特點如下:l 每個消息只能有一個消費者。l 消息的生產者和消費者之間沒有時間上的相關性。無論消費者在生產者發(fā)送消息的時候是否處于運行狀態(tài),它都可以提取消息。發(fā)布/訂閱消息傳遞域的特點如下:l 每個消息可以有多個消費者。l 生產者和消費者之間有時間上的相關性。訂閱一個主題的消費者只能消費自它訂閱之后發(fā)布的消息。JMS規(guī)范允許客戶創(chuàng)建持久訂閱,這在一定程度上放松了時間上的相關性要求。持久訂閱允許消費者消費它在未處于激活狀態(tài)時發(fā)送的消息。在點對點消息傳遞域中,目的地被成為隊列(queue);在發(fā)布/訂閱消息傳遞域中,目的地被成為主題(topic)。1.1.5 消息生產者消息生產者是由會話創(chuàng)建的一個對象,用于把消息發(fā)送到一個目的地。1.1.6 消息消費者消息消費者是由會話創(chuàng)建的一個對象,它用于接收發(fā)送到目的地的消息。消息的消費可以采用以下兩種方法之一:l 同步消費。通過調用消費者的receive方法從目的地中顯式提取消息。receive方法可以一直阻塞到消息到達。l 異步消費??蛻艨梢詾橄M者注冊一個消息監(jiān)聽器,以定義在消息到達時所采取的動作。1.1.7 消息JMS消息由以下三部分組成:l 消息頭。每個消息頭字段都有相應的getter和setter方法。l 消息屬性。如果需要除消息頭字段以外的值,那么可以使用消息屬性。l 消息體。JMS定義的消息類型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage。1.2 JMS的可靠性機制1.2.1 消息確認JMS消息只有在被確認之后,才認為已經被成功地消費了。消息的成功消費通常包含三個階段:客戶接收消息、客戶處理消息和消息被確認。在事務性會話中,當一個事務被提交的時候,確認自動發(fā)生。在非事務性會話中,消息何時被確認取決于創(chuàng)建會話時的應答模式(acknowledgement mode)。該參數有以下三個可選值:l Session.AUTO_ACKNOWLEDGE。當客戶成功的從receive方法返回的時候,或者從MessageListener.onMessage方法成功返回的時候,會話自動確認客戶收到的消息。l Session.CLIENT_ACKNOWLEDGE。客戶通過消息的acknowledge方法確認消息。需要注意的是,在這種模式中,確認是在會話層上進行:確認一個被消費的消息將自動確認所有已被會話消費的消息。例如,如果一個消息消費者消費了10個消息,然后確認第5個消息,那么所有10個消息都被確認。l Session.DUPS_ACKNOWLEDGE。該選擇只是會話遲鈍第確認消息的提交。如果JMS provider失敗,那么可能會導致一些重復的消息。如果是重復的消息,那么JMS provider必須把消息頭的JMSRedelivered字段設置為true。1.2.2 持久性JMS 支持以下兩種消息提交模式:l PERSISTENT。指JMS provider持久保存消息,以保證消息不會因為JMS provider的失敗而丟失。l NON_PERSISTENT。不要求JMS provider持久保存消息。1.2.3 優(yōu)先級可以使用消息優(yōu)先級來指示JMS provider首先提交緊急的消息。優(yōu)先級分10個級別,從0(最低)到9(最高)。如果不指定優(yōu)先級,默認級別是4。需要注意的是,JMS provider并不一定保證按照優(yōu)先級的順序提交消息。1.2.4 消息過期可以設置消息在一定時間后過期,默認是永不過期。1.2.5 本地事務在一個JMS客戶端,可以使用本地事務來組合消息的發(fā)送和接收。JMS Session接口提供了commit和rollback方法。事務提交意味著生產的所有消息被發(fā)送,消費的所有消息被確認;事務回滾意味著生產的所有消息被銷毀,消費的所有消息被恢復并重新提交,除非它們已經過期。事務性的會話總是牽涉到事務處理中,commit或rollback方法一旦被調用,一個事務就結束了,而另一個事務被開始。關閉事務性會話將回滾其中的事務。需要注意的是,如果使用請求/回復機制,即發(fā)送一個消息,同時希望在同一個事務中等待接收該消息的回復,那么程序將被掛起,因為知道事務提交,發(fā)送操作才會真正執(zhí)行。需要注意的還有一個,消息的生產和消費不能包含在同一個事務中。2 ActiveMQActiveMQ是apache旗下開源消息中間件,是目前最流行的開源的消息中間件。ActiveMQ功能特點:l 支持跨語言的客戶端,如:Java,C和C + +,C,Ruby,Perl,Python和PHP。l 在JMS客戶端和消息代理都全面支持企業(yè)集成模式。l 支持許多高級功能,如:信息組,虛擬目的地,通配符和復合目的地。l 完全支持JMS 1.1和J2EE1.4 。l 支持Spring,以便ActiveMQ可以很容易地嵌入到Spring應用程序和使用Spring的XML配置機制 。l 通過了常見J2EE服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試。 l 支持可插拔傳輸協議,如:in-VM,TCP, SSL, NIO, UDP, multicast, JGroups and JXTA 傳輸。l 支持通過JDBC和journal提供高速的消息持久化。l 設計基于高性能集群,客戶端服務器,對等通信。l Ajax支持網絡流媒體,支持使用純DHTML的Web瀏覽器,允許Web瀏覽器成為消息傳遞結構的一部分。l 支持CXF和Axis,以便ActiveMQ可以很容易地進入這些Web服務并提供可靠的消息傳遞。l 可作為一個在內存中的JMS提供者,是JMS的單元測試的理想選擇。2.1 安裝ActiveMQ 可以從官方網站(/)下載最新版的ActiveMQ。(最新版的為:5.4.2)2.2 啟動ActiveMQ在下載最新的ActiveMQ將其解壓到相應的目錄就可以了。需要啟動ActiveMQ只要找到$activemq.home/bin目錄,雙擊運行activemq.bat就可以了(windows版本)。啟動后的界面如上圖。(ActiveMQ5.3.0版本啟動后的截圖,不同版本會有所不同)Listening for connections at: tcp:/74:61616(這里的IP顯示的是機器名稱)tcp:/74:61616表示監(jiān)聽的端口地址,也就是編寫程序時,獲取連接進用到的URL。ActiveMQ Console at 74:8161/admin 74:8161/admin這個是新版本的activeMQ提供的管理工具訪問地址,可以查看隊列詳情,生產者,消費者等信息。2.3 配置ActiveMQ2.3.1 基本配置說明ActiveMQ默認使用的是XML格式配置,配置文件在$activemq.home/conf目錄下,文件名為activemq.xml file:$activemq.base/conf/perties !- The element is used to configure the ActiveMQ broker. - producerFlowControl=true memoryLimit=1mb producerFlowControl=true memoryLimit=1mb !- Use VM cursor for better latency For more information, see: /message-cursors.html - !- The systemUsage controls the maximum amount of space the broker will use before slowing down producers. For more information, see: /producer-flow-control.html - Kahadb配置說明配置示例:KahaDB的各個可配置屬性:屬性默認值描述directoryactivemq-data保存message store數據文件的目錄indexWriteBatchSize1000批量更新索引的閥值,當要更新的索引到達這個索引時,批量更新到metadata store中indexCacheSize10000指定metadata cache的大小enableIndexWriteAsyncfalse寫入索引文件到metadata store中的方式是否采用異步寫入journalMaxFileLength32mb消息持久數據文件的大小enableJournalDiskSyncstrue如果為true,保證使用同步寫入的方式持久化消息到journal文件中cleanupInterval30000清除(清除或歸檔)不再使用的journal 文件的時間周期(毫秒)。checkpointInterval5000寫入索引信息到metadata store中的時間周期(毫秒)ignoreMissingJournalfilesfalse是否忽略丟失的journal文件。如果為false,當丟失了journal文件時,broker啟動時會拋異常并關閉checkForCorruptJournalFilesfalse如果為true,broker在啟動的時候會檢測journal文件是否損壞,若損壞便嘗試恢復它。checksumJournalFilesfalse如果為true。KahaDB為journal文件生產一個checksum,以便能夠檢測journal文件是否損壞。archiveDataLogsfalse如果為true,當達到cleanupInterval周期時,會歸檔journal文件而不是刪除directoryArchivenull指定歸檔journal文件存放的路徑databaseLockedWaitDelay10000在使用主從數據庫備份時,等待獲取DB上的lock的延遲時間。maxAsyncJobs10000等待寫入journal文件的任務隊列的最大數量。應該大于或等于最大并發(fā)producer的數量。配合并行存儲轉發(fā)屬性使用。concurrentStoreAndDispatchTransactionsfalse如果為true,轉發(fā)消息的時候同時提交事務concurrentStoreAndDispatchTopicsfalse如果為true,轉發(fā)Topic消息的時候同時存儲消息的message store中。concurrentStoreAndDispatchQueuestrue如果為true,轉發(fā)Queue消息的時候同時存儲消息到message store中。2.3.2 安全性配置說明(以5.3.0的版本為例,各個版本配置會有所不同,需查閱相應資料)訪問activeMQ權限配置是在$activemq.home/conf目錄下的activemq.xml中配置的,默認是沒有進行配置,任何用戶都是可以訪問activeMQ。如果需要對訪問權限進行控制可以進行相應配置,可以指定到相應隊列的訪問權限。1. 配置activemq.xml配置權限需要在,activemq.xml的元素中加入以下部分。 /queue=對應的是所有隊列/ read=”admins” 表示admins組的所有用戶都可以消費前面queue屬性指定的隊列的消息/ write=admins表示admins組的所有用戶都可以向前面queue屬性指定的隊列發(fā)送消息/ admin=admins表示admins組的所有用戶可以在前面queue屬性指定的隊列不存在的情況創(chuàng)建隊列 read=admins write=admins admin=admins / queue=TEST.表示隊列名稱所有以”TEST.”(包含“.”)開頭的隊列,(這里建議隊列名稱以“.”作分隔符) read=users write=users admin=users / read=users write=users admin=users / write=guests,users admin=guests,users可以同時指定多個組 read=guests write=guests,users admin=guests,users /主題相關配置,與隊列配置類似 read=admins write=admins admin=admins / read=users write=users admin=users / read=guests write=guests,users admin=guests,users / read=guests,users write=guests,users admin=guests,users/ 2. 用戶配置在$activemq.home/conf目錄下增加 login.config、perties、perties三個文件。l login.config內容activemq-domain org.apache.activemq.jaas.PropertiesLoginModule required debug=true perties.user=perties perties.group=perties; ;perties.user=perties指定用戶對應的配置文件perties.group=perties;指定用戶組對應的配置文件l perties內容admins=systemusers=zengjun,llguests=guestusers=zengjun,llusers表示用戶組名,與activemq.xml安全性配置中的read,write,admin屬性值對應。zengjun,ll表示兩個用戶名zengjun和ll,這里表示users組下有兩個用戶zengjun和ll。l perties內容system=managerzengjun=zjll=llzengjun=zjzengjun表示用戶名,與perties配置用戶名對應zj表示等號左邊用戶對應的密碼。注:在配置安全性配置后,在寫代碼創(chuàng)建連接時需要加上對應的用戶名與密碼。如:Connection connection = connectionFactory.createConnection(zengjun,zj);2.4 點對點域2.4.1 生產消息生產消息都步驟如上圖的深色部分所示。首先需要從連接工廠中獲取到連接,然后通過連接來創(chuàng)建會話,再通過會話來創(chuàng)建目的地,再用會話與目的地來創(chuàng)建生產者。需要發(fā)送的消息也是通過會話來創(chuàng)建的。最后通過生產者來發(fā)送消息。示例代碼:/初使化連接工廠ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(tcp:/74:61616); Connection connection = null; Session session = null; MessageProducer producer = null;Destination destination = null;/創(chuàng)建連接connection = connectionFactory.createConnection();/創(chuàng)建會話 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);/創(chuàng)目的地destination = session.createQueue(TEST_QUEUE_ZJ);/創(chuàng)生產者producer = session.createProducer(destination);/設置消息的持久模式producer.setDeliveryMode(DeliveryMode.PERSISTENT);connection.start();/創(chuàng)建消息TextMessage message = session.createTextMessage();/設置消息屬性 double d = Math.random();message.setStringProperty(ID, String.valueOf(d);message.setText(tttt111);message.setText(tttt22222222);/發(fā)送消息producer.send(message);2.4.2 消費消息消費消息如上圖深色部分所示。首先需要從連接工廠創(chuàng)建連接,然后再通過連接創(chuàng)建會話,然后通過會話創(chuàng)建目的地,再通過會話與目的地來創(chuàng)消費者,然后消費者調用接口來消費消息。代碼示例:/初始連接工廠 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(tcp:/74:61616); Session session = null; MessageConsumer consumer = null; Connection connection = null; Message message = null;Destination destination=null;/創(chuàng)建連接connection = connectionFactory.createConnection();connection.start();/創(chuàng)建會話,指定消息的確認模式,(確認模式請參1.2.1)session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);/創(chuàng)建目的地destination = session.createQueue(TEST_QUEUE_ZJ);/創(chuàng)建消息者consumer = session.createConsumer(destination);/消息者調用接收消息接口來接收消息。/ 無時間參數表示一直等待,直到收到消息。/ message = consumer.receive();/ 有時間參數表示指定時間后沒有消息則結束時,如果存在消息就在取完消息后結束message = consumer.receive(5 * 1000);/ 如果沒有收到消息,不會等待,立即往下執(zhí)行,不建議使用/ message = consumer.receiveNoWait();/判斷是否收到消息。if (message != null) /判斷消息的類型if (message instanceof TextMessage) TextMessage textMessage = (TextMessage) message;String text = textMessage.getText();System.out.println(TEXT: + text);/ 確認消息textMessage.acknowledge(); else if (message instanceof StreamMessage) StreamMessage streamMessage = (StreamMessage) message;String strId = streamMessage.getStringProperty(ID);System.out.println(streammessage ID: + strId);/確認消息streamMessage.acknowledge(); else System.out.println(沒有收到消息);還可以采用監(jiān)聽的方式來消費消息。采用監(jiān)聽的方式,需要實現MessageListener接口,創(chuàng)建消費者的方式與前面描消費消息的步驟一致,在建創(chuàng)建好消費者后需要設置實現MessageListener接口的監(jiān)聽器,當監(jiān)聽器監(jiān)聽到消費者對應的目的地上有消息時,會自動調用onMessage方法,在onMessage方法中可以得到消息。代碼示例:/實現MessageListener接口public class AmqConsumerOnMessage implements MessageListener public static void main(String args) new AmqConsumerOnMessage().createConsumer(TEST_queuE);/實現onMessage方法 public void onMessage(Message message) try if (message != null) if (message instanceof TextMessage) TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println(TEXT: + text); else if (message instanceof StreamMessage) StreamMessage streamMessage = (StreamMessage) message; String strId = streamMessage.getStringProperty(ID); System.out.println(streammessage ID: + strId); /確認消息 message.acknowledge(); else System.out.println(沒有收到消息); catch (JMSException e) e.printStackTrace(); /創(chuàng)建消費者 public void createConsumer(String queue) MessageConsumer consumer = null; Session session = null; Connection connection = null; try ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(tcp:/74:61616); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Destination objDestination = null; objDestination = session.createQueue(queue); consumer = session.createConsumer(objDestination);/設置監(jiān)聽器,來監(jiān)聽消息 consumer.setMessageListener(this); catch (JMSException e) e.printStackTrace(); finally ConnectionUtil.closeAll(connection, session, consumer); 2.5 發(fā)布/訂閱域2.5.1 發(fā)布消息發(fā)布消息與點對點域的生產消息類似。只是將隊列換成了主題,將生產者換成發(fā)布者代碼示例:/初始化連接工廠ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(tcp:/74:61616);TopicConnection connection = null;ActiveMQTopicSession session = null;ActiveMQTopicPublisher publisher = null;ActiveMQTopic topic = null; /創(chuàng)建連接connection = connectionFactory.createTopicConnection(zengjun, zj);/創(chuàng)建會話session = (ActiveMQTopicSession) connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);/創(chuàng)建主題topic = (ActiveMQTopic) session.createTopic(TEST.topic.zj);/創(chuàng)建發(fā)布者publisher = (ActiveMQTopicPublisher) session.createPublisher(topic);/設置消息持久方式,如果要實現持久訂閱,持久方法必須是DeliveryMode.PERSISTENTpublisher.setDeliveryMode(DeliveryMode.PERSISTENT);connection.start();TextMessage message = session.createTextMessage();mes

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論