全新java高級架構(gòu)師項(xiàng)目實(shí)戰(zhàn)智能運(yùn)營助手activitymq activemq_第1頁
全新java高級架構(gòu)師項(xiàng)目實(shí)戰(zhàn)智能運(yùn)營助手activitymq activemq_第2頁
全新java高級架構(gòu)師項(xiàng)目實(shí)戰(zhàn)智能運(yùn)營助手activitymq activemq_第3頁
全新java高級架構(gòu)師項(xiàng)目實(shí)戰(zhàn)智能運(yùn)營助手activitymq activemq_第4頁
全新java高級架構(gòu)師項(xiàng)目實(shí)戰(zhàn)智能運(yùn)營助手activitymq activemq_第5頁
已閱讀5頁,還剩16頁未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡介

每特學(xué)院Java互聯(lián)網(wǎng)架構(gòu)培 老如果由于服務(wù)對象或者網(wǎng)絡(luò)故障導(dǎo)致用戶的請求不可達(dá),客戶會受到異常JMSPoint-to-Point(P2P)Point-to-Point(P2P)Publish/Subscribe(Pub/Sub)---P2P(點(diǎn)對點(diǎn)P2P消息隊(duì)列發(fā)送者接收者P2PPub/Sub發(fā)布與訂閱客戶端將消息發(fā)送到。多個(gè)發(fā)布者將消息發(fā)送到Topic,系統(tǒng)將這些消息傳遞發(fā)布者和訂閱者之間有時(shí)間上的依賴性。針對某個(gè)(Topic)的訂閱者,它必為了緩和這樣嚴(yán)格的時(shí)間相關(guān)性,JMSPub/SubJMSJMSreceivereceive訂閱者或接收者可以為一個(gè)消息器。當(dāng)消息到達(dá)之后,系統(tǒng)自動調(diào)用onMessage用戶、訂單修改庫存、日志MQErlang編寫的一個(gè)開源的消息隊(duì)列,本身支持很多的協(xié)議:AMQP,XMPP,SMTPSTOMP,也正是如此,使的它變的非常重量級,更適合于企業(yè)級的開發(fā)。同時(shí)實(shí)由(Routing),負(fù)載均衡(Loadbalance)或者數(shù)據(jù)持久化都有很好的支持。是一個(gè)Key-Value的NoSQL數(shù)據(jù)庫,開發(fā)很活躍,雖然它是一個(gè)Key-Value數(shù)據(jù)庫MQ功能,所以完全可以當(dāng)做一個(gè)輕量級的隊(duì)列服務(wù)來使用。對RabbitMQRedis10010萬次記錄一次執(zhí)行時(shí)128Bytes、512Bytes、1K10K四個(gè)不同大小的數(shù)據(jù)。實(shí)驗(yàn)表明:入Redis的性能要高于RabbitMQ10K,Redis則慢的受;出隊(duì)時(shí),無論數(shù)據(jù)大小,Redis都表現(xiàn)出非常好的性能,而RabbitMQRedis。號稱最快的消息隊(duì)列系統(tǒng),尤其針對大吞吐量的需求場景。ZMQRabbitMQ不擅MQ能夠應(yīng)用成功的。ZeroMQ具有一個(gè)獨(dú)特的非中間件的模式,你不需要安裝和運(yùn)ZeroMQ程序庫,可以使用NuGet安裝,然后你就可以愉快的在應(yīng)用程序之間發(fā)送ZeroMQ僅提供非持久性的隊(duì)列,也就是說如果down機(jī),數(shù)據(jù)將會丟失。其中,的Storm中使用ZeroMQ作為數(shù)據(jù)流的傳輸。是Apache下的一個(gè)子項(xiàng)目。類似于ZeroMQ,它能夠以人和點(diǎn)對點(diǎn)的技術(shù)實(shí)現(xiàn)隊(duì)ZeroMQ、ActiveMQC++、Java、.Net,、PythonPhp、Ruby等。KafkaApachePublish/Subscribe消息隊(duì)JafkaKafkaKafka的一個(gè)升級版。具有以下特性:O(1)的系統(tǒng)開銷下進(jìn)行消息持久化;高吞吐,在一臺普通的服務(wù)器上10W/s的吞吐速率;完全的分布式系統(tǒng),Broker、Producer、Consumer都原生自動支持分布式,自動實(shí)現(xiàn)復(fù)雜均衡;支持Hadoop數(shù)據(jù)并行加載,對于像Hadoop的Kafka通過Hadoop的并行加載機(jī)制來統(tǒng)一了和離線的消息處理,這一點(diǎn)也是本課題所研究系統(tǒng)所看重的。ApacheKafkaActiveMQ是一個(gè)非常輕量級的消息系統(tǒng),除其他一些隊(duì)列列表HornetQ、ApacheQpid、Sparrow、Starling、Kestrel、Beanstalkd、AmazonSQS就不再一一分析。ActiveMQwindowActiveMQActiveMQJavajavaJDK并配置然后解壓的apache-activemq-5.10- .133406-78-bin.zip壓縮包到一個(gè) 進(jìn)入 ,發(fā)現(xiàn)有win32和win64兩個(gè)文件夾,這2個(gè)文件夾分別對應(yīng) 輸入用戶名,默認(rèn)用戶名為admin、admin,這個(gè)用戶名是在conf/perties中配置的。輸入用戶名后便可看到如下圖的ActiveMQ控制臺界面了。4.1.1NumberOfConsumersNumberOfPendingMessagesMessagesEnqueuedMessagesDequeued出了隊(duì)列的消息可以理解為是消費(fèi)這消費(fèi)掉的數(shù)量queuestopics11。011.12.沒有消費(fèi)者時(shí)PendingMessages和入隊(duì)列數(shù)量一樣有消費(fèi)者消費(fèi)的時(shí)候Pedding會減少出隊(duì)列會增加到最后就是入隊(duì)列和出隊(duì)列的數(shù)量一樣多ActiveMQ(p2p)通訊模式pompublicpublicclassProducterpublicstaticvoidmain(String[]args)throws ceptionConnectionFactory:連接工廠,JMSConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://:61616");JMS客戶端到JMSProviderConnectionconnection=connectionFactory.createConnection();SessionSessionsession=connection.createSession(Boolean.falst,Destination:消息的目的地;消息發(fā)送給誰//Destinationdestination=session.createQueue("my-//MessageProducerproducer=for(inti=1;i<=5;i++)sendMsg(session,producer,}}*@param**@param*publicstaticvoidsendMsg(Sessionsession,MessageProducerproducer,inti)throws ceptionTextMessagemessage=oActiveMQ!"+//}}publicpublicclassJmsReceiverpublicstaticvoidmain(String[]args)throws ceptionConnectionFactory:連接工廠,JMSConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://:61616");JMS客戶端到JMSProviderConnectionconnection=connectionFactory.createConnection();SessionSessionsession=connection.createSession(Boolean.TRUE,Destination:消息的目的地;消息發(fā)送給誰//Destinationdestination=session.createQueue("my-MessageConsumerconsumer=session.createConsumer(destination);while(true){TextMessagemessage=(TextMessage)consumer.receive();if(null!=message){System.out.println("收到消息:}}}}JMS1如果session2Session.AUTO_ACKNOWLEDGE消息自動簽收Session._ACKNOWLEDGE客戶端調(diào)用acknowledge方法手動簽Session.DUPS_OK_ACKNOWLEDGE戶處理消息和消息被確認(rèn)。在事務(wù)性會話中,當(dāng)一個(gè)事務(wù)被提交的時(shí)候,確認(rèn)自動發(fā)生。在非事務(wù)性會話中,消息何時(shí)被確認(rèn)取決于創(chuàng)建會話時(shí)的應(yīng)答模式(acknowledgementmode)。該參數(shù)有以下三個(gè)可選NumberOfConsumersNumberOfPendingMessagesMessagesEnqueued進(jìn)入隊(duì)列的消息進(jìn)入隊(duì)列的總數(shù)量,MessagesDequeuedSessionsession=createConnectioncreateSession(BooleanFALSE,TextMessagetextMessage=(TextMessage)createConsumertextMessageSessionsession=createConnectioncreateSession(BooleanFALSE,SessionSessionsession=createConnectioncreateSession(BooleanFALSE,SessionSessionsession=createConnectioncreateSession(BooleanTRUE,SessionAUTO_ACKNOWLEDGE);sessioncommit();Sessionsession=createConnectioncreateSession(BooleanTRUE,SessionAUTO_ACKNOWLEDGE);sessioncommit();ActiveMQproducersetDeliveryMode(DeliveryModepublicpublicclassTOPSendprivatestaticStringBROKERURL="tcp://:61616";privatestaticStringTOPIC="my-topic";publicstaticvoidmain(String[]args)throws ception}staticpublicvoidstart()throwsJMception{ ActiveMQConnectionFactoryactiveMQConnectionFactory=newActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,BROKERURL);Connectionconnection=啟動JMSSessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);MessageProducerproducer=session.createProducer(null);send(producer,session);}staticpublicvoidsend(MessageProducerproducer,Sessionsession)throwsJM ception{for(inti=1;i<=5;i++){System.out.println("我是消息TextMessagetextMessagesession.createTextMessage("我是消息i);Destinationdestination=session.createTopic(TOPIC);producer.send(destination,textMessage);}}}消費(fèi)者publicpublicclassTopReceiverprivatestaticStringBROKERURL=privateprivatestaticStringTOPIC="my-publicstaticvoidmain(String[]args)throws ception}staticpublicvoidstart()throws ceptionActiveMQConnectionFactoryactiveMQConnectionFactory=newActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,BROKERURL);Connectionconnection=啟動JMS//Sessionsession=connection.createSession(false,Topictopic=MessageConsumerconsumer=//consumer.setMessageListener(newMsgListener());while(true){TextMessagetextMessage=(TextMessage)consumer.receive();if(textMessage!=null){System.out.println("接受到消息//}else}}}}SpringBoot生產(chǎn)者maven<relativePath/><!--lookupparentfromrepository--springbootweb支持:mvc,aopapplication.ymlbroker-url:tcp://:61616user:adminpassword:adminqueue:springboot-queueport:publicclassQueueConfig{privateStringqueue;publicQueuelogQueue()returnnew}publicJmsTem tejmsTem te(ActiveMQConnectionFactoryactiveMQConnectionFactory,Queuequeue){ tejmsTem te=newJmsTem tesetDeliveryMode(2);//進(jìn)行持久化配置1表示非持久化,2表示持久化</span> tesetConnectionFactory(activeMQConnectionFactory); te estination(queue tesetSessionAcknowledgeMode(4);//客戶端簽收模式</span>returnjmsTem }//定義一個(gè)消 @Bean(name=publicDefaultJmsListenerContainerFactoryActiveMQConnectionFactoryactiveMQConnectionFactory){DefaultJmsListenerContainerFactoryActiveMQConnectionFactoryactiveMQConnectionFactory){DefaultJmsListenerContainerFactoryfactory=newDefaultJmsListenerContainerFactory();factorysetConnectionFactory(activeMQConnectionFactory);factorysetConcurrency("1-factorysetRecoveryInterval(1000L);factorysetSessionAcknowledgeMode(4);returnfactory;}}publicclassProducer{privateJmsMessagingTemprivateQueuete@Scheduled(fixedDelay=publicvoidsend() te.convertAndSend(queue,測試消息隊(duì)

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

最新文檔

評論

0/150

提交評論