在職加薪70.消息中間件-96rabbitmqv_第1頁
在職加薪70.消息中間件-96rabbitmqv_第2頁
在職加薪70.消息中間件-96rabbitmqv_第3頁
在職加薪70.消息中間件-96rabbitmqv_第4頁
在職加薪70.消息中間件-96rabbitmqv_第5頁
已閱讀5頁,還剩29頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權,請進行舉報或認領

文檔簡介

RabbitMQ介Q全稱為ssaeQee,即消息隊列,Rit是由rlang語言開發(fā),基于AP(AvcdMssageuee高級消息隊列協(xié)議)協(xié)議實現(xiàn)的消息隊列,它是一種應用程序之間的通信方法,消息隊列在分布式系統(tǒng)開發(fā)中應用非常廣泛。RabitMQ地址:htp: /開發(fā)中消息隊列通常有如下應用場景1、任務異步處理2、應用程序解耦市場上還有哪些消息隊列5、SpringBoot默認已集成AMQP總結:AMP是一套公開的消息隊列協(xié)議,最早在200年被提出,它旨在從協(xié)議層定義消息通信數(shù)據(jù)的標準格式,為的就是解決M市場上協(xié)議不統(tǒng)一的問題。RabitMQ就是遵循AP標準協(xié)議開發(fā)的M服務。:JMS是什么總結JS是jav提供的一套消息服務AP標準,其目的是為所有的jva應用程序提供統(tǒng)一的消息通信的標準,類似jv的jd,只要遵循j標準的應用程序之間都可以進行消息通信。它和AP有什么不同,j是jv語言專屬的消息服務標準,它是在api層定義標準,并且只能用于jv應用;而AMP是在協(xié)議層定義的標準,是跨語言的??焖偃虢M成部分說明如Producer:消息生產(chǎn)者,即生產(chǎn)方客戶端,生產(chǎn)方客戶端將消息發(fā)送到MQ。消息發(fā)布接收流4、Exchange將消息轉發(fā)到指定的Queue(隊列5、消費者接收到消息安要安裝Erlang/OTP,并保持版本匹配,如下圖 本項目使用Erlang/OTP20.3版本和RabbitMQ3.7.3版本1)地址如下win6420.3.exe或去老師提供的軟件包中找 erlang安裝完成需要配置erlang環(huán)境變量:ERLANG_HOME=D:\ProgramFiles\erl9.3在path中添2)安裝

/rabbitmq/rabbitmq-或去老師提供的軟件包中找 啟RabbitMQService-install:安裝服務RabbitMQService-remove刪除服務RabbitMQService-start啟動RabbitMQService-stop啟動如果沒有開始菜單則進入安 下 安裝并運行服rabbitmq-service.batinstall安裝服務rabbitmq-service.batstop停止服務rabbitmq-service.batstart啟動服務管理員運行rabbitmq-plugins.batenable3、啟動成功RabbitMQ注意事o按 /getstarted.html)測 搭建環(huán)java我們先用abbitMQ提供的jvaclient測試,目的是對RabitMQ的交互過程有個清晰的認識。參考: /abbtmq/abbitmqjva-cliet/創(chuàng)建生產(chǎn)者工程和消費者工程,分別加入RabbitMQjavaclient的依賴。test-rabbitmq-consumer:消費者工程<version>4.0.3</version><!‐‐此版本與springboot1.5.9版本匹配生產(chǎn)在生產(chǎn)者工程下的testpublicpublicclassProducer01//隊列名privatestaticfinalStringQUEUE= publicstaticvoidmain(String[]args)throwsIOException,{Connectionconnection=null;Channelchannel=null;{ConnectionFactoryfactory=newConnectionFactory();factory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當于一個獨立的mq服器//創(chuàng)建與RabbitMQ服務的TCP連connection=channel=connection.createChannel();隊列,如果Rabbit中沒有此隊列將自動創(chuàng)param1:隊列名param2:是否持久param3:隊列是否獨占此連param4:隊列不再使用時是否自動刪除此param5:隊列參channel.queueDeclare(QUEUE,true,false,false,Stringmessage=*消息發(fā)布方param1:Exchange的名稱,如果沒有指定,則使用Defaultparam2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉發(fā)到指定的消param3:消息包含的屬param4:消息這里沒有指定交換機,消息將發(fā)送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定默認的交換機,routingKeychannel.basicPublish("",QUEUE,null,message.getBytes());System.out.println("SendMessageis:'"+message+"'");}catch(Exception{}{if(channel!={}if(connection!={}}}}消費在消費者工程下的test中創(chuàng)建測試類如下publicpublicclassConsumer01privatestaticfinalStringQUEUE= publicstaticvoidmain(String[]args)throwsIOException,{ConnectionFactoryfactory=newConnectionconnection=factory.newConnection();Channelchannel=connection.createChannel(); 隊channel.queueDeclare(QUEUE,true,false,false,//定義消費方DefaultConsumerconsumer=newDefaultConsumer(channel)消費者接收@paramconsumerTag消費者 ,在channel.basicConsume()去指@paramenvelope消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機,消息和重(收到消息失敗后是否需要重新發(fā)送@param@param@throwspublicvoidhandleDelivery(StringEnvelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException//Stringexchange=//路由StringroutingKey=//消息longdeliveryTag=//Stringmsg=newSystem.out.println("receivemessage.."+}隊列Stringqueue,booleanautoAck,Consumer12、是否自動回復,設置為true為表示消息接收到自 mq回復接收到了,mq接收到回復會刪除消息,設置為false則需要手動回復3channel.basicConsume(QUEUE,true,}}2.2.4總1、發(fā)送端操作流創(chuàng)建創(chuàng)建發(fā)送創(chuàng)建創(chuàng)建接收ack回工作模RabbitMQ有以下幾種工作模式:1、WorkWorkworkqueues與入門程序相比,多了一個消費端,兩個消費端共同消費同一個隊列中的消息。應用場景:對于任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。測試工作模發(fā)布訂閱模式、生產(chǎn)者將消息發(fā)給be,由交換機將消息轉發(fā)到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收到消息代案例用戶通知,當用戶充值成功或轉賬完成系統(tǒng)通知用戶,通知方式有、郵件多種方法Exchange_fanout_inform兩個隊列并且綁定到此交換機,綁定時不需要指定otingy發(fā)送消息時不需要指定outingypackagecom.xuecheng.test.rabbitmq;importcom.rabbitmq.client.*;packagecom.xuecheng.test.rabbitmq;importcom.rabbitmq.client.*;importimportpublicclassProducer02_publish//隊列名privatestaticfinalStringQUEUE_INFORM_ ="queue_inform_ privatestaticfinalStringQUEUE_INFORM_SMS="queue_inform_sms";privatestaticfinalStringEXCHANGE_FANOUT_INFORM="exchange_fanout_inform";publicstaticvoidmain(String[]args){Connectionconnection=null;Channelchannel=null;try//創(chuàng)建一個與MQ的連ConnectionFactoryfactory=newConnectionFactory();factory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當于一個獨立的mq服器//創(chuàng) connection=channel=connection.createChannel(); 交換機Stringexchange,BuiltinExchangeType參數(shù)1、交換機名2、交換機類型,fanout、topic、direct、channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, 隊 (Stringqueue,booleandurable,booleanexclusive,booleanautoDelete,Map<String,Object>arguments)參數(shù)明細1、隊列名2、是否持久3、是否獨占此4、隊列不用是5 ,true,false,false,null);channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//交換機和隊列綁定Stringqueue,Stringexchange,String參數(shù)1、隊列名2、交換機名3、路由*/channel.queueBind(QUEUE_INFORM_ //發(fā)送消for(intStringmessage="informto//向交換機發(fā)送消息Stringexchange,StringroutingKey,BasicPropertiesbyte[]byte[]參數(shù)明1、交換機名稱,不指令使用默認交換機名稱Default2、routingKey(路由key),根據(jù)key名稱將消息轉發(fā)到具體的隊列,這里填寫隊列名稱表示息將發(fā)到此隊3、消息屬4、消息內(nèi)channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes());System.out.println("SendMessageis:'"+message+"'");}}catch(IOException{;}catch(TimeoutException{try{}catch(IOException{}catch(TimeoutException{}}try{}catch(IOException{}}}}2、郵件發(fā)送消費packagecom.xuecheng.test.rabbitmq;importcom.rabbitmq.client.*;packagecom.xuecheng.test.rabbitmq;importcom.rabbitmq.client.*;importimport@author@version@create2018‐06‐14publicclass //隊列名privatestaticfinalStringQUEUE_INFORM_ ="inform_queue_ privatestaticfinalStringEXCHANGE_FANOUT_INFORM="inform_exchange_fanout";publicstaticvoidmain(String[]args)throwsIOException,TimeoutException//創(chuàng)建一個與MQ的連ConnectionFactoryfactory=newConnectionFactory();factory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當于一個獨立的mq服務//創(chuàng) Connectionconnection=//創(chuàng)建與交換機的通道,每個通道代表一Channelchannel= 交換機Stringexchange,BuiltinExchangeType參數(shù)1、交換機名2、交換機類型,fanout、topic、direct、channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, 隊 channel.queueDeclare(Stringqueue,booleandurable,booleanexclusive,booleanautoDelete,Map<String,Object>arguments)參數(shù)明細1、隊列名2、是否持久3、是否獨占此4、隊列不用是否自動刪5 ,true,false,false,//交換機和隊列綁定Stringqueue,Stringexchange,String參數(shù)1、隊列名2、交換機名3、路由*/ //定義消費方DefaultConsumerdefaultConsumer=new{publicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOExceptionpublicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{longdeliveryTag=envelope.getDeliveryTag();Stringexchange=envelope.getExchange();//Stringmessage=newString(body,}隊列Stringqueue,booleanautoAck,Consumer12、是否自動回復,設置為true為表示消息接收到自 mq回復接收到了,mq接收到回復會刪除消息,設置為false則需要手動回復 ,true,}}3、發(fā)送消費參考上邊的郵件發(fā)送消費者代碼編寫測思1、publish/subscribe與workqueues有什么區(qū)別。workqueues不用定義交換機,而publish/subscribepulish/subscrib的生產(chǎn)方是面向交換機發(fā)送消息,workqees的生產(chǎn)方是面向隊列發(fā)送消息(pubis/subscibe需要設置隊列和交換機的綁定,workqueues不需要設置,實質上workqueues會將隊列綁定到默認的交換機。相同所以兩者實現(xiàn)的發(fā)布/訂閱的效果是一樣的,多個消費端同一個隊列不會重復消費消息2、實質工作用什么publish/subscribe還是workqueues建議使用pulish/subscribe,發(fā)布訂閱模式比工作隊列模式更強大,并且發(fā)布訂閱模式可以指定自己的交換機。工作模路由模式代exchange_routing_inform兩個隊列并且綁定到此交換機,綁定時需要指定otingy發(fā)送消息時需要指定outingypackagepackageimportcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importimportpublicclassProducer03_routing//隊列名privatestaticfinalString=privatestaticfinalStringQUEUE_INFORM_SMS=privatestaticfinalStringEXCHANGE_ROUTING_INFORM="exchange_routing_inform";publicstaticvoidmain(String[]args){Connectionconnection=null;Channelchannel=null;try//創(chuàng)建一個與MQ的連ConnectionFactoryfactory=newfactory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當于一個獨立的mq服器//創(chuàng) connection=channel=connection.createChannel(); 交換機Stringexchange,BuiltinExchangeType參數(shù)1、交換機名2、交換機類型,fanout、topic、direct、 隊 channel.queueDeclare(Stringqueue,booleandurable,booleanexclusive,booleanautoDelete,Map<String,Object>arguments)參數(shù)明細1、隊列名2、是否持久3、是否獨占此4、隊列不用是5 ,true,false,false,null);channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//交換機和隊列綁定Stringqueue,Stringexchange,String參數(shù)1、隊列名2、交換機名3、路由 //發(fā)送郵件消for(intStringmessage= informto//向交換機發(fā)送消息Stringexchange,StringroutingKey,BasicPropertiesbyte[]息將發(fā)到此

參數(shù)1、交換機名稱,不指令使用默認交換機名稱Default2、routingKey(路由key),根據(jù)key名稱將消息轉發(fā)到具體的隊列,這里填寫隊列名稱表3、消息屬4、消息內(nèi)channel.basicPublish(EXCHANGE_ROUTING_INFORM, ,System.out.println("SendMessageis:'"+message+byte[]

消for(intStringmessage="smsinformto//向交換機發(fā)送消息Stringexchange,StringroutingKey,BasicPropertieschannel.basicPublish(EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS,System.out.println("SendMessageis:'"+message+}}catch(IOException{;}catch(TimeoutException{try{}catch(IOException{}catch(TimeoutException{}}try{}catch(IOException{}}}}}2、郵件發(fā)送消費packagecom.xuecheng.test.rabbitmq;importcom.rabbitmq.client.*;packagecom.xuecheng.test.rabbitmq;importcom.rabbitmq.client.*;importimportpublicclass //隊列名privatestaticfinalStringQUEUE_INFORM_ ="inform_queue_ privatestaticfinalStringEXCHANGE_ROUTING_INFORM="inform_exchange_routing";publicstaticvoidmain(String[]args)throwsIOException,TimeoutException//創(chuàng)建一個與MQ的連ConnectionFactoryfactory=newConnectionFactory();factory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當于一個獨立的mq服務//創(chuàng) Connectionconnection=//創(chuàng)建與交換機的通道,每個通道代表一Channelchannel= 交換機Stringexchange,BuiltinExchangeType參數(shù)1、交換機名2、交換機類型,fanout、topic、direct、 隊 channel.queueDeclare(Stringqueue,booleandurable,booleanexclusive,booleanautoDelete,Map<String,Object>arguments)參數(shù)明細1、隊列名2、是否持久3、是否獨占此4、隊列不用是5 ,true,false,false,//交換機和隊列綁定Stringqueue,Stringexchange,String參數(shù)1、隊列名2、交換機名3、路由 //定義消費方DefaultConsumerdefaultConsumer=new{publicvoidhandleDelivery(StringconsumerTag,EnvelopeAMQP.BasicPropertiesAMQP.BasicPropertiesproperties,byte[]body)throws{longdeliveryTag=envelope.getDeliveryTag();Stringexchange=envelope.getExchange();//Stringmessage=newString(body,}隊列Stringqueue,booleanautoAck,Consumer12、是否自動回復,設置為true為表示消息接收到自 mq回復接收到了,mq接收到回復會刪除消息,設置為false則需要手動回復 ,true,}}3、發(fā)送消費測4.4.4思1、Routing模式和Publish/subscibe有啥區(qū)工作模路由模式代案例根據(jù)用戶設置去通知用戶,設置接收 param1param2:交換機類型四種交換機類型:direct、fanout、topic、channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, 通",null,"inform.sms",null,",null,完整代碼packagepackageimportcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importimportpublicclassProducer04_topics//隊列名privatestaticfinalString =privatestaticfinalStringQUEUE_INFORM_SMS=privatestaticfinalStringEXCHANGE_TOPICS_INFORM="exchange_topics_inform";publicstaticvoidmain(String[]args){Connectionconnection=null;Channelchannel=null;try//創(chuàng)建一個與MQ的連ConnectionFactoryfactory=newConnectionFactory();factory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當于一個獨立的mq服器//接connection=channel=connection.createChannel(); 交換機Stringexchange,BuiltinExchangeType*參數(shù)明細1、交換機名2、交換機類型,fanout、topic、direct、channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, 隊參數(shù)明細1、隊列名2、是否持久3、是否獨占此4、隊列不用是5 ,true,false,false,null);channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);byte[]

//發(fā)送郵件消for(intStringmessage= informto//向交換機發(fā)送消息Stringexchange,StringroutingKey,BasicProperties參數(shù)1、交換機名稱,不指令使用默認交換機名稱Default2、routingKey(路由key),根據(jù)key名稱將消息轉發(fā)到具體的隊列,這里填寫隊列名稱表息將發(fā)到此

3、消息屬4、消息內(nèi)channel.basicPublish(EXCHANGE_TOPICS_INFORM, ",System.out.println("SendMessageis:'"+message+} 消for(intStringmessage="smsinformtouser"+i;channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms",null,System.out.println("SendMessageis:'"+message+} for(intStringmessage="smsand informtouser"+i;channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms. ",null,System.out.println("SendMessageis:'"+message+}}catch(IOException{}catch(TimeoutException{try{}catch(IOException{}catch(TimeoutException{}}try{}catch(IOException{}}}}}中間以“.”分隔 隊,true,false,false,channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false, 交換channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM,//綁 通知隊//綁定sms通知測思使用Routing模式也可以實現(xiàn)本案例,共設置三個routingkey,分別 、sms、 隊列綁 all,sms隊列綁定sms和all,這樣就可以實現(xiàn)上邊案例的功能,實現(xiàn)過程比topics復雜Headerheader模式與routing不同的地方在于,header模式取消rouingkey,使用header中的keyvalue(鍵值對)匹配隊列。案例根據(jù)用戶設置去通知用戶,設置接收 代碼生產(chǎn)Map<String,Map<String,Object>=newHashtable<String, .put("inform_type", Map<String,Object>headers_sms=newHashtable<String,Object>();headers_sms.put("inform_type","sms"); 通知StringStringmessage=informtoMap<String,Object>headers=newHashtable<String, ");//匹 通知消費者綁定的 AMQP.BasicProperties.Builderproperties=newAMQP.BasicProperties.Builder(); 通channel.basicPublish(EXCHANGE_HEADERS_INFORM,"",properties.build(),發(fā)送郵件消費channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM,channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM,Map<String,Object>headers_ =newHashtable<String,Object>(); "," //交換機和隊列綁,true,RPC即客戶端調用服務端的方法,使用MQ可以實現(xiàn)RPC的異步調用,基于Direct交換機實現(xiàn),流程如下2、服務端RPC請求隊列的消息,收到消息后執(zhí)行服務端的方法,得到方法返回的結 Spring整合我們選擇基于Spring-Rabbit去操作/spring-projects/spring-使用spring-boot-starter-amqp會自動添加spring-rabbit依賴,如下:配置連接rabbitmqport:44000name:host:port:5672username:guestpassword:guestvirtualHost:/、定義RabitCo?類,配置Echange、Queeoi交換機。packagepackageimportimportimportimportpublicclassRabbitmqConfigpublicstaticfinalStringQUEUE_INFORM_ ="queue_inform_ publi

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論