




版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
第PHP+RabbitMQ實(shí)現(xiàn)消息隊(duì)列的完整代碼為什么使用RabbitMq而不是ActiveMq或者RocketMq?
首先,從業(yè)務(wù)上來講,我并不要求消息的100%接受率,并且,我需要結(jié)合php開發(fā),RabbitMq相較RocketMq,延遲較低(微妙級(jí))。至于ActiveMq,貌似問題較多。RabbitMq對(duì)各種語言的支持較好,所以選擇RabbitMq。
先安裝PHP對(duì)應(yīng)的RabbitMQ,這里用的是php_amqp不同的擴(kuò)展實(shí)現(xiàn)方式會(huì)有細(xì)微的差異.
php擴(kuò)展地址:/package/amqp
具體以官網(wǎng)為準(zhǔn)/getstarted.html
介紹
config.php配置信息
BaseMQ.phpMQ基類
ProductMQ.php生產(chǎn)者類
ConsumerMQ.php消費(fèi)者類
Consumer2MQ.php消費(fèi)者2(可有多個(gè))
config.php
return[
//配置
'host'=[
'host'='',
'port'='5672',
'login'='guest',
'password'='guest',
'vhost'='/',
//交換機(jī)
'exchange'='word',
//路由
'routes'=[],
];
BaseMQ.php
*CreatedbyPhpStorm.
*User:pc
*Date:2025/12/13
*Time:14:11
namespaceMyObjSummary\rabbitMQ;
/**Member
*AMQPChannel
*AMQPConnection
*AMQPEnvelope
*AMQPExchange
*AMQPQueue
*ClassBaseMQ
*@packageMyObjSummary\rabbitMQ
classBaseMQ
/**MQChannel
*@var\AMQPChannel
public$AMQPChannel;
/**MQLink
*@var\AMQPConnection
public$AMQPConnection;
/**MQEnvelope
*@var\AMQPEnvelope
public$AMQPEnvelope;
/**MQExchange
*@var\AMQPExchange
public$AMQPExchange;
/**MQQueue
*@var\AMQPQueue
public$AMQPQueue;
/**conf
*@var
public$conf;
/**exchange
*@var
public$exchange;
/**link
*BaseMQconstructor.
*@throws\AMQPConnectionException
publicfunction__construct()
$conf=require'config.php';
if(!$conf)
thrownew\AMQPConnectionException('configerror!');
$this-conf=$conf['host'];
$this-exchange=$conf['exchange'];
$this-AMQPConnection=new\AMQPConnection($this-conf);
if(!$this-AMQPConnection-connect())
thrownew\AMQPConnectionException("Cannotconnecttothebroker!\n");
*closelink
publicfunctionclose()
$this-AMQPConnection-disconnect();
/**Channel
*@return\AMQPChannel
*@throws\AMQPConnectionException
publicfunctionchannel()
if(!$this-AMQPChannel){
$this-AMQPChannel=new\AMQPChannel($this-AMQPConnection);
return$this-AMQPChannel;
/**Exchange
*@return\AMQPExchange
*@throws\AMQPConnectionException
*@throws\AMQPExchangeException
publicfunctionexchange()
if(!$this-AMQPExchange){
$this-AMQPExchange=new\AMQPExchange($this-channel());
$this-AMQPExchange-setName($this-exchange);
return$this-AMQPExchange;
/**queue
*@return\AMQPQueue
*@throws\AMQPConnectionException
*@throws\AMQPQueueException
publicfunctionqueue()
if(!$this-AMQPQueue){
$this-AMQPQueue=new\AMQPQueue($this-channel());
return$this-AMQPQueue;
/**Envelope
*@return\AMQPEnvelope
publicfunctionenvelope()
if(!$this-AMQPEnvelope){
$this-AMQPEnvelope=new\AMQPEnvelope();
return$this-AMQPEnvelope;
}
ProductMQ.php
//生產(chǎn)者P
namespaceMyObjSummary\rabbitMQ;
require'BaseMQ.php';
classProductMQextendsBaseMQ
private$routes=['hello','word'];//路由key
*ProductMQconstructor.
*@throws\AMQPConnectionException
publicfunction__construct()
parent::__construct();
/**只控制發(fā)送成功不接受消費(fèi)者是否收到
*@throws\AMQPChannelException
*@throws\AMQPConnectionException
*@throws\AMQPExchangeException
publicfunctionrun()
//頻道
$channel=$this-channel();
//創(chuàng)建交換機(jī)對(duì)象
$ex=$this-exchange();
//消息內(nèi)容
$message='productmessage'.rand(1,99999);
//開始事務(wù)
$channel-startTransaction();
$sendEd=true;
foreach($this-routesas$route){
$sendEd=$ex-publish($message,$route);
echo"SendMessage:".$sendEd."\n";
if(!$sendEd){
$channel-rollbackTransaction();
$channel-commitTransaction();//提交事務(wù)
$this-close();
die;
try{
(newProductMQ())-run();
}catch(\Exception$exception){
var_dump($exception-getMessage());
}
ConsumerMQ.php
//消費(fèi)者C
namespaceMyObjSummary\rabbitMQ;
require'BaseMQ.php';
classConsumerMQextendsBaseMQ
private$q_name='hello';//隊(duì)列名
private$route='hello';//路由key
*ConsumerMQconstructor.
*@throws\AMQPConnectionException
publicfunction__construct()
parent::__construct();
/**接受消息如果終止重連時(shí)會(huì)有消息
*@throws\AMQPChannelException
*@throws\AMQPConnectionException
*@throws\AMQPExchangeException
*@throws\AMQPQueueException
publicfunctionrun()
//創(chuàng)建交換機(jī)
$ex=$this-exchange();
$ex-setType(AMQP_EX_TYPE_DIRECT);//direct類型
$ex-setFlags(AMQP_DURABLE);//持久化
//echo"ExchangeStatus:".$ex-declare()."\n";
//創(chuàng)建隊(duì)列
$q=$this-queue();
//var_dump($q-declare());exit();
$q-setName($this-q_name);
$q-setFlags(AMQP_DURABLE);//持久化
//echo"MessageTotal:".$q-declareQueue()."\n";
//綁定交換機(jī)與隊(duì)列,并指定路由鍵
echo'QueueBind:'.$q-bind($this-exchange,$this-route)."\n";
//阻塞模式接收消息
echo"Message:\n";
while(True){
$q-consume(function($envelope,$queue){
$msg=$envelope-getBody();
echo$msg."\n";//處理消息
$queue-ack($envelope-getDeliveryTag());//
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025年宣傳教育全年工作總結(jié)模版
- 傭金返還合同范例
- 企業(yè)文化在辦公空間人性化設(shè)計(jì)中的體現(xiàn)
- 2025年新《黨政領(lǐng)導(dǎo)干部選拔任用工作條例》學(xué)習(xí)討論發(fā)言稿模版
- 機(jī)器人焊接 14 項(xiàng)目七任務(wù)7.2教學(xué)設(shè)計(jì)
- 醫(yī)療大數(shù)據(jù)挑戰(zhàn)與機(jī)遇并存的價(jià)值挖掘
- 倉儲(chǔ)協(xié)議合同范例簡(jiǎn)短范例
- 幼兒園教師年度考核個(gè)人工作總結(jié)模版
- 關(guān)于建筑師負(fù)責(zé)制的工作總結(jié)模版
- 醫(yī)療物資全鏈條管理的實(shí)踐與思考
- 職業(yè)生涯規(guī)劃家庭影響因素
- 潔凈環(huán)境監(jiān)測(cè)課件
- Python數(shù)據(jù)分析與應(yīng)用-從數(shù)據(jù)獲取到可視化(第2版)課件 第3章 數(shù)據(jù)分析庫pandas基礎(chǔ)
- 疼痛科護(hù)理的現(xiàn)狀與發(fā)展趨勢(shì)
- 用戶思維培訓(xùn)課件
- 企業(yè)反商業(yè)賄賂法律法規(guī)培訓(xùn)
- 安心護(hù)行 從個(gè)案分析看創(chuàng)傷骨科患者VTE管理低分子肝素合理應(yīng)用版本
- CONSORT2010流程圖(FlowDiagram)【模板】文檔
- 軟件質(zhì)量保證與測(cè)試技術(shù)智慧樹知到課后章節(jié)答案2023年下青島工學(xué)院
- 備用柴油發(fā)電機(jī)定期啟動(dòng)試驗(yàn)記錄表
- 學(xué)前教育畢業(yè)論文5000字【6篇】
評(píng)論
0/150
提交評(píng)論