版權(quán)說(shuō)明:本文檔由用戶(hù)提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
消息隊(duì)列:RabbitMQ:RabbitMQ高級(jí)特性:Exchange類(lèi)型1RabbitMQ高級(jí)特性概覽1.1Exchange的概念與作用在RabbitMQ中,Exchange扮演著消息分發(fā)的核心角色。它接收來(lái)自生產(chǎn)者的消息,然后根據(jù)既定的規(guī)則將消息發(fā)送到一個(gè)或多個(gè)隊(duì)列中。Exchange的類(lèi)型決定了消息如何被路由到隊(duì)列。以下是Exchange的幾個(gè)關(guān)鍵點(diǎn):消息接收:生產(chǎn)者將消息發(fā)送到Exchange,而不是直接發(fā)送到隊(duì)列。消息路由:Exchange根據(jù)其類(lèi)型和綁定規(guī)則將消息路由到一個(gè)或多個(gè)隊(duì)列。隊(duì)列綁定:隊(duì)列需要綁定到Exchange上,才能接收來(lái)自Exchange的消息。1.1.1Exchange的類(lèi)型RabbitMQ支持多種Exchange類(lèi)型,每種類(lèi)型都有其特定的路由邏輯:DirectExchangeFanoutExchangeTopicExchangeHeadersExchange1.2Exchange類(lèi)型詳解1.2.1DirectExchangeDirectExchange是最簡(jiǎn)單的Exchange類(lèi)型之一。它根據(jù)消息的routingkey直接將消息路由到一個(gè)特定的隊(duì)列。如果routingkey與隊(duì)列的綁定鍵完全匹配,消息就會(huì)被發(fā)送到該隊(duì)列。示例代碼importpika
#連接到RabbitMQ
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明一個(gè)Direct類(lèi)型的Exchange
channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
#聲明隊(duì)列并綁定到Exchange
channel.queue_declare(queue='error')
channel.queue_bind(exchange='direct_logs',queue='error',routing_key='error')
#發(fā)送消息
channel.basic_publish(exchange='direct_logs',routing_key='error',body='Criticalerroroccurred')
#關(guān)閉連接
connection.close()1.2.2FanoutExchangeFanoutExchange將消息廣播到所有綁定到它的隊(duì)列中,無(wú)論routingkey是什么。這使得FanoutExchange非常適合實(shí)現(xiàn)發(fā)布/訂閱模式。示例代碼importpika
#連接到RabbitMQ
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明一個(gè)Fanout類(lèi)型的Exchange
channel.exchange_declare(exchange='logs',exchange_type='fanout')
#聲明隊(duì)列并綁定到Exchange
channel.queue_declare(queue='log_queue1')
channel.queue_bind(exchange='logs',queue='log_queue1')
channel.queue_declare(queue='log_queue2')
channel.queue_bind(exchange='logs',queue='log_queue2')
#發(fā)送消息
channel.basic_publish(exchange='logs',routing_key='',body='Logmessage')
#關(guān)閉連接
connection.close()1.2.3TopicExchangeTopicExchange允許使用模式匹配來(lái)路由消息。Routingkey可以是一個(gè)點(diǎn)分隔的字符串,隊(duì)列綁定鍵可以包含通配符*和#,其中*匹配一個(gè)單詞,#匹配零個(gè)或多個(gè)單詞。示例代碼importpika
#連接到RabbitMQ
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明一個(gè)Topic類(lèi)型的Exchange
channel.exchange_declare(exchange='topic_logs',exchange_type='topic')
#聲明隊(duì)列并綁定到Exchange
channel.queue_declare(queue='kern.error')
channel.queue_bind(exchange='topic_logs',queue='kern.error',routing_key='kern.*')
channel.queue_declare(queue='')
channel.queue_bind(exchange='topic_logs',queue='',routing_key='*.info')
#發(fā)送消息
channel.basic_publish(exchange='topic_logs',routing_key='kern.error',body='Kernelerroroccurred')
#關(guān)閉連接
connection.close()1.2.4HeadersExchangeHeadersExchange不使用routingkey,而是基于消息頭中的屬性進(jìn)行路由。隊(duì)列綁定到Exchange時(shí),可以指定一個(gè)或多個(gè)鍵值對(duì),消息頭中必須包含這些鍵值對(duì),才能被路由到相應(yīng)的隊(duì)列。示例代碼importpika
importjson
#連接到RabbitMQ
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明一個(gè)Headers類(lèi)型的Exchange
channel.exchange_declare(exchange='headers_logs',exchange_type='headers')
#聲明隊(duì)列并綁定到Exchange
channel.queue_declare(queue='error_queue')
channel.queue_bind(exchange='headers_logs',queue='error_queue',arguments={'x-match':'all','severity':'error'})
#發(fā)送消息
headers={'severity':'error'}
channel.basic_publish(exchange='headers_logs',routing_key='',body='Errormessage',properties=pika.BasicProperties(headers=headers))
#關(guān)閉連接
connection.close()通過(guò)以上示例,我們可以看到不同類(lèi)型的Exchange如何根據(jù)其特性將消息路由到相應(yīng)的隊(duì)列中。選擇正確的Exchange類(lèi)型對(duì)于構(gòu)建高效、靈活的消息傳遞系統(tǒng)至關(guān)重要。2Exchange類(lèi)型深入解析2.1DirectExchange:直接交換機(jī)2.1.1原理直接交換機(jī)(DirectExchange)是RabbitMQ中最簡(jiǎn)單的一種交換機(jī)類(lèi)型。它根據(jù)消息的routingkey直接將消息路由到指定的隊(duì)列。如果消息的routingkey與隊(duì)列綁定的routingkey完全匹配,那么消息就會(huì)被發(fā)送到該隊(duì)列。這種交換機(jī)類(lèi)型非常適合一對(duì)一的場(chǎng)景,即一個(gè)生產(chǎn)者發(fā)送消息給一個(gè)消費(fèi)者。2.1.2示例代碼importpika
#建立連接
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明交換機(jī)
channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
#聲明隊(duì)列
channel.queue_declare(queue='error')
channel.queue_declare(queue='info')
#綁定隊(duì)列到交換機(jī)
channel.queue_bind(exchange='direct_logs',queue='error',routing_key='error')
channel.queue_bind(exchange='direct_logs',queue='info',routing_key='info')
#發(fā)送消息
forseverityin['error','info']:
message=f"{severity}:Hello,world!"
channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
print(f"[x]Sent{message}")
#關(guān)閉連接
connection.close()在上述代碼中,我們創(chuàng)建了一個(gè)direct_logs直接交換機(jī),并聲明了兩個(gè)隊(duì)列error和info。然后,我們將隊(duì)列綁定到交換機(jī)上,分別使用error和info作為routingkey。最后,我們發(fā)送了兩條消息,每條消息的routingkey與隊(duì)列的bindingkey相匹配,因此消息會(huì)被正確地路由到相應(yīng)的隊(duì)列。2.2FanoutExchange:扇形交換機(jī)2.2.1原理扇形交換機(jī)(FanoutExchange)將消息廣播到所有綁定到它的隊(duì)列中。無(wú)論消息的routingkey是什么,扇形交換機(jī)都會(huì)將消息發(fā)送給所有綁定的隊(duì)列。這種交換機(jī)類(lèi)型適用于需要將消息發(fā)送給多個(gè)消費(fèi)者的情況。2.2.2示例代碼importpika
#建立連接
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明扇形交換機(jī)
channel.exchange_declare(exchange='logs',exchange_type='fanout')
#聲明隊(duì)列
channel.queue_declare(queue='queue1')
channel.queue_declare(queue='queue2')
#綁定隊(duì)列到交換機(jī)
channel.queue_bind(exchange='logs',queue='queue1')
channel.queue_bind(exchange='logs',queue='queue2')
#發(fā)送消息
message="Hello,logs!"
channel.basic_publish(exchange='logs',routing_key='',body=message)
print(f"[x]Sent{message}")
#關(guān)閉連接
connection.close()在這個(gè)例子中,我們創(chuàng)建了一個(gè)名為logs的扇形交換機(jī),并聲明了兩個(gè)隊(duì)列queue1和queue2。然后,我們將這兩個(gè)隊(duì)列綁定到交換機(jī)上。當(dāng)我們發(fā)送消息時(shí),由于扇形交換機(jī)的特性,消息會(huì)被廣播到所有綁定的隊(duì)列中,即queue1和queue2。2.3TopicExchange:主題交換機(jī)2.3.1原理主題交換機(jī)(TopicExchange)允許使用通配符來(lái)匹配routingkey。它使用星號(hào)(*)和井號(hào)(#)作為通配符,其中星號(hào)表示匹配一個(gè)單詞,井號(hào)表示匹配零個(gè)或多個(gè)單詞。這種交換機(jī)類(lèi)型非常適合需要根據(jù)消息的主題進(jìn)行路由的場(chǎng)景。2.3.2示例代碼importpika
#建立連接
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明主題交換機(jī)
channel.exchange_declare(exchange='topic_logs',exchange_type='topic')
#聲明隊(duì)列
channel.queue_declare(queue='')
channel.queue_declare(queue='kern.error')
channel.queue_declare(queue='')
#綁定隊(duì)列到交換機(jī)
channel.queue_bind(exchange='topic_logs',queue='',routing_key='')
channel.queue_bind(exchange='topic_logs',queue='kern.error',routing_key='kern.*')
channel.queue_bind(exchange='topic_logs',queue='',routing_key='')
#發(fā)送消息
forrouting_keyin['','kern.error','']:
message=f"{routing_key}:Hello,world!"
channel.basic_publish(exchange='topic_logs',routing_key=routing_key,body=message)
print(f"[x]Sent{message}")
#關(guān)閉連接
connection.close()在這個(gè)例子中,我們創(chuàng)建了一個(gè)topic_logs主題交換機(jī),并聲明了三個(gè)隊(duì)列。然后,我們將隊(duì)列綁定到交換機(jī)上,使用了通配符*和#。當(dāng)我們發(fā)送消息時(shí),消息會(huì)根據(jù)其routingkey被路由到相應(yīng)的隊(duì)列。例如,kern.error消息會(huì)被路由到kern.error隊(duì)列,因?yàn)樗ヅ淞薻ern.*的bindingkey。2.4HeadersExchange:頭部交換機(jī)2.4.1原理頭部交換機(jī)(HeadersExchange)不使用routingkey,而是使用一個(gè)或多個(gè)鍵值對(duì)(headers)來(lái)路由消息。這種交換機(jī)類(lèi)型允許更復(fù)雜的路由邏輯,例如基于消息內(nèi)容的路由。隊(duì)列綁定到交換機(jī)時(shí),可以指定一個(gè)或多個(gè)鍵值對(duì),如果消息的headers與隊(duì)列的bindingheaders匹配,那么消息就會(huì)被發(fā)送到該隊(duì)列。2.4.2示例代碼importpika
importjson
#建立連接
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明頭部交換機(jī)
channel.exchange_declare(exchange='headers_logs',exchange_type='headers')
#聲明隊(duì)列
channel.queue_declare(queue='queue1')
channel.queue_declare(queue='queue2')
#綁定隊(duì)列到交換機(jī)
channel.queue_bind(exchange='headers_logs',queue='queue1',arguments={'x-match':'all','header1':'value1','header2':'value2'})
channel.queue_bind(exchange='headers_logs',queue='queue2',arguments={'x-match':'any','header1':'value1'})
#發(fā)送消息
headers={'header1':'value1','header2':'value2'}
message="Hello,headers!"
channel.basic_publish(exchange='headers_logs',routing_key='',body=message,properties=pika.BasicProperties(headers=json.dumps(headers)))
print(f"[x]Sent{message}")
#關(guān)閉連接
connection.close()在這個(gè)例子中,我們創(chuàng)建了一個(gè)headers_logs頭部交換機(jī),并聲明了兩個(gè)隊(duì)列queue1和queue2。然后,我們將隊(duì)列綁定到交換機(jī)上,使用了x-match參數(shù)來(lái)指定匹配規(guī)則。queue1使用all匹配規(guī)則,意味著所有headers鍵值對(duì)都必須匹配;queue2使用any匹配規(guī)則,意味著只要有一個(gè)headers鍵值對(duì)匹配即可。當(dāng)我們發(fā)送消息時(shí),消息的headers被指定為{'header1':'value1','header2':'value2'},因此消息會(huì)被路由到queue1,因?yàn)樗鼭M(mǎn)足了所有headers的匹配條件,同時(shí)也會(huì)被路由到queue2,因?yàn)樗鼭M(mǎn)足了至少一個(gè)headers的匹配條件。3Exchange類(lèi)型實(shí)戰(zhàn)應(yīng)用3.1DirectExchange的配置與使用3.1.1原理DirectExchange是一種基于路由鍵(routingkey)的交換機(jī)類(lèi)型。它將消息路由到那些綁定鍵(bindingkey)與消息的路由鍵完全匹配的隊(duì)列上。這種類(lèi)型的交換機(jī)非常直接,適合于一對(duì)一或多對(duì)一的場(chǎng)景,其中消息的發(fā)送者明確知道消息應(yīng)該被發(fā)送到哪個(gè)隊(duì)列。3.1.2實(shí)戰(zhàn)代碼示例發(fā)送端importpika
#連接到RabbitMQ服務(wù)器
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明一個(gè)Direct類(lèi)型的交換機(jī)
channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
#定義路由鍵
severity='info'
#發(fā)送消息
message="Info:Thisisatestmessage"
channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
print("[x]Sent%r:%r"%(severity,message))
connection.close()接收端importpika
#連接到RabbitMQ服務(wù)器
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明一個(gè)Direct類(lèi)型的交換機(jī)
channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
#聲明一個(gè)隊(duì)列并綁定到交換機(jī)
result=channel.queue_declare(queue='',exclusive=True)
queue_name=result.method.queue
channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key='info')
#定義回調(diào)函數(shù)處理消息
defcallback(ch,method,properties,body):
print("[x]Received%r"%body)
#開(kāi)始消費(fèi)消息
channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)
print('[*]Waitingformessages.ToexitpressCTRL+C')
channel.start_consuming()3.1.3描述在上述示例中,發(fā)送端通過(guò)direct_logs交換機(jī)發(fā)送了一條信息,其路由鍵為info。接收端聲明了一個(gè)隊(duì)列,并將該隊(duì)列綁定到direct_logs交換機(jī)上,綁定鍵同樣為info。因此,發(fā)送端發(fā)送的信息將被路由到接收端聲明的隊(duì)列中,從而實(shí)現(xiàn)消息的傳遞。3.2FanoutExchange的配置與使用3.2.1原理FanoutExchange是一種廣播類(lèi)型的交換機(jī),它將接收到的消息發(fā)送給所有與它綁定的隊(duì)列。這種類(lèi)型的交換機(jī)不關(guān)心路由鍵,它只是簡(jiǎn)單地將消息分發(fā)給所有綁定的隊(duì)列,類(lèi)似于廣播。3.2.2實(shí)戰(zhàn)代碼示例發(fā)送端importpika
#連接到RabbitMQ服務(wù)器
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明一個(gè)Fanout類(lèi)型的交換機(jī)
channel.exchange_declare(exchange='logs',exchange_type='fanout')
#發(fā)送消息
message="Hello,world!"
channel.basic_publish(exchange='logs',routing_key='',body=message)
print("[x]Sent%r"%message)
connection.close()接收端importpika
#連接到RabbitMQ服務(wù)器
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明一個(gè)Fanout類(lèi)型的交換機(jī)
channel.exchange_declare(exchange='logs',exchange_type='fanout')
#聲明一個(gè)隊(duì)列并綁定到交換機(jī)
result=channel.queue_declare(queue='',exclusive=True)
queue_name=result.method.queue
channel.queue_bind(exchange='logs',queue=queue_name)
#定義回調(diào)函數(shù)處理消息
defcallback(ch,method,properties,body):
print("[x]Received%r"%body)
#開(kāi)始消費(fèi)消息
channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)
print('[*]Waitingformessages.ToexitpressCTRL+C')
channel.start_consuming()3.2.3描述在這個(gè)示例中,發(fā)送端聲明了一個(gè)Fanout類(lèi)型的交換機(jī)logs,并發(fā)送了一條消息。接收端聲明了多個(gè)隊(duì)列,并將這些隊(duì)列綁定到logs交換機(jī)上。由于Fanout交換機(jī)的特性,發(fā)送端發(fā)送的每條消息都將被廣播到所有綁定的隊(duì)列中,從而實(shí)現(xiàn)消息的廣播分發(fā)。3.3TopicExchange的配置與使用3.3.1原理TopicExchange是一種基于模式匹配的交換機(jī)類(lèi)型。它允許使用通配符來(lái)匹配路由鍵,從而將消息路由到多個(gè)隊(duì)列。路由鍵是一個(gè)點(diǎn)分隔的字符串,例如stock.usd.nyse。TopicExchange支持兩種通配符:*(匹配一個(gè)單詞)和#(匹配零個(gè)或多個(gè)單詞)。3.3.2實(shí)戰(zhàn)代碼示例發(fā)送端importpika
#連接到RabbitMQ服務(wù)器
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明一個(gè)Topic類(lèi)型的交換機(jī)
channel.exchange_declare(exchange='topic_logs',exchange_type='topic')
#發(fā)送消息
routing_key='stock.usd.nyse'
message="StockpriceupdateforNYSE"
channel.basic_publish(exchange='topic_logs',routing_key=routing_key,body=message)
print("[x]Sent%r:%r"%(routing_key,message))
connection.close()接收端importpika
#連接到RabbitMQ服務(wù)器
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明一個(gè)Topic類(lèi)型的交換機(jī)
channel.exchange_declare(exchange='topic_logs',exchange_type='topic')
#聲明一個(gè)隊(duì)列并綁定到交換機(jī)
result=channel.queue_declare(queue='',exclusive=True)
queue_name=result.method.queue
channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key='stock.#')
#定義回調(diào)函數(shù)處理消息
defcallback(ch,method,properties,body):
print("[x]Received%r"%body)
#開(kāi)始消費(fèi)消息
channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)
print('[*]Waitingformessages.ToexitpressCTRL+C')
channel.start_consuming()3.3.3描述在這個(gè)示例中,發(fā)送端使用了Topic類(lèi)型的交換機(jī)topic_logs,并發(fā)送了一條關(guān)于股票價(jià)格更新的消息,其路由鍵為stock.usd.nyse。接收端聲明了一個(gè)隊(duì)列,并將該隊(duì)列綁定到topic_logs交換機(jī)上,綁定鍵為stock.#,這意味著接收端將接收所有以stock開(kāi)頭的消息,無(wú)論后面有多少個(gè)單詞。因此,發(fā)送端發(fā)送的消息將被路由到接收端聲明的隊(duì)列中,實(shí)現(xiàn)了基于模式匹配的消息路由。3.4HeadersExchange的配置與使用3.4.1原理HeadersExchange是一種基于消息頭(headers)的交換機(jī)類(lèi)型。它允許使用消息頭中的鍵值對(duì)來(lái)路由消息,而不是使用路由鍵。這種類(lèi)型的交換機(jī)可以實(shí)現(xiàn)更復(fù)雜的路由邏輯,例如基于消息內(nèi)容的路由。3.4.2實(shí)戰(zhàn)代碼示例發(fā)送端importpika
#連接到RabbitMQ服務(wù)器
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明一個(gè)Headers類(lèi)型的交換機(jī)
channel.exchange_declare(exchange='headers_logs',exchange_type='headers')
#發(fā)送消息
message="Thisisatestmessage"
properties=pika.BasicProperties(headers={'type':'test','priority':'high'})
channel.basic_publish(exchange='headers_logs',routing_key='',body=message,properties=properties)
print("[x]Sent%r"%message)
connection.close()接收端importpika
#連接到RabbitMQ服務(wù)器
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明一個(gè)Headers類(lèi)型的交換機(jī)
channel.exchange_declare(exchange='headers_logs',exchange_type='headers')
#聲明一個(gè)隊(duì)列并綁定到交換機(jī)
result=channel.queue_declare(queue='',exclusive=True)
queue_name=result.method.queue
channel.queue_bind(exchange='headers_logs',queue=queue_name,arguments={'x-match':'all','type':'test','priority':'high'})
#定義回調(diào)函數(shù)處理消息
defcallback(ch,method,properties,body):
print("[x]Received%r"%body)
#開(kāi)始消費(fèi)消息
channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)
print('[*]Waitingformessages.ToexitpressCTRL+C')
channel.start_consuming()3.4.3描述在這個(gè)示例中,發(fā)送端使用了Headers類(lèi)型的交換機(jī)headers_logs,并發(fā)送了一條消息,同時(shí)在消息頭中包含了type和priority兩個(gè)鍵值對(duì)。接收端聲明了一個(gè)隊(duì)列,并將該隊(duì)列綁定到headers_logs交換機(jī)上,綁定時(shí)指定了type和priority兩個(gè)鍵值對(duì)的匹配條件。因此,只有當(dāng)消息頭中的鍵值對(duì)完全匹配接收端指定的條件時(shí),消息才會(huì)被路由到接收端聲明的隊(duì)列中,實(shí)現(xiàn)了基于消息頭的復(fù)雜路由邏輯。4高級(jí)Exchange模式4.1Exchange綁定Exchange在RabbitMQ中,Exchange綁定Exchange是一種高級(jí)特性,允許一個(gè)Exchange將消息轉(zhuǎn)發(fā)到另一個(gè)Exchange。這種模式可以用于構(gòu)建復(fù)雜的消息路由網(wǎng)絡(luò),實(shí)現(xiàn)更精細(xì)的消息控制和分發(fā)。4.1.1原理當(dāng)一個(gè)消息被發(fā)送到一個(gè)Exchange時(shí),根據(jù)綁定規(guī)則,這個(gè)消息可能會(huì)被轉(zhuǎn)發(fā)到另一個(gè)Exchange,然后再由后者根據(jù)其綁定規(guī)則進(jìn)行進(jìn)一步的路由。這種機(jī)制可以用于實(shí)現(xiàn)消息的復(fù)制、過(guò)濾和重新路由。4.1.2示例代碼importpika
#連接到RabbitMQ服務(wù)器
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明兩個(gè)Exchange,一個(gè)為fanout類(lèi)型,一個(gè)為direct類(lèi)型
channel.exchange_declare(exchange='fanout_exchange',exchange_type='fanout')
channel.exchange_declare(exchange='direct_exchange',exchange_type='direct')
#將fanout_exchange綁定到direct_exchange
channel.exchange_bind(exchange='fanout_exchange',destination='direct_exchange')
#發(fā)布消息到fanout_exchange
channel.basic_publish(exchange='fanout_exchange',
routing_key='',
body='Hello,thisisatestmessage.')
#關(guān)閉連接
connection.close()4.1.3解釋在上述示例中,我們首先創(chuàng)建了一個(gè)fanout_exchange,類(lèi)型為fanout,這意味著所有綁定到這個(gè)Exchange的消息都會(huì)被廣播到所有隊(duì)列。然后,我們創(chuàng)建了一個(gè)direct_exchange,類(lèi)型為direct,這意味著消息將根據(jù)routingkey被路由到特定的隊(duì)列。通過(guò)channel.exchange_bind,我們將fanout_exchange綁定到direct_exchange,這意味著所有發(fā)送到fanout_exchange的消息都會(huì)被復(fù)制并發(fā)送到direct_exchange。最后,我們通過(guò)channel.basic_publish向fanout_exchange發(fā)送了一條消息,這條消息將被廣播并最終轉(zhuǎn)發(fā)到direct_exchange。4.2Exchange的死信隊(duì)列死信隊(duì)列(DeadLetterQueue,DLQ)是RabbitMQ中用于處理無(wú)法被消費(fèi)的消息的一種機(jī)制。當(dāng)消息因?yàn)槟承┰驘o(wú)法被隊(duì)列中的消費(fèi)者消費(fèi)時(shí),這些消息會(huì)被轉(zhuǎn)發(fā)到一個(gè)DLQ中,以便進(jìn)行進(jìn)一步的處理或分析。4.2.1原理死信可以由以下幾種情況產(chǎn)生:-消息在隊(duì)列中達(dá)到TTL(TimeToLive)。-隊(duì)列達(dá)到最大長(zhǎng)度,無(wú)法再接收更多消息。-消費(fèi)者拒絕消息,并且requeue參數(shù)設(shè)置為False。當(dāng)這些情況發(fā)生時(shí),消息會(huì)被標(biāo)記為死信,并根據(jù)隊(duì)列的配置被轉(zhuǎn)發(fā)到一個(gè)DLQ中。4.2.2示例代碼importpika
#連接到RabbitMQ服務(wù)器
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明一個(gè)死信隊(duì)列
channel.queue_declare(queue='dlq_queue',arguments={'x-dead-letter-exchange':'dlx_exchange'})
#聲明一個(gè)死信Exchange
channel.exchange_declare(exchange='dlx_exchange',exchange_type='direct')
#發(fā)布消息到一個(gè)普通隊(duì)列,該隊(duì)列將消息轉(zhuǎn)發(fā)到死信隊(duì)列
channel.queue_declare(queue='normal_queue',arguments={'x-message-ttl':10000,'x-dead-letter-exchange':'dlx_exchange'})
channel.queue_bind(queue='normal_queue',exchange='normal_exchange',routing_key='normal_key')
channel.basic_publish(exchange='normal_exchange',
routing_key='normal_key',
body='Thismessagewillexpirein10secondsandgotoDLQ.')
#關(guān)閉連接
connection.close()4.2.3解釋在這個(gè)示例中,我們首先聲明了一個(gè)名為dlq_queue的隊(duì)列,它將作為死信隊(duì)列。我們通過(guò)arguments參數(shù)設(shè)置了x-dead-letter-exchange,指定了當(dāng)消息成為死信時(shí),應(yīng)該被轉(zhuǎn)發(fā)到的Exchange。然后,我們聲明了一個(gè)名為dlx_exchange的Exchange,類(lèi)型為direct,這將是死信消息的轉(zhuǎn)發(fā)目標(biāo)。接下來(lái),我們聲明了一個(gè)名為normal_queue的隊(duì)列,設(shè)置了x-message-ttl參數(shù),這意味著隊(duì)列中的消息將在10秒后過(guò)期。同時(shí),我們?cè)O(shè)置了x-dead-letter-exchange參數(shù),指向dlx_exchange,確保過(guò)期的消息會(huì)被轉(zhuǎn)發(fā)到DLQ。最后,我們通過(guò)channel.basic_publish向normal_exchange發(fā)送了一條消息,這條消息將被路由到normal_queue。當(dāng)消息過(guò)期后,它將被轉(zhuǎn)發(fā)到dlx_exchange,并最終到達(dá)dlq_queue。4.3Exchange的優(yōu)先級(jí)隊(duì)列優(yōu)先級(jí)隊(duì)列允許在RabbitMQ中為消息設(shè)置優(yōu)先級(jí),確保高優(yōu)先級(jí)的消息被優(yōu)先消費(fèi)。這在處理緊急或重要消息時(shí)非常有用。4.3.1原理優(yōu)先級(jí)隊(duì)列通過(guò)在隊(duì)列聲明時(shí)設(shè)置x-max-priority參數(shù)來(lái)實(shí)現(xiàn)。消息的優(yōu)先級(jí)在發(fā)布時(shí)通過(guò)basic_publish的properties參數(shù)中的priority字段來(lái)指定。優(yōu)先級(jí)范圍通常在0到255之間,0表示最低優(yōu)先級(jí),255表示最高優(yōu)先級(jí)。4.3.2示例代碼importpika
frompika.specimportBasicProperties
#連接到RabbitMQ服務(wù)器
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明一個(gè)優(yōu)先級(jí)隊(duì)列
channel.queue_declare(queue='priority_queue',arguments={'x-max-priority':10})
#發(fā)布兩條消息,一條優(yōu)先級(jí)為5,一條優(yōu)先級(jí)為1
channel.basic_publish(exchange='',
routing_key='priority_queue',
body='Highprioritymessage.',
properties=BasicProperties(priority=5))
channel.basic_publish(exchange='',
routing_key='priority_queue',
body='Lowprioritymessage.',
properties=BasicProperties(priority=1))
#關(guān)閉連接
connection.close()4.3.3解釋在這個(gè)示例中,我們聲明了一個(gè)名為priority_queue的隊(duì)列,并設(shè)置了x-max-priority參數(shù)為10,這意味著這個(gè)隊(duì)列可以處理0到10之間的優(yōu)先級(jí)消息。然后,我們通過(guò)channel.basic_publish向priority_queue發(fā)送了兩條消息。第一條消息的優(yōu)先級(jí)設(shè)置為5,第二條消息的優(yōu)先級(jí)設(shè)置為1。我們使用BasicProperties類(lèi)來(lái)設(shè)置消息的優(yōu)先級(jí)。當(dāng)消費(fèi)者從priority_queue中消費(fèi)消息時(shí),高優(yōu)先級(jí)的消息將被優(yōu)先處理。如果隊(duì)列中有多個(gè)相同優(yōu)先級(jí)的消息,它們將按照先進(jìn)先出(FIFO)的順序被消費(fèi)。5Exchange類(lèi)型最佳實(shí)踐5.1性能優(yōu)化策略5.1.1使用直連交換機(jī)(DirectExchange)減少消息延遲直連交換機(jī)是最簡(jiǎn)單的交換機(jī)類(lèi)型,它根據(jù)消息的路由鍵直接將消息發(fā)送到指定的隊(duì)列。這種模式在處理大量消息時(shí),可以顯著減少消息的延遲時(shí)間,因?yàn)樗苊饬藦?fù)雜的路由邏輯。示例代碼importpika
#建立連接
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明交換機(jī)
channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
#聲明隊(duì)列
channel.queue_declare(queue='error')
channel.queue_declare(queue='warning')
channel.queue_declare(queue='info')
#綁定隊(duì)列到交換機(jī)
channel.queue_bind(exchange='direct_logs',queue='error',routing_key='error')
channel.queue_bind(exchange='direct_logs',queue='warning',routing_key='warning')
channel.queue_bind(exchange='direct_logs',queue='info',routing_key='info')
#發(fā)送消息
forseverityin['error','warning','info']:
message=f"{severity}:HelloWorld!"
channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
print(f"[x]Sent{message}")
#關(guān)閉連接
connection.close()解釋在上述代碼中,我們創(chuàng)建了一個(gè)直連交換機(jī)direct_logs,并聲明了三個(gè)隊(duì)列error、warning和info。然后,我們將每個(gè)隊(duì)列綁定到交換機(jī)上,使用各自的路由鍵。當(dāng)消息被發(fā)送時(shí),RabbitMQ會(huì)根據(jù)路由鍵直接將消息發(fā)送到相應(yīng)的隊(duì)列,從而減少了不必要的處理時(shí)間。5.1.2扇形交換機(jī)(FanoutExchange)實(shí)現(xiàn)廣播扇形交換機(jī)將消息發(fā)送到所有綁定到它的隊(duì)列,這在需要將消息廣播給多個(gè)消費(fèi)者時(shí)非常有用。通過(guò)使用扇形交換機(jī),可以確保消息被所有訂閱者接收,這對(duì)于實(shí)時(shí)更新和通知系統(tǒng)特別有效。示例代碼importpika
#建立連接
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明扇形交換機(jī)
channel.exchange_declare(exchange='logs',exchange_type='fanout')
#聲明隊(duì)列
result=channel.queue_declare(queue='',exclusive=True)
queue_name=result.method.queue
#綁定隊(duì)列到交換機(jī)
channel.queue_bind(exchange='logs',queue=queue_name)
#開(kāi)始消費(fèi)
defcallback(ch,method,properties,body):
print(f"[x]Received{body}")
channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)
print('[*]Waitingformessages.ToexitpressCTRL+C')
channel.start_consuming()解釋這段代碼展示了如何使用扇形交換機(jī)logs來(lái)廣播消息。我們聲明了一個(gè)臨時(shí)隊(duì)列,并將其綁定到交換機(jī)上。然后,我們定義了一個(gè)回調(diào)函數(shù)來(lái)處理接收到的消息。當(dāng)消息被發(fā)送到交換機(jī)時(shí),它會(huì)被廣播到所有綁定的隊(duì)列,從而實(shí)現(xiàn)廣播功能。5.2錯(cuò)誤處理與重試機(jī)制5.2.1使用確認(rèn)機(jī)制確保消息處理在RabbitMQ中,可以使用確認(rèn)機(jī)制來(lái)確保消息被正確處理。當(dāng)消費(fèi)者接收到消息后,它會(huì)發(fā)送一個(gè)確認(rèn)給RabbitMQ,RabbitMQ在收到確認(rèn)后才會(huì)將消息從隊(duì)列中移除。如果消費(fèi)者在處理消息時(shí)失敗,沒(méi)有發(fā)送確認(rèn),RabbitMQ會(huì)將消息重新發(fā)送給其他消費(fèi)者,或者在所有消費(fèi)者都失敗時(shí),將消息返回到隊(duì)列中。示例代碼importpika
#建立連接
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明隊(duì)列
channel.queue_declare(queue='task_queue',durable=True)
#開(kāi)始消費(fèi)
defcallback(ch,method,properties,body):
print(f"[x]Received{body}")
#模擬消息處理
try:
process_message(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
exceptExceptionase:
print(f"[!]Errorprocessingmessage:{e}")
ch.basic_nack(delivery_tag=method.delivery_tag,requeue=True)
channel.basic_consume(queue='task_queue',on_message_callback=callback)
print('[*]Waitingformessages.ToexitpressCTRL+C')
channel.start_consuming()
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶(hù)所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶(hù)上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶(hù)上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶(hù)因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 2024年度跨境電商平臺(tái)區(qū)域代理合同范本3篇
- 2024年生物醫(yī)藥企業(yè)股權(quán)收購(gòu)合同匯編3篇
- 淘寶找建筑課程設(shè)計(jì)
- 專(zhuān)題03 閱讀理解之推理判斷題(練習(xí))(解析版)
- 煉鋼廠(chǎng)部門(mén)崗位職責(zé)說(shuō)明書(shū)
- 機(jī)電工程施工組織設(shè)計(jì)
- (一)高標(biāo)準(zhǔn)農(nóng)田施工方案
- 油條配方課程設(shè)計(jì)
- 糖果罐子手工課程設(shè)計(jì)
- 算法課程設(shè)計(jì)總結(jié)
- 織金縣實(shí)興鄉(xiāng)白龍重晶石礦5.0萬(wàn)t-a(新建)項(xiàng)目環(huán)評(píng)報(bào)告
- 妊娠期肝內(nèi)膽汁淤積癥教學(xué)課件
- 【航空個(gè)性化服務(wù)淺析4700字(論文)】
- 保障農(nóng)民工工資支付條例全文及解讀課件
- 中國(guó)移動(dòng)全面預(yù)算管理
- 【部編】小高考:2021年江蘇普通高中學(xué)業(yè)水平測(cè)試歷史試卷
- 公路隧道建設(shè)施工技術(shù)規(guī)范學(xué)習(xí)考試題庫(kù)(400道)
- 新人教版七至九年級(jí)英語(yǔ)單詞表 漢譯英(含音標(biāo))
- 淺談事業(yè)單位固定資產(chǎn)的折舊本科學(xué)位論文
- 食堂管理制度大全
- 愛(ài)普生機(jī)器人中級(jí)培訓(xùn)資料
評(píng)論
0/150
提交評(píng)論