消息隊列:Pulsar:Pulsar消息模型與消息類型_第1頁
消息隊列:Pulsar:Pulsar消息模型與消息類型_第2頁
消息隊列:Pulsar:Pulsar消息模型與消息類型_第3頁
消息隊列:Pulsar:Pulsar消息模型與消息類型_第4頁
消息隊列:Pulsar:Pulsar消息模型與消息類型_第5頁
已閱讀5頁,還剩22頁未讀 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

消息隊列:Pulsar:Pulsar消息模型與消息類型1消息隊列基礎(chǔ)1.1消息隊列的定義消息隊列是一種應(yīng)用程序間通信(IPC)的模式,它允許消息在發(fā)送者和接收者之間異步傳遞。消息隊列中的消息遵循先進先出(FIFO)原則,但同時也支持更復(fù)雜的路由策略。消息隊列可以提高系統(tǒng)的解耦性、可擴展性和容錯能力。1.2消息隊列的作用消息隊列在現(xiàn)代分布式系統(tǒng)中扮演著關(guān)鍵角色,主要作用包括:異步處理:允許發(fā)送者和接收者異步操作,提高系統(tǒng)響應(yīng)速度。負載均衡:通過消息隊列,可以將任務(wù)均勻地分配給多個處理者。削峰填谷:在高負載時,消息隊列可以緩存消息,避免系統(tǒng)過載。系統(tǒng)解耦:發(fā)送者和接收者不需要直接通信,降低了系統(tǒng)的耦合度。數(shù)據(jù)持久化:消息隊列通常會將消息持久化到磁盤,確保消息不會因系統(tǒng)故障而丟失。1.3Pulsar簡介ApachePulsar是一個高性能、可擴展的分布式消息隊列系統(tǒng),由Yahoo開發(fā)并開源,現(xiàn)已成為Apache軟件基金會的頂級項目。Pulsar提供了消息隊列和發(fā)布/訂閱兩種消息模式,支持多種消息類型,包括二進制、JSON、Avro等。Pulsar的核心特性包括:持久化和非持久化消息:消息可以被持久化到磁盤,也可以選擇在內(nèi)存中短暫存儲。多租戶和多層安全性:支持多租戶環(huán)境,提供細粒度的訪問控制和身份驗證。水平擴展:可以輕松地通過增加更多的Broker來擴展系統(tǒng)的吞吐量和存儲能力。全球分布:支持跨數(shù)據(jù)中心的全球分布,確保數(shù)據(jù)的高可用性和低延遲訪問。1.3.1Pulsar消息模型Pulsar的消息模型基于主題(Topic)和訂閱(Subscription)。一個主題可以有多個生產(chǎn)者(Producer)和消費者(Consumer),生產(chǎn)者向主題發(fā)送消息,消費者從主題中消費消息。訂閱者可以以獨占、共享或鍵共享模式訂閱主題,這決定了消息如何在消費者之間分發(fā)。1.3.2Pulsar消息類型Pulsar支持多種消息類型,包括:二進制消息:最基礎(chǔ)的消息類型,可以是任何二進制數(shù)據(jù)。JSON消息:用于傳輸結(jié)構(gòu)化數(shù)據(jù),便于解析和處理。Avro消息:提供數(shù)據(jù)序列化和反序列化,支持模式演進。示例:使用Java發(fā)送JSON消息到Pulsarimportorg.apache.pulsar.client.api.Schema;

importorg.apache.pulsar.client.api.Message;

importorg.apache.pulsar.client.api.Producer;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

publicclassPulsarProducer{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

//創(chuàng)建Pulsar客戶端

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

//創(chuàng)建生產(chǎn)者,使用JSON模式

Producer<String>producer=client.newProducer(Schema.JSON(String.class))

.topic("my-topic")

.create();

//發(fā)送消息

for(inti=0;i<10;i++){

Stringmessage="HelloPulsar"+i;

producer.send(message);

}

//關(guān)閉生產(chǎn)者和客戶端

producer.close();

client.close();

}

}在上述示例中,我們首先創(chuàng)建了一個Pulsar客戶端,然后使用Schema.JSON(String.class)指定了消息的類型為JSON。接著,我們創(chuàng)建了一個生產(chǎn)者并發(fā)送了10條消息到主題my-topic。最后,我們關(guān)閉了生產(chǎn)者和客戶端,確保資源被正確釋放。示例:使用Java從Pulsar消費JSON消息importorg.apache.pulsar.client.api.Consumer;

importorg.apache.pulsar.client.api.Message;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

importorg.apache.pulsar.client.api.SubscriptionType;

publicclassPulsarConsumer{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

//創(chuàng)建Pulsar客戶端

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

//創(chuàng)建消費者,使用JSON模式

Consumer<String>consumer=client.newConsumer(Schema.JSON(String.class))

.topic("my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Exclusive)

.subscribe();

//消費消息

while(true){

Message<String>msg=consumer.receive();

System.out.println("Receivedmessage:"+msg.getValue());

//確認消息已被消費

consumer.acknowledge(msg);

}

}

}在這個消費示例中,我們創(chuàng)建了一個消費者,同樣使用了Schema.JSON(String.class)來指定消息類型。我們訂閱了主題my-topic,并使用了獨占訂閱模式(SubscriptionType.Exclusive)。然后,我們進入一個無限循環(huán),接收并打印消息,最后確認消息已被消費。通過這些示例,我們可以看到Pulsar如何處理不同類型的異步消息,以及如何在生產(chǎn)者和消費者之間建立通信。Pulsar的靈活性和強大的功能使其成為構(gòu)建現(xiàn)代分布式系統(tǒng)時的首選消息隊列系統(tǒng)。2消息隊列:Pulsar:深入理解Pulsar消息模型2.1消息的結(jié)構(gòu)在ApachePulsar中,消息的結(jié)構(gòu)設(shè)計得非常靈活和高效,以適應(yīng)不同的應(yīng)用場景。一個Pulsar消息主要由以下幾部分組成:Payload:消息的實際內(nèi)容,可以是任何類型的數(shù)據(jù),如JSON、XML或二進制數(shù)據(jù)。Properties:一組鍵值對,用于存儲消息的元數(shù)據(jù),如消息的創(chuàng)建時間、內(nèi)容類型等。MessageID:每個消息都有一個唯一的ID,用于消息的追蹤和管理。Schema:定義了消息的結(jié)構(gòu),使得消息的發(fā)送者和接收者能夠理解消息的內(nèi)容。2.1.1示例代碼importorg.apache.pulsar.client.api.Schema;

importorg.apache.pulsar.client.api.Message;

importorg.apache.pulsar.client.api.Producer;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

publicclassMessageStructureExample{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

Producer<String>producer=client.newProducer(Schema.STRING)

.topic("persistent://sample/standalone/ns/my-topic")

.create();

StringmessageContent="{\"name\":\"John\",\"age\":30}";

Message<String>message=Message.builder()

.value(messageContent)

.properties(Map.of("type","user","timestamp",String.valueOf(System.currentTimeMillis())))

.build();

producer.send(message);

producer.close();

client.close();

}

}在上述代碼中,我們創(chuàng)建了一個Producer對象,使用Schema.STRING定義了消息的結(jié)構(gòu)。然后,我們構(gòu)建了一個消息,其中包含了實際的payload(messageContent)和一些properties(如類型和時間戳)。2.2消息的生命周期Pulsar消息的生命周期從消息被生產(chǎn)者發(fā)送開始,直到被消費者消費并確認為止。在這個過程中,消息可能會經(jīng)歷以下狀態(tài):待發(fā)送:消息在生產(chǎn)者緩存中等待發(fā)送。發(fā)送中:消息正在通過網(wǎng)絡(luò)發(fā)送到PulsarBroker。已發(fā)送:消息成功發(fā)送到Broker,存儲在消息日志中。待消費:消息存儲在Broker中,等待被消費者消費。消費中:消費者正在處理消息。已消費:消費者成功處理消息,并向Broker發(fā)送確認。2.2.1示例代碼importorg.apache.pulsar.client.api.Consumer;

importorg.apache.pulsar.client.api.MessageId;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

publicclassMessageLifecycleExample{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

Consumer<String>consumer=client.newConsumer(Schema.STRING)

.topic("persistent://sample/standalone/ns/my-topic")

.subscriptionName("my-subscription")

.subscribe();

Message<String>message=consumer.receive();

System.out.println("Receivedmessage:"+message.getValue());

//消費者處理消息

//...

//確認消息已被消費

consumer.acknowledge(message.getMessageId());

consumer.close();

client.close();

}

}在本例中,我們創(chuàng)建了一個Consumer對象,訂閱了特定的主題。當(dāng)消息到達時,consumer.receive()方法將消息從Broker中取出,進入“消費中”狀態(tài)。一旦消息被處理,我們通過調(diào)用consumer.acknowledge()方法來確認消息已被消費,從而完成其生命周期。2.3消息的發(fā)布與訂閱機制Pulsar提供了多種發(fā)布與訂閱機制,以滿足不同的需求。主要的發(fā)布與訂閱模式包括:獨占訂閱(Exclusive):只有一個消費者可以訂閱一個主題,所有消息都只能被這一個消費者消費。共享訂閱(Shared):多個消費者可以訂閱一個主題,消息會被分發(fā)給所有訂閱者中的一個,確保每個消息只被消費一次。故障轉(zhuǎn)移訂閱(Failover):類似于獨占訂閱,但是當(dāng)當(dāng)前的消費者不可用時,消息會被轉(zhuǎn)發(fā)給下一個消費者。2.3.1示例代碼importorg.apache.pulsar.client.api.Consumer;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

publicclassPublishSubscribeExample{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

Consumer<String>consumer1=client.newConsumer(Schema.STRING)

.topic("persistent://sample/standalone/ns/my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Exclusive)

.subscribe();

Consumer<String>consumer2=client.newConsumer(Schema.STRING)

.topic("persistent://sample/standalone/ns/my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Exclusive)

.subscribe();

//嘗試接收消息,由于是獨占訂閱,consumer2將無法接收消息

Message<String>message1=consumer1.receive();

Message<String>message2=consumer2.receive();//這里consumer2將阻塞,直到consumer1確認或放棄消息

//消費者處理消息

//...

//確認消息已被消費

consumer1.acknowledge(message1.getMessageId());

consumer1.close();

consumer2.close();

client.close();

}

}在上述代碼中,我們創(chuàng)建了兩個消費者consumer1和consumer2,并使用SubscriptionType.Exclusive指定了獨占訂閱模式。這意味著所有發(fā)送到主題的消息將只被consumer1或consumer2中的一個消費,而不會同時被兩個消費者接收。通過這些示例,我們可以看到Pulsar消息模型的靈活性和強大功能,以及如何通過代碼實現(xiàn)消息的結(jié)構(gòu)定義、生命周期管理和發(fā)布訂閱機制。這為構(gòu)建高效、可靠的消息處理系統(tǒng)提供了堅實的基礎(chǔ)。3Pulsar消息類型詳解在ApachePulsar消息隊列中,消息可以采用多種格式進行編碼和傳輸,以適應(yīng)不同的應(yīng)用場景和數(shù)據(jù)處理需求。本教程將深入探討Pulsar支持的四種主要消息類型:二進制消息、JSON消息、Avro消息以及其他消息格式。3.1進制消息二進制消息是最基本的消息類型,它允許消息以原始字節(jié)流的形式發(fā)送。這種類型的消息適用于任何可以序列化為字節(jié)流的數(shù)據(jù),包括但不限于自定義的二進制數(shù)據(jù)、圖像、音頻文件等。3.1.1示例代碼importorg.apache.pulsar.client.api.Message;

importorg.apache.pulsar.client.api.Producer;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

publicclassBinaryProducer{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

Producer<byte[]>producer=client.newProducer().topic("binary-topic").create();

//創(chuàng)建二進制消息

byte[]binaryData=newbyte[]{0x01,0x02,0x03,0x04};

producer.send(binaryData);

//關(guān)閉生產(chǎn)者和客戶端

producer.close();

client.close();

}

}3.1.2解釋在上述示例中,我們創(chuàng)建了一個Pulsar客戶端,并使用它來創(chuàng)建一個生產(chǎn)者,該生產(chǎn)者將二進制數(shù)據(jù)發(fā)送到名為binary-topic的主題中。二進制數(shù)據(jù)直接以字節(jié)數(shù)組的形式發(fā)送,無需額外的編碼或解碼步驟。3.2JSON消息JSON(JavaScriptObjectNotation)是一種輕量級的數(shù)據(jù)交換格式,易于人閱讀和編寫,同時也易于機器解析和生成。Pulsar支持JSON消息,這使得在消息中傳輸結(jié)構(gòu)化數(shù)據(jù)變得簡單。3.2.1示例代碼importorg.apache.pulsar.client.api.Message;

importorg.apache.pulsar.client.api.Producer;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

importcom.fasterxml.jackson.databind.ObjectMapper;

publicclassJSONProducer{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

Producer<String>producer=client.newProducer().topic("json-topic").create();

//創(chuàng)建JSON消息

ObjectMappermapper=newObjectMapper();

Useruser=newUser("JohnDoe",30);

StringjsonMessage=mapper.writeValueAsString(user);

producer.send(jsonMessage.getBytes());

//關(guān)閉生產(chǎn)者和客戶端

producer.close();

client.close();

}

staticclassUser{

Stringname;

intage;

publicUser(Stringname,intage){

=name;

this.age=age;

}

}

}3.2.2解釋此示例展示了如何使用Java的ObjectMapper類將一個簡單的User對象轉(zhuǎn)換為JSON字符串,然后將其發(fā)送到Pulsar的主題json-topic中。User對象包含姓名和年齡兩個字段,通過序列化為JSON格式,可以方便地在消息中傳輸結(jié)構(gòu)化數(shù)據(jù)。3.3Avro消息ApacheAvro是一種數(shù)據(jù)序列化系統(tǒng),它不僅提供緊湊、快速的二進制數(shù)據(jù)序列化,還支持模式演進,即在不破壞向后兼容性的情況下,可以修改數(shù)據(jù)模式。Pulsar通過Avro編碼支持高效地傳輸復(fù)雜數(shù)據(jù)結(jié)構(gòu)。3.3.1示例代碼importorg.apache.avro.Schema;

importorg.apache.avro.generic.GenericData;

importorg.apache.avro.generic.GenericRecord;

importorg.apache.pulsar.client.api.Message;

importorg.apache.pulsar.client.api.Producer;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

importorg.apache.pulsar.client.impl.schema.AvroSchema;

publicclassAvroProducer{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

Producer<GenericRecord>producer=client.newProducer(AvroSchema.of(User.class)).topic("avro-topic").create();

//創(chuàng)建Avro消息

Useruser=newUser();

user.setName("JaneDoe");

user.setAge(25);

producer.send(user);

//關(guān)閉生產(chǎn)者和客戶端

producer.close();

client.close();

}

staticclassUserextendsGenericData.Record{

publicUser(){

Schemaschema=newSchema.Parser().parse("{"

+"\"type\":\"record\","

+"\"name\":\"User\","

+"\"fields\":["

+"{\"name\":\"name\",\"type\":\"string\"},"

+"{\"name\":\"age\",\"type\":\"int\"}"

+"]"

+"}");

super(schema);

}

publicvoidsetName(Stringname){

put("name",name);

}

publicvoidsetAge(intage){

put("age",age);

}

}

}3.3.2解釋在AvroProducer示例中,我們首先定義了一個Avro模式,該模式描述了User對象的結(jié)構(gòu),包括name和age字段。然后,我們使用Pulsar的AvroSchema創(chuàng)建一個生產(chǎn)者,將User對象直接發(fā)送到avro-topic主題。Avro的模式演進特性使得在數(shù)據(jù)結(jié)構(gòu)發(fā)生變化時,仍然能夠保證消息的正確傳輸和解析。3.4其他消息格式除了上述提到的二進制、JSON和Avro消息格式,Pulsar還支持其他消息格式,如Protobuf、XML等。這些格式的選擇取決于具體的應(yīng)用場景和數(shù)據(jù)處理需求。3.4.1Protobuf消息Protobuf(ProtocolBuffers)是Google開發(fā)的一種數(shù)據(jù)交換格式,它提供了高效的序列化和反序列化機制,特別適合于傳輸大量結(jié)構(gòu)化數(shù)據(jù)。3.4.2示例代碼importorg.apache.pulsar.client.api.Message;

importorg.apache.pulsar.client.api.Producer;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

importtobuf.GeneratedMessageV3;

publicclassProtobufProducer{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

Producer<GeneratedMessageV3>producer=client.newProducer(Schema.PROTOBUF(UserProto.User.getDefaultInstance())).topic("protobuf-topic").create();

//創(chuàng)建Protobuf消息

UserProto.Useruser=UserProto.User.newBuilder()

.setName("Alice")

.setAge(35)

.build();

producer.send(user.toByteArray());

//關(guān)閉生產(chǎn)者和客戶端

producer.close();

client.close();

}

}3.4.3解釋在ProtobufProducer示例中,我們使用了Google的Protobuf庫來定義User消息的結(jié)構(gòu),并通過Pulsar的ProtobufSchema創(chuàng)建了一個生產(chǎn)者。User對象通過toByteArray方法轉(zhuǎn)換為字節(jié)數(shù)組,然后發(fā)送到protobuf-topic主題。Protobuf的高效性和緊湊性使其成為處理大量結(jié)構(gòu)化數(shù)據(jù)的理想選擇。3.5結(jié)論Pulsar通過支持多種消息格式,如二進制、JSON、Avro和Protobuf,為開發(fā)者提供了靈活的數(shù)據(jù)傳輸和處理選項。選擇合適的消息格式可以顯著提高消息處理的效率和可靠性,同時簡化數(shù)據(jù)的序列化和反序列化過程。在實際應(yīng)用中,應(yīng)根據(jù)數(shù)據(jù)的特性和處理需求來選擇最合適的格式。請注意,上述代碼示例假設(shè)你已經(jīng)定義了相應(yīng)的Protobuf消息結(jié)構(gòu)(UserProto),并且在項目中包含了必要的依賴庫。在實際應(yīng)用中,你可能需要根據(jù)自己的數(shù)據(jù)模型和需求進行相應(yīng)的調(diào)整。4消息發(fā)布4.1生產(chǎn)者角色在ApachePulsar消息隊列中,生產(chǎn)者是負責(zé)生成和發(fā)送消息的組件。生產(chǎn)者通過連接到Pulsar的Broker,將消息發(fā)送到特定的Topic中。每個Topic可以有多個生產(chǎn)者,這意味著多個應(yīng)用程序或服務(wù)可以同時向同一個Topic發(fā)送消息,從而實現(xiàn)高并發(fā)的消息發(fā)布能力。4.1.1發(fā)布消息流程創(chuàng)建Producer對象:生產(chǎn)者首先需要創(chuàng)建一個Producer對象,這通常通過PulsarClient的createProducer方法完成。發(fā)送消息:使用Producer對象的send方法將消息發(fā)送到指定的Topic。消息可以是任意類型的數(shù)據(jù),但在發(fā)送前需要被序列化為字節(jié)流。確認消息發(fā)送:Pulsar支持消息發(fā)送的確認機制,確保消息被成功發(fā)送到Broker。如果消息發(fā)送失敗,生產(chǎn)者可以重新發(fā)送消息。關(guān)閉Producer:在完成消息發(fā)送后,生產(chǎn)者應(yīng)該調(diào)用close方法來釋放資源。4.1.2消息持久化策略Pulsar提供了多種消息持久化策略,以確保消息在Broker故障時不會丟失。這些策略包括:消息存儲:Pulsar使用ApacheBookKeeper作為其后端存儲系統(tǒng),所有消息都會被持久化到磁盤上。消息復(fù)制:Pulsar支持消息的跨Broker復(fù)制,以提高系統(tǒng)的可用性和容錯性。復(fù)制策略可以是同步或異步的。消息過期:Pulsar允許設(shè)置消息的過期時間,過期的消息將被自動刪除,以節(jié)省存儲空間。4.2示例代碼:創(chuàng)建生產(chǎn)者并發(fā)送消息importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

importorg.apache.pulsar.client.api.Producer;

importorg.apache.pulsar.client.api.ProducerConfiguration;

importorg.apache.pulsar.client.api.Schema;

publicclassPulsarProducerExample{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

//創(chuàng)建PulsarClient實例

PulsarClientclient=PulsarClient.builder()

.serviceUrl("pulsar://localhost:6650")

.build();

//創(chuàng)建Producer配置

ProducerConfiguration<String>producerConfiguration=ProducerConfiguration.builder()

.schema(Schema.STRING)

.build();

//創(chuàng)建Producer

Producer<String>producer=client.newProducer(producerConfiguration)

.topic("persistent://public/default/my-topic")

.create();

//發(fā)送消息

for(inti=0;i<10;i++){

Stringmessage="HelloPulsar"+i;

producer.send(message);

}

//關(guān)閉Producer

producer.close();

//關(guān)閉PulsarClient

client.close();

}

}4.2.1代碼解釋創(chuàng)建PulsarClient:通過PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();創(chuàng)建一個PulsarClient實例,其中serviceUrl指定了Pulsar服務(wù)的URL。創(chuàng)建Producer配置:使用ProducerConfiguration.builder().schema(Schema.STRING).build();創(chuàng)建一個Producer配置對象,指定了消息的Schema為字符串類型。創(chuàng)建Producer:通過client.newProducer(producerConfiguration).topic("persistent://public/default/my-topic").create();創(chuàng)建一個Producer,其中topic指定了消息將被發(fā)送到的Topic。發(fā)送消息:使用producer.send(message);方法發(fā)送消息到指定的Topic。關(guān)閉Producer和PulsarClient:在完成消息發(fā)送后,調(diào)用producer.close();和client.close();來釋放資源。通過以上步驟,我們可以看到Pulsar消息隊列中消息發(fā)布的完整流程,從創(chuàng)建生產(chǎn)者到發(fā)送消息,再到資源的釋放,每一步都遵循了Pulsar的API規(guī)范,確保了消息的正確發(fā)送和系統(tǒng)的穩(wěn)定運行。5消息消費5.1消費者角色在ApachePulsar消息隊列中,消費者是消息的接收者。它們訂閱主題,接收并處理由生產(chǎn)者發(fā)送的消息。消費者可以是任何能夠接收和處理消息的應(yīng)用程序或服務(wù)。在Pulsar中,消費者通過創(chuàng)建一個Consumer對象來訂閱主題,這個對象提供了接收消息的方法。5.1.1示例代碼importorg.apache.pulsar.client.api.Consumer;

importorg.apache.pulsar.client.api.Message;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

publicclassPulsarConsumerExample{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

//創(chuàng)建Pulsar客戶端

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

//創(chuàng)建消費者,訂閱主題

Consumer<String>consumer=client.newConsumer()

.topic("persistent://public/default/my-topic")

.subscriptionName("my-subscription")

.subscribe();

//接收消息

Message<String>msg=consumer.receive();

//處理消息

System.out.println("Receivedmessage:"+msg.getValue());

//確認消息已處理

consumer.acknowledge(msg);

//關(guān)閉消費者和客戶端

consumer.close();

client.close();

}

}5.2消息消費模式Pulsar支持兩種主要的消息消費模式:獨占(Exclusive)和共享(Shared)。5.2.1獨占消費在獨占消費模式下,一個訂閱只能被一個消費者使用。這意味著如果多個消費者嘗試訂閱同一個主題和訂閱名稱,只有第一個消費者能夠成功訂閱,其他消費者將收到錯誤。這種模式適用于需要確保消息只被一個消費者處理的情況。5.2.2共享消費共享消費模式允許多個消費者訂閱同一個主題和訂閱名稱。消息將被分發(fā)給訂閱中的任意一個消費者,確保消息至少被處理一次。這種模式適用于需要高可用性和負載均衡的場景。5.2.3示例代碼//獨占消費模式

Consumer<String>exclusiveConsumer=client.newConsumer()

.topic("persistent://public/default/my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Exclusive)

.subscribe();

//共享消費模式

Consumer<String>sharedConsumer=client.newConsumer()

.topic("persistent://public/default/my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Shared)

.subscribe();5.3消息重試與死信隊列在處理消息時,如果消費者未能成功處理消息,Pulsar提供了消息重試機制。消息可以被重新發(fā)送給消費者,直到成功處理。如果消息在多次重試后仍然無法處理,它可以被移動到死信隊列(DeadLetterQueue)。5.3.1消息重試消費者可以通過negativeAcknowledge方法來標(biāo)記消息為未處理,這將觸發(fā)消息重試。Pulsar允許配置重試策略,包括重試次數(shù)和重試間隔。5.3.2死信隊列死信隊列用于存儲那些無法被正常處理的消息。這些消息可以被單獨處理或分析,以找出處理失敗的原因。在Pulsar中,可以通過配置主題的策略來啟用死信隊列。5.3.3示例代碼//消費者處理消息失敗,請求重試

consumer.negativeAcknowledge(msg);

//配置主題策略,啟用死信隊列

TopicPoliciespolicies=newTopicPolicies();

policies.setDeadLetterTopic("persistent://public/default/my-dead-letter-topic");

client.getAdmin().topics().updateTopicPolicies("persistent://public/default/my-topic",policies);通過上述代碼和解釋,我們深入了解了Pulsar中消費者角色的定義、消息消費的不同模式,以及如何處理消息重試和死信隊列。這些機制確保了消息的可靠處理和系統(tǒng)的高可用性。6消息路由策略6.1消息分發(fā)原理在消息隊列系統(tǒng)中,如ApachePulsar,消息的分發(fā)是一個關(guān)鍵過程,它決定了消息如何從生產(chǎn)者到達消費者。Pulsar通過其獨特的消息模型和靈活的路由策略,確保了消息的高效、有序和可靠傳輸。消息分發(fā)原理基于以下幾點:主題(Topic):在Pulsar中,消息被發(fā)布到特定的主題上,這些主題可以是持久的或非持久的,根據(jù)消息的存儲需求而定。分區(qū)(Partition):為了提高吞吐量和可擴展性,主題可以被劃分為多個分區(qū),每個分區(qū)獨立存儲和處理消息。消費者(Consumer):消費者訂閱主題以接收消息。Pulsar支持獨占、共享和鍵共享三種訂閱類型,以滿足不同的消費模式。消息路由:生產(chǎn)者發(fā)送消息時,Pulsar根據(jù)配置的路由策略決定將消息發(fā)送到哪個分區(qū)。這確保了消息的均衡分布和處理。6.2路由策略類型Pulsar提供了多種消息路由策略,以適應(yīng)不同的應(yīng)用場景和需求:RoundRobin:輪詢策略,將消息均勻地分發(fā)到所有分區(qū),以實現(xiàn)負載均衡。KeyBased:基于消息鍵的策略,根據(jù)消息中的鍵將消息路由到特定的分區(qū),確保具有相同鍵的消息被發(fā)送到同一分區(qū),便于實現(xiàn)消息的有序處理。Custom:自定義策略,允許用戶實現(xiàn)自己的消息路由邏輯,提供最大的靈活性。6.2.1RoundRobin路由策略原理RoundRobin策略是最簡單的消息分發(fā)方式,它將消息輪流發(fā)送到不同的分區(qū),確保每個分區(qū)的負載大致相同。這種策略適用于消息大小和處理時間相對均勻的場景。示例假設(shè)我們有一個主題my-topic,它被劃分為4個分區(qū)。使用RoundRobin策略,消息將按照以下順序被發(fā)送:第1條消息發(fā)送到分區(qū)0第2條消息發(fā)送到分區(qū)1第3條消息發(fā)送到分區(qū)2第4條消息發(fā)送到分區(qū)3第5條消息再次發(fā)送到分區(qū)0,以此類推。6.2.2KeyBased路由策略原理KeyBased策略根據(jù)消息中的鍵(key)將消息路由到特定的分區(qū)。如果消息包含相同的鍵,它們將被發(fā)送到同一分區(qū),這有助于實現(xiàn)消息的有序處理和聚合。鍵可以是消息中的任意字段,也可以是自定義的鍵生成邏輯。示例假設(shè)我們有一個主題my-topic,它被劃分為3個分區(qū)。消息包含一個用戶ID作為鍵,ID為1的消息將始終被發(fā)送到分區(qū)0,ID為2的消息將被發(fā)送到分區(qū)1,ID為3的消息將被發(fā)送到分區(qū)2,以此類推。#Python示例代碼

importpulsar

client=pulsar.Client('pulsar://localhost:6650')

producer=client.create_producer('my-topic',key_type='string')

#發(fā)送消息,鍵為用戶ID

foriinrange(10):

key=str(i%3)#生成鍵,確保消息被均勻分布到3個分區(qū)

producer.send(('message-'+str(i)).encode('utf-8'),key=key)

client.close()6.2.3Custom路由策略原理Custom策略允許用戶實現(xiàn)自己的消息路由邏輯。這提供了最大的靈活性,但同時也要求用戶對消息隊列的性能和可靠性有深入的理解。自定義策略可以通過實現(xiàn)Pulsar的MessageRouter接口來實現(xiàn)。示例假設(shè)我們想要根據(jù)消息的類型(例如,交易、日志、警報)將消息路由到不同的分區(qū),可以實現(xiàn)一個自定義的路由策略。//Java示例代碼

importorg.apache.pulsar.client.api.MessageRouter;

importorg.apache.pulsar.client.api.MessageRouterBuilder;

importorg.apache.pulsar.client.api.Producer;

importorg.apache.pulsar.client.api.PulsarClient;

publicclassCustomMessageRouterimplementsMessageRouter<String>{

@Override

publicintroute(Stringkey,intnumPartitions){

if(key.startsWith("transaction")){

return0;//交易消息發(fā)送到分區(qū)0

}elseif(key.startsWith("log")){

return1;//日志消息發(fā)送到分區(qū)1

}elseif(key.startsWith("alert")){

return2;//警報消息發(fā)送到分區(qū)2

}

returnnumPartitions/2;//默認情況下,將消息發(fā)送到中間分區(qū)

}

}

publicclassCustomProducer{

publicstaticvoidmain(String[]args)throwsException{

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

MessageRouterBuilder<String>routerBuilder=MessageRouter.custom(newCustomMessageRouter());

Producer<String>producer=client.newProducer(Schema.STRING)

.topic("my-topic")

.messageRoutingMode(MessageRoutingMode.SinglePartition)

.messageRouter(routerBuilder)

.create();

//發(fā)送不同類型的消息

producer.send("transaction-1","transaction");

producer.send("log-2","log");

producer.send("alert-3","alert");

client.close();

}

}6.3策略配置示例在Pulsar中,配置路由策略可以通過修改生產(chǎn)者創(chuàng)建時的參數(shù)來實現(xiàn)。以下是一個使用RoundRobin策略的配置示例://Java示例代碼

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.Producer;

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

Producer<String>producer=client.newProducer(Schema.STRING)

.topic("my-topic")

.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)

.create();在這個示例中,messageRoutingMode參數(shù)被設(shè)置為RoundRobinPartition,這告訴Pulsar使用輪詢策略來分發(fā)消息。對于KeyBased策略,可以通過在發(fā)送消息時指定鍵來實現(xiàn):producer.send(("message-"+i).getBytes(),"key-"+(i%3));在這個示例中,消息的鍵被設(shè)置為"key-"+(i%3),這確保了具有相同鍵的消息被發(fā)送到同一分區(qū)。對于Custom策略,需要實現(xiàn)MessageRouter接口,并在創(chuàng)建生產(chǎn)者時指定自定義的路由策略。MessageRouterBuilder<String>routerBuilder=MessageRouter.custom(newCustomMessageRouter());

Producer<String>producer=client.newProducer(Schema.STRING)

.topic("my-topic")

.messageRoutingMode(MessageRoutingMode.SinglePartition)

.messageRouter(routerBuilder)

.create();在這個示例中,CustomMessageRouter類實現(xiàn)了自定義的路由邏輯,通過MessageRouterBuilder和messageRouter參數(shù),可以將這個自定義策略應(yīng)用到生產(chǎn)者上。通過以上示例,我們可以看到Pulsar提供了豐富的消息路由策略,以滿足不同場景的需求。選擇合適的策略對于優(yōu)化消息隊列的性能和可靠性至關(guān)重要。7高級消息處理7.1消息壓縮在消息隊列系統(tǒng)中,如ApachePulsar,消息壓縮是一種優(yōu)化網(wǎng)絡(luò)傳輸和存儲空間的有效手段。Pulsar支持多種壓縮算法,包括LZ4、ZLIB、ZSTD等,以適應(yīng)不同的性能和壓縮比需求。7.1.1原理消息壓縮在消息發(fā)送前進行,將原始消息數(shù)據(jù)轉(zhuǎn)換為更小的二進制格式,從而減少在網(wǎng)絡(luò)上傳輸?shù)臄?shù)據(jù)量和存儲空間的占用。接收方在消費消息時,會自動解壓縮消息,恢復(fù)原始數(shù)據(jù)。7.1.2示例在Pulsar中,可以通過設(shè)置producer的compressionType屬性來啟用消息壓縮。以下是一個使用LZ4壓縮算法的示例:importorg.apache.pulsar.client.api.Producer;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

publicclassCompressedProducer{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

PulsarClientclient=PulsarClient.builder()

.serviceUrl("pulsar://localhost:6650")

.build();

Producer<byte[]>producer=client.newProducer()

.topic("persistent://sample/standalone/ns/my-topic")

.compressionType(CompressionType.LZ4)

.create();

Stringmessage="Hello,Pulsar!";

producer.send(message.getBytes());

producer.close();

client.close();

}

}7.1.3解釋在上述代碼中,我們創(chuàng)建了一個PulsarClient實例,并指定了服務(wù)URL。然后,我們創(chuàng)建了一個Producer,設(shè)置了topic和compressionType為LZ4。發(fā)送消息時,原始字符串被轉(zhuǎn)換為字節(jié)數(shù)組,然后通過壓縮的Producer發(fā)送。接收方會自動解壓縮消息,無需額外的解壓縮邏輯。7.2消息時間戳在Pulsar中,每條消息都有一個時間戳,用于記錄消息的創(chuàng)建時間或發(fā)送時間。時間戳可以用于實現(xiàn)時間窗口、消息延遲發(fā)送等功能。7.2.1原理Pulsar允許在消息發(fā)送時指定時間戳,如果沒有指定,系統(tǒng)會自動使用消息發(fā)送時的時間作為時間戳。時間戳以毫秒為單位,存儲在消息的元數(shù)據(jù)中。7.2.2示例以下是一個在發(fā)送消息時指定時間戳的示例:importorg.apache.pulsar.client.api.MessageBuilder;

importorg.apache.pulsar.client.api.Producer;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

publicclassTimestampProducer{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

PulsarClientclient=PulsarClient.builder()

.serviceUrl("pulsar://localhost:6650")

.build();

Producer<byte[]>producer=client.newProducer()

.topic("persistent://sample/standalone/ns/my-topic")

.create();

longtimestamp=System.currentTimeMillis()+10000;//發(fā)送10秒后的時間

Stringmessage="Hello,Pulsar!";

producer.newMessage()

.value(message.getBytes())

.timestamp(timestamp)

.send();

producer.close();

client.close();

}

}7.2.3解釋在本例中,我們創(chuàng)建了一個Producer,并通過newMessage()方法創(chuàng)建了一個消息構(gòu)建器。我們指定了一個未來的時間戳,使得消息在10秒后才被標(biāo)記為發(fā)送。這在實現(xiàn)延遲消息處理時非常有用。7.3消息順序保證在分布式系統(tǒng)中,消息的順序處理是一個常見的需求。Pulsar提供了消息順序保證的機制,確保消息按照發(fā)送的順序被消費。7.3.1原理Pulsar通過在Producer和Consumer上設(shè)置相應(yīng)的屬性來實現(xiàn)消息順序。在Producer端,可以設(shè)置blockIfQueueFull屬性為true,以確保消息在隊列滿時被阻塞,直到隊列有空間。在Consumer端,可以設(shè)置subscriptionType為Exclusive,以確保一個Consumer獨占一個topic,從而保證消息順序。7.3.2示例以下是一個使用Pulsar保證消息順序的示例:importorg.apache.pulsar.client.api.Consumer;

importorg.apache.pulsar.client.api.Message;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

importorg.apache.pulsar.client.api.SubscriptionType;

importjava.util.concurrent.TimeUnit;

publicclassOrderedConsumer{

publicstaticvoidmain(String[]args)throwsPulsarClientException,InterruptedException{

PulsarClientclient=PulsarClient.builder()

.serviceUrl("pulsar://localhost:6650")

.build();

Consumer<byte[]>consumer=client.newConsumer()

.topic("persistent://sample/standalone/ns/my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Exclusive)

.subscribe();

while(true){

Message<byte[]>msg=consumer.receive(5,TimeUnit.SECONDS);

if(msg!=null){

System.out.println("Receivedmessage:

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

最新文檔

評論

0/150

提交評論