版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)
文檔簡介
消息隊列:Pulsar:Pulsar消息模型與消息類型1消息隊列基礎(chǔ)1.1消息隊列的定義消息隊列是一種應(yīng)用程序間通信(IPC)的模式,它允許消息在發(fā)送者和接收者之間異步傳遞。消息隊列中的消息遵循先進先出(FIFO)原則,但同時也支持更復(fù)雜的路由策略。消息隊列可以提高系統(tǒng)的解耦性、可擴展性和容錯能力。1.2消息隊列的作用消息隊列在現(xiàn)代分布式系統(tǒng)中扮演著關(guān)鍵角色,主要作用包括:異步處理:允許發(fā)送者和接收者異步操作,提高系統(tǒng)響應(yīng)速度。負載均衡:通過消息隊列,可以將任務(wù)均勻地分配給多個處理者。削峰填谷:在高負載時,消息隊列可以緩存消息,避免系統(tǒng)過載。系統(tǒng)解耦:發(fā)送者和接收者不需要直接通信,降低了系統(tǒng)的耦合度。數(shù)據(jù)持久化:消息隊列通常會將消息持久化到磁盤,確保消息不會因系統(tǒng)故障而丟失。1.3Pulsar簡介ApachePulsar是一個高性能、可擴展的分布式消息隊列系統(tǒng),由Yahoo開發(fā)并開源,現(xiàn)已成為Apache軟件基金會的頂級項目。Pulsar提供了消息隊列和發(fā)布/訂閱兩種消息模式,支持多種消息類型,包括二進制、JSON、Avro等。Pulsar的核心特性包括:持久化和非持久化消息:消息可以被持久化到磁盤,也可以選擇在內(nèi)存中短暫存儲。多租戶和多層安全性:支持多租戶環(huán)境,提供細粒度的訪問控制和身份驗證。水平擴展:可以輕松地通過增加更多的Broker來擴展系統(tǒng)的吞吐量和存儲能力。全球分布:支持跨數(shù)據(jù)中心的全球分布,確保數(shù)據(jù)的高可用性和低延遲訪問。1.3.1Pulsar消息模型Pulsar的消息模型基于主題(Topic)和訂閱(Subscription)。一個主題可以有多個生產(chǎn)者(Producer)和消費者(Consumer),生產(chǎn)者向主題發(fā)送消息,消費者從主題中消費消息。訂閱者可以以獨占、共享或鍵共享模式訂閱主題,這決定了消息如何在消費者之間分發(fā)。1.3.2Pulsar消息類型Pulsar支持多種消息類型,包括:二進制消息:最基礎(chǔ)的消息類型,可以是任何二進制數(shù)據(jù)。JSON消息:用于傳輸結(jié)構(gòu)化數(shù)據(jù),便于解析和處理。Avro消息:提供數(shù)據(jù)序列化和反序列化,支持模式演進。示例:使用Java發(fā)送JSON消息到Pulsarimportorg.apache.pulsar.client.api.Schema;
importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.Producer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassPulsarProducer{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
//創(chuàng)建Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建生產(chǎn)者,使用JSON模式
Producer<String>producer=client.newProducer(Schema.JSON(String.class))
.topic("my-topic")
.create();
//發(fā)送消息
for(inti=0;i<10;i++){
Stringmessage="HelloPulsar"+i;
producer.send(message);
}
//關(guān)閉生產(chǎn)者和客戶端
producer.close();
client.close();
}
}在上述示例中,我們首先創(chuàng)建了一個Pulsar客戶端,然后使用Schema.JSON(String.class)指定了消息的類型為JSON。接著,我們創(chuàng)建了一個生產(chǎn)者并發(fā)送了10條消息到主題my-topic。最后,我們關(guān)閉了生產(chǎn)者和客戶端,確保資源被正確釋放。示例:使用Java從Pulsar消費JSON消息importorg.apache.pulsar.client.api.Consumer;
importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
importorg.apache.pulsar.client.api.SubscriptionType;
publicclassPulsarConsumer{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
//創(chuàng)建Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建消費者,使用JSON模式
Consumer<String>consumer=client.newConsumer(Schema.JSON(String.class))
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
//消費消息
while(true){
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//確認消息已被消費
consumer.acknowledge(msg);
}
}
}在這個消費示例中,我們創(chuàng)建了一個消費者,同樣使用了Schema.JSON(String.class)來指定消息類型。我們訂閱了主題my-topic,并使用了獨占訂閱模式(SubscriptionType.Exclusive)。然后,我們進入一個無限循環(huán),接收并打印消息,最后確認消息已被消費。通過這些示例,我們可以看到Pulsar如何處理不同類型的異步消息,以及如何在生產(chǎn)者和消費者之間建立通信。Pulsar的靈活性和強大的功能使其成為構(gòu)建現(xiàn)代分布式系統(tǒng)時的首選消息隊列系統(tǒng)。2消息隊列:Pulsar:深入理解Pulsar消息模型2.1消息的結(jié)構(gòu)在ApachePulsar中,消息的結(jié)構(gòu)設(shè)計得非常靈活和高效,以適應(yīng)不同的應(yīng)用場景。一個Pulsar消息主要由以下幾部分組成:Payload:消息的實際內(nèi)容,可以是任何類型的數(shù)據(jù),如JSON、XML或二進制數(shù)據(jù)。Properties:一組鍵值對,用于存儲消息的元數(shù)據(jù),如消息的創(chuàng)建時間、內(nèi)容類型等。MessageID:每個消息都有一個唯一的ID,用于消息的追蹤和管理。Schema:定義了消息的結(jié)構(gòu),使得消息的發(fā)送者和接收者能夠理解消息的內(nèi)容。2.1.1示例代碼importorg.apache.pulsar.client.api.Schema;
importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.Producer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassMessageStructureExample{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Producer<String>producer=client.newProducer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.create();
StringmessageContent="{\"name\":\"John\",\"age\":30}";
Message<String>message=Message.builder()
.value(messageContent)
.properties(Map.of("type","user","timestamp",String.valueOf(System.currentTimeMillis())))
.build();
producer.send(message);
producer.close();
client.close();
}
}在上述代碼中,我們創(chuàng)建了一個Producer對象,使用Schema.STRING定義了消息的結(jié)構(gòu)。然后,我們構(gòu)建了一個消息,其中包含了實際的payload(messageContent)和一些properties(如類型和時間戳)。2.2消息的生命周期Pulsar消息的生命周期從消息被生產(chǎn)者發(fā)送開始,直到被消費者消費并確認為止。在這個過程中,消息可能會經(jīng)歷以下狀態(tài):待發(fā)送:消息在生產(chǎn)者緩存中等待發(fā)送。發(fā)送中:消息正在通過網(wǎng)絡(luò)發(fā)送到PulsarBroker。已發(fā)送:消息成功發(fā)送到Broker,存儲在消息日志中。待消費:消息存儲在Broker中,等待被消費者消費。消費中:消費者正在處理消息。已消費:消費者成功處理消息,并向Broker發(fā)送確認。2.2.1示例代碼importorg.apache.pulsar.client.api.Consumer;
importorg.apache.pulsar.client.api.MessageId;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassMessageLifecycleExample{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscribe();
Message<String>message=consumer.receive();
System.out.println("Receivedmessage:"+message.getValue());
//消費者處理消息
//...
//確認消息已被消費
consumer.acknowledge(message.getMessageId());
consumer.close();
client.close();
}
}在本例中,我們創(chuàng)建了一個Consumer對象,訂閱了特定的主題。當(dāng)消息到達時,consumer.receive()方法將消息從Broker中取出,進入“消費中”狀態(tài)。一旦消息被處理,我們通過調(diào)用consumer.acknowledge()方法來確認消息已被消費,從而完成其生命周期。2.3消息的發(fā)布與訂閱機制Pulsar提供了多種發(fā)布與訂閱機制,以滿足不同的需求。主要的發(fā)布與訂閱模式包括:獨占訂閱(Exclusive):只有一個消費者可以訂閱一個主題,所有消息都只能被這一個消費者消費。共享訂閱(Shared):多個消費者可以訂閱一個主題,消息會被分發(fā)給所有訂閱者中的一個,確保每個消息只被消費一次。故障轉(zhuǎn)移訂閱(Failover):類似于獨占訂閱,但是當(dāng)當(dāng)前的消費者不可用時,消息會被轉(zhuǎn)發(fā)給下一個消費者。2.3.1示例代碼importorg.apache.pulsar.client.api.Consumer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassPublishSubscribeExample{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Consumer<String>consumer1=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
Consumer<String>consumer2=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
//嘗試接收消息,由于是獨占訂閱,consumer2將無法接收消息
Message<String>message1=consumer1.receive();
Message<String>message2=consumer2.receive();//這里consumer2將阻塞,直到consumer1確認或放棄消息
//消費者處理消息
//...
//確認消息已被消費
consumer1.acknowledge(message1.getMessageId());
consumer1.close();
consumer2.close();
client.close();
}
}在上述代碼中,我們創(chuàng)建了兩個消費者consumer1和consumer2,并使用SubscriptionType.Exclusive指定了獨占訂閱模式。這意味著所有發(fā)送到主題的消息將只被consumer1或consumer2中的一個消費,而不會同時被兩個消費者接收。通過這些示例,我們可以看到Pulsar消息模型的靈活性和強大功能,以及如何通過代碼實現(xiàn)消息的結(jié)構(gòu)定義、生命周期管理和發(fā)布訂閱機制。這為構(gòu)建高效、可靠的消息處理系統(tǒng)提供了堅實的基礎(chǔ)。3Pulsar消息類型詳解在ApachePulsar消息隊列中,消息可以采用多種格式進行編碼和傳輸,以適應(yīng)不同的應(yīng)用場景和數(shù)據(jù)處理需求。本教程將深入探討Pulsar支持的四種主要消息類型:二進制消息、JSON消息、Avro消息以及其他消息格式。3.1進制消息二進制消息是最基本的消息類型,它允許消息以原始字節(jié)流的形式發(fā)送。這種類型的消息適用于任何可以序列化為字節(jié)流的數(shù)據(jù),包括但不限于自定義的二進制數(shù)據(jù)、圖像、音頻文件等。3.1.1示例代碼importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.Producer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassBinaryProducer{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Producer<byte[]>producer=client.newProducer().topic("binary-topic").create();
//創(chuàng)建二進制消息
byte[]binaryData=newbyte[]{0x01,0x02,0x03,0x04};
producer.send(binaryData);
//關(guān)閉生產(chǎn)者和客戶端
producer.close();
client.close();
}
}3.1.2解釋在上述示例中,我們創(chuàng)建了一個Pulsar客戶端,并使用它來創(chuàng)建一個生產(chǎn)者,該生產(chǎn)者將二進制數(shù)據(jù)發(fā)送到名為binary-topic的主題中。二進制數(shù)據(jù)直接以字節(jié)數(shù)組的形式發(fā)送,無需額外的編碼或解碼步驟。3.2JSON消息JSON(JavaScriptObjectNotation)是一種輕量級的數(shù)據(jù)交換格式,易于人閱讀和編寫,同時也易于機器解析和生成。Pulsar支持JSON消息,這使得在消息中傳輸結(jié)構(gòu)化數(shù)據(jù)變得簡單。3.2.1示例代碼importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.Producer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
importcom.fasterxml.jackson.databind.ObjectMapper;
publicclassJSONProducer{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Producer<String>producer=client.newProducer().topic("json-topic").create();
//創(chuàng)建JSON消息
ObjectMappermapper=newObjectMapper();
Useruser=newUser("JohnDoe",30);
StringjsonMessage=mapper.writeValueAsString(user);
producer.send(jsonMessage.getBytes());
//關(guān)閉生產(chǎn)者和客戶端
producer.close();
client.close();
}
staticclassUser{
Stringname;
intage;
publicUser(Stringname,intage){
=name;
this.age=age;
}
}
}3.2.2解釋此示例展示了如何使用Java的ObjectMapper類將一個簡單的User對象轉(zhuǎn)換為JSON字符串,然后將其發(fā)送到Pulsar的主題json-topic中。User對象包含姓名和年齡兩個字段,通過序列化為JSON格式,可以方便地在消息中傳輸結(jié)構(gòu)化數(shù)據(jù)。3.3Avro消息ApacheAvro是一種數(shù)據(jù)序列化系統(tǒng),它不僅提供緊湊、快速的二進制數(shù)據(jù)序列化,還支持模式演進,即在不破壞向后兼容性的情況下,可以修改數(shù)據(jù)模式。Pulsar通過Avro編碼支持高效地傳輸復(fù)雜數(shù)據(jù)結(jié)構(gòu)。3.3.1示例代碼importorg.apache.avro.Schema;
importorg.apache.avro.generic.GenericData;
importorg.apache.avro.generic.GenericRecord;
importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.Producer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
importorg.apache.pulsar.client.impl.schema.AvroSchema;
publicclassAvroProducer{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Producer<GenericRecord>producer=client.newProducer(AvroSchema.of(User.class)).topic("avro-topic").create();
//創(chuàng)建Avro消息
Useruser=newUser();
user.setName("JaneDoe");
user.setAge(25);
producer.send(user);
//關(guān)閉生產(chǎn)者和客戶端
producer.close();
client.close();
}
staticclassUserextendsGenericData.Record{
publicUser(){
Schemaschema=newSchema.Parser().parse("{"
+"\"type\":\"record\","
+"\"name\":\"User\","
+"\"fields\":["
+"{\"name\":\"name\",\"type\":\"string\"},"
+"{\"name\":\"age\",\"type\":\"int\"}"
+"]"
+"}");
super(schema);
}
publicvoidsetName(Stringname){
put("name",name);
}
publicvoidsetAge(intage){
put("age",age);
}
}
}3.3.2解釋在AvroProducer示例中,我們首先定義了一個Avro模式,該模式描述了User對象的結(jié)構(gòu),包括name和age字段。然后,我們使用Pulsar的AvroSchema創(chuàng)建一個生產(chǎn)者,將User對象直接發(fā)送到avro-topic主題。Avro的模式演進特性使得在數(shù)據(jù)結(jié)構(gòu)發(fā)生變化時,仍然能夠保證消息的正確傳輸和解析。3.4其他消息格式除了上述提到的二進制、JSON和Avro消息格式,Pulsar還支持其他消息格式,如Protobuf、XML等。這些格式的選擇取決于具體的應(yīng)用場景和數(shù)據(jù)處理需求。3.4.1Protobuf消息Protobuf(ProtocolBuffers)是Google開發(fā)的一種數(shù)據(jù)交換格式,它提供了高效的序列化和反序列化機制,特別適合于傳輸大量結(jié)構(gòu)化數(shù)據(jù)。3.4.2示例代碼importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.Producer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
importtobuf.GeneratedMessageV3;
publicclassProtobufProducer{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Producer<GeneratedMessageV3>producer=client.newProducer(Schema.PROTOBUF(UserProto.User.getDefaultInstance())).topic("protobuf-topic").create();
//創(chuàng)建Protobuf消息
UserProto.Useruser=UserProto.User.newBuilder()
.setName("Alice")
.setAge(35)
.build();
producer.send(user.toByteArray());
//關(guān)閉生產(chǎn)者和客戶端
producer.close();
client.close();
}
}3.4.3解釋在ProtobufProducer示例中,我們使用了Google的Protobuf庫來定義User消息的結(jié)構(gòu),并通過Pulsar的ProtobufSchema創(chuàng)建了一個生產(chǎn)者。User對象通過toByteArray方法轉(zhuǎn)換為字節(jié)數(shù)組,然后發(fā)送到protobuf-topic主題。Protobuf的高效性和緊湊性使其成為處理大量結(jié)構(gòu)化數(shù)據(jù)的理想選擇。3.5結(jié)論Pulsar通過支持多種消息格式,如二進制、JSON、Avro和Protobuf,為開發(fā)者提供了靈活的數(shù)據(jù)傳輸和處理選項。選擇合適的消息格式可以顯著提高消息處理的效率和可靠性,同時簡化數(shù)據(jù)的序列化和反序列化過程。在實際應(yīng)用中,應(yīng)根據(jù)數(shù)據(jù)的特性和處理需求來選擇最合適的格式。請注意,上述代碼示例假設(shè)你已經(jīng)定義了相應(yīng)的Protobuf消息結(jié)構(gòu)(UserProto),并且在項目中包含了必要的依賴庫。在實際應(yīng)用中,你可能需要根據(jù)自己的數(shù)據(jù)模型和需求進行相應(yīng)的調(diào)整。4消息發(fā)布4.1生產(chǎn)者角色在ApachePulsar消息隊列中,生產(chǎn)者是負責(zé)生成和發(fā)送消息的組件。生產(chǎn)者通過連接到Pulsar的Broker,將消息發(fā)送到特定的Topic中。每個Topic可以有多個生產(chǎn)者,這意味著多個應(yīng)用程序或服務(wù)可以同時向同一個Topic發(fā)送消息,從而實現(xiàn)高并發(fā)的消息發(fā)布能力。4.1.1發(fā)布消息流程創(chuàng)建Producer對象:生產(chǎn)者首先需要創(chuàng)建一個Producer對象,這通常通過PulsarClient的createProducer方法完成。發(fā)送消息:使用Producer對象的send方法將消息發(fā)送到指定的Topic。消息可以是任意類型的數(shù)據(jù),但在發(fā)送前需要被序列化為字節(jié)流。確認消息發(fā)送:Pulsar支持消息發(fā)送的確認機制,確保消息被成功發(fā)送到Broker。如果消息發(fā)送失敗,生產(chǎn)者可以重新發(fā)送消息。關(guān)閉Producer:在完成消息發(fā)送后,生產(chǎn)者應(yīng)該調(diào)用close方法來釋放資源。4.1.2消息持久化策略Pulsar提供了多種消息持久化策略,以確保消息在Broker故障時不會丟失。這些策略包括:消息存儲:Pulsar使用ApacheBookKeeper作為其后端存儲系統(tǒng),所有消息都會被持久化到磁盤上。消息復(fù)制:Pulsar支持消息的跨Broker復(fù)制,以提高系統(tǒng)的可用性和容錯性。復(fù)制策略可以是同步或異步的。消息過期:Pulsar允許設(shè)置消息的過期時間,過期的消息將被自動刪除,以節(jié)省存儲空間。4.2示例代碼:創(chuàng)建生產(chǎn)者并發(fā)送消息importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
importorg.apache.pulsar.client.api.Producer;
importorg.apache.pulsar.client.api.ProducerConfiguration;
importorg.apache.pulsar.client.api.Schema;
publicclassPulsarProducerExample{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
//創(chuàng)建PulsarClient實例
PulsarClientclient=PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
//創(chuàng)建Producer配置
ProducerConfiguration<String>producerConfiguration=ProducerConfiguration.builder()
.schema(Schema.STRING)
.build();
//創(chuàng)建Producer
Producer<String>producer=client.newProducer(producerConfiguration)
.topic("persistent://public/default/my-topic")
.create();
//發(fā)送消息
for(inti=0;i<10;i++){
Stringmessage="HelloPulsar"+i;
producer.send(message);
}
//關(guān)閉Producer
producer.close();
//關(guān)閉PulsarClient
client.close();
}
}4.2.1代碼解釋創(chuàng)建PulsarClient:通過PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();創(chuàng)建一個PulsarClient實例,其中serviceUrl指定了Pulsar服務(wù)的URL。創(chuàng)建Producer配置:使用ProducerConfiguration.builder().schema(Schema.STRING).build();創(chuàng)建一個Producer配置對象,指定了消息的Schema為字符串類型。創(chuàng)建Producer:通過client.newProducer(producerConfiguration).topic("persistent://public/default/my-topic").create();創(chuàng)建一個Producer,其中topic指定了消息將被發(fā)送到的Topic。發(fā)送消息:使用producer.send(message);方法發(fā)送消息到指定的Topic。關(guān)閉Producer和PulsarClient:在完成消息發(fā)送后,調(diào)用producer.close();和client.close();來釋放資源。通過以上步驟,我們可以看到Pulsar消息隊列中消息發(fā)布的完整流程,從創(chuàng)建生產(chǎn)者到發(fā)送消息,再到資源的釋放,每一步都遵循了Pulsar的API規(guī)范,確保了消息的正確發(fā)送和系統(tǒng)的穩(wěn)定運行。5消息消費5.1消費者角色在ApachePulsar消息隊列中,消費者是消息的接收者。它們訂閱主題,接收并處理由生產(chǎn)者發(fā)送的消息。消費者可以是任何能夠接收和處理消息的應(yīng)用程序或服務(wù)。在Pulsar中,消費者通過創(chuàng)建一個Consumer對象來訂閱主題,這個對象提供了接收消息的方法。5.1.1示例代碼importorg.apache.pulsar.client.api.Consumer;
importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassPulsarConsumerExample{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
//創(chuàng)建Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建消費者,訂閱主題
Consumer<String>consumer=client.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription")
.subscribe();
//接收消息
Message<String>msg=consumer.receive();
//處理消息
System.out.println("Receivedmessage:"+msg.getValue());
//確認消息已處理
consumer.acknowledge(msg);
//關(guān)閉消費者和客戶端
consumer.close();
client.close();
}
}5.2消息消費模式Pulsar支持兩種主要的消息消費模式:獨占(Exclusive)和共享(Shared)。5.2.1獨占消費在獨占消費模式下,一個訂閱只能被一個消費者使用。這意味著如果多個消費者嘗試訂閱同一個主題和訂閱名稱,只有第一個消費者能夠成功訂閱,其他消費者將收到錯誤。這種模式適用于需要確保消息只被一個消費者處理的情況。5.2.2共享消費共享消費模式允許多個消費者訂閱同一個主題和訂閱名稱。消息將被分發(fā)給訂閱中的任意一個消費者,確保消息至少被處理一次。這種模式適用于需要高可用性和負載均衡的場景。5.2.3示例代碼//獨占消費模式
Consumer<String>exclusiveConsumer=client.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
//共享消費模式
Consumer<String>sharedConsumer=client.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();5.3消息重試與死信隊列在處理消息時,如果消費者未能成功處理消息,Pulsar提供了消息重試機制。消息可以被重新發(fā)送給消費者,直到成功處理。如果消息在多次重試后仍然無法處理,它可以被移動到死信隊列(DeadLetterQueue)。5.3.1消息重試消費者可以通過negativeAcknowledge方法來標(biāo)記消息為未處理,這將觸發(fā)消息重試。Pulsar允許配置重試策略,包括重試次數(shù)和重試間隔。5.3.2死信隊列死信隊列用于存儲那些無法被正常處理的消息。這些消息可以被單獨處理或分析,以找出處理失敗的原因。在Pulsar中,可以通過配置主題的策略來啟用死信隊列。5.3.3示例代碼//消費者處理消息失敗,請求重試
consumer.negativeAcknowledge(msg);
//配置主題策略,啟用死信隊列
TopicPoliciespolicies=newTopicPolicies();
policies.setDeadLetterTopic("persistent://public/default/my-dead-letter-topic");
client.getAdmin().topics().updateTopicPolicies("persistent://public/default/my-topic",policies);通過上述代碼和解釋,我們深入了解了Pulsar中消費者角色的定義、消息消費的不同模式,以及如何處理消息重試和死信隊列。這些機制確保了消息的可靠處理和系統(tǒng)的高可用性。6消息路由策略6.1消息分發(fā)原理在消息隊列系統(tǒng)中,如ApachePulsar,消息的分發(fā)是一個關(guān)鍵過程,它決定了消息如何從生產(chǎn)者到達消費者。Pulsar通過其獨特的消息模型和靈活的路由策略,確保了消息的高效、有序和可靠傳輸。消息分發(fā)原理基于以下幾點:主題(Topic):在Pulsar中,消息被發(fā)布到特定的主題上,這些主題可以是持久的或非持久的,根據(jù)消息的存儲需求而定。分區(qū)(Partition):為了提高吞吐量和可擴展性,主題可以被劃分為多個分區(qū),每個分區(qū)獨立存儲和處理消息。消費者(Consumer):消費者訂閱主題以接收消息。Pulsar支持獨占、共享和鍵共享三種訂閱類型,以滿足不同的消費模式。消息路由:生產(chǎn)者發(fā)送消息時,Pulsar根據(jù)配置的路由策略決定將消息發(fā)送到哪個分區(qū)。這確保了消息的均衡分布和處理。6.2路由策略類型Pulsar提供了多種消息路由策略,以適應(yīng)不同的應(yīng)用場景和需求:RoundRobin:輪詢策略,將消息均勻地分發(fā)到所有分區(qū),以實現(xiàn)負載均衡。KeyBased:基于消息鍵的策略,根據(jù)消息中的鍵將消息路由到特定的分區(qū),確保具有相同鍵的消息被發(fā)送到同一分區(qū),便于實現(xiàn)消息的有序處理。Custom:自定義策略,允許用戶實現(xiàn)自己的消息路由邏輯,提供最大的靈活性。6.2.1RoundRobin路由策略原理RoundRobin策略是最簡單的消息分發(fā)方式,它將消息輪流發(fā)送到不同的分區(qū),確保每個分區(qū)的負載大致相同。這種策略適用于消息大小和處理時間相對均勻的場景。示例假設(shè)我們有一個主題my-topic,它被劃分為4個分區(qū)。使用RoundRobin策略,消息將按照以下順序被發(fā)送:第1條消息發(fā)送到分區(qū)0第2條消息發(fā)送到分區(qū)1第3條消息發(fā)送到分區(qū)2第4條消息發(fā)送到分區(qū)3第5條消息再次發(fā)送到分區(qū)0,以此類推。6.2.2KeyBased路由策略原理KeyBased策略根據(jù)消息中的鍵(key)將消息路由到特定的分區(qū)。如果消息包含相同的鍵,它們將被發(fā)送到同一分區(qū),這有助于實現(xiàn)消息的有序處理和聚合。鍵可以是消息中的任意字段,也可以是自定義的鍵生成邏輯。示例假設(shè)我們有一個主題my-topic,它被劃分為3個分區(qū)。消息包含一個用戶ID作為鍵,ID為1的消息將始終被發(fā)送到分區(qū)0,ID為2的消息將被發(fā)送到分區(qū)1,ID為3的消息將被發(fā)送到分區(qū)2,以此類推。#Python示例代碼
importpulsar
client=pulsar.Client('pulsar://localhost:6650')
producer=client.create_producer('my-topic',key_type='string')
#發(fā)送消息,鍵為用戶ID
foriinrange(10):
key=str(i%3)#生成鍵,確保消息被均勻分布到3個分區(qū)
producer.send(('message-'+str(i)).encode('utf-8'),key=key)
client.close()6.2.3Custom路由策略原理Custom策略允許用戶實現(xiàn)自己的消息路由邏輯。這提供了最大的靈活性,但同時也要求用戶對消息隊列的性能和可靠性有深入的理解。自定義策略可以通過實現(xiàn)Pulsar的MessageRouter接口來實現(xiàn)。示例假設(shè)我們想要根據(jù)消息的類型(例如,交易、日志、警報)將消息路由到不同的分區(qū),可以實現(xiàn)一個自定義的路由策略。//Java示例代碼
importorg.apache.pulsar.client.api.MessageRouter;
importorg.apache.pulsar.client.api.MessageRouterBuilder;
importorg.apache.pulsar.client.api.Producer;
importorg.apache.pulsar.client.api.PulsarClient;
publicclassCustomMessageRouterimplementsMessageRouter<String>{
@Override
publicintroute(Stringkey,intnumPartitions){
if(key.startsWith("transaction")){
return0;//交易消息發(fā)送到分區(qū)0
}elseif(key.startsWith("log")){
return1;//日志消息發(fā)送到分區(qū)1
}elseif(key.startsWith("alert")){
return2;//警報消息發(fā)送到分區(qū)2
}
returnnumPartitions/2;//默認情況下,將消息發(fā)送到中間分區(qū)
}
}
publicclassCustomProducer{
publicstaticvoidmain(String[]args)throwsException{
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
MessageRouterBuilder<String>routerBuilder=MessageRouter.custom(newCustomMessageRouter());
Producer<String>producer=client.newProducer(Schema.STRING)
.topic("my-topic")
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.messageRouter(routerBuilder)
.create();
//發(fā)送不同類型的消息
producer.send("transaction-1","transaction");
producer.send("log-2","log");
producer.send("alert-3","alert");
client.close();
}
}6.3策略配置示例在Pulsar中,配置路由策略可以通過修改生產(chǎn)者創(chuàng)建時的參數(shù)來實現(xiàn)。以下是一個使用RoundRobin策略的配置示例://Java示例代碼
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.Producer;
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Producer<String>producer=client.newProducer(Schema.STRING)
.topic("my-topic")
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
.create();在這個示例中,messageRoutingMode參數(shù)被設(shè)置為RoundRobinPartition,這告訴Pulsar使用輪詢策略來分發(fā)消息。對于KeyBased策略,可以通過在發(fā)送消息時指定鍵來實現(xiàn):producer.send(("message-"+i).getBytes(),"key-"+(i%3));在這個示例中,消息的鍵被設(shè)置為"key-"+(i%3),這確保了具有相同鍵的消息被發(fā)送到同一分區(qū)。對于Custom策略,需要實現(xiàn)MessageRouter接口,并在創(chuàng)建生產(chǎn)者時指定自定義的路由策略。MessageRouterBuilder<String>routerBuilder=MessageRouter.custom(newCustomMessageRouter());
Producer<String>producer=client.newProducer(Schema.STRING)
.topic("my-topic")
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.messageRouter(routerBuilder)
.create();在這個示例中,CustomMessageRouter類實現(xiàn)了自定義的路由邏輯,通過MessageRouterBuilder和messageRouter參數(shù),可以將這個自定義策略應(yīng)用到生產(chǎn)者上。通過以上示例,我們可以看到Pulsar提供了豐富的消息路由策略,以滿足不同場景的需求。選擇合適的策略對于優(yōu)化消息隊列的性能和可靠性至關(guān)重要。7高級消息處理7.1消息壓縮在消息隊列系統(tǒng)中,如ApachePulsar,消息壓縮是一種優(yōu)化網(wǎng)絡(luò)傳輸和存儲空間的有效手段。Pulsar支持多種壓縮算法,包括LZ4、ZLIB、ZSTD等,以適應(yīng)不同的性能和壓縮比需求。7.1.1原理消息壓縮在消息發(fā)送前進行,將原始消息數(shù)據(jù)轉(zhuǎn)換為更小的二進制格式,從而減少在網(wǎng)絡(luò)上傳輸?shù)臄?shù)據(jù)量和存儲空間的占用。接收方在消費消息時,會自動解壓縮消息,恢復(fù)原始數(shù)據(jù)。7.1.2示例在Pulsar中,可以通過設(shè)置producer的compressionType屬性來啟用消息壓縮。以下是一個使用LZ4壓縮算法的示例:importorg.apache.pulsar.client.api.Producer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassCompressedProducer{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer<byte[]>producer=client.newProducer()
.topic("persistent://sample/standalone/ns/my-topic")
.compressionType(CompressionType.LZ4)
.create();
Stringmessage="Hello,Pulsar!";
producer.send(message.getBytes());
producer.close();
client.close();
}
}7.1.3解釋在上述代碼中,我們創(chuàng)建了一個PulsarClient實例,并指定了服務(wù)URL。然后,我們創(chuàng)建了一個Producer,設(shè)置了topic和compressionType為LZ4。發(fā)送消息時,原始字符串被轉(zhuǎn)換為字節(jié)數(shù)組,然后通過壓縮的Producer發(fā)送。接收方會自動解壓縮消息,無需額外的解壓縮邏輯。7.2消息時間戳在Pulsar中,每條消息都有一個時間戳,用于記錄消息的創(chuàng)建時間或發(fā)送時間。時間戳可以用于實現(xiàn)時間窗口、消息延遲發(fā)送等功能。7.2.1原理Pulsar允許在消息發(fā)送時指定時間戳,如果沒有指定,系統(tǒng)會自動使用消息發(fā)送時的時間作為時間戳。時間戳以毫秒為單位,存儲在消息的元數(shù)據(jù)中。7.2.2示例以下是一個在發(fā)送消息時指定時間戳的示例:importorg.apache.pulsar.client.api.MessageBuilder;
importorg.apache.pulsar.client.api.Producer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassTimestampProducer{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer<byte[]>producer=client.newProducer()
.topic("persistent://sample/standalone/ns/my-topic")
.create();
longtimestamp=System.currentTimeMillis()+10000;//發(fā)送10秒后的時間
Stringmessage="Hello,Pulsar!";
producer.newMessage()
.value(message.getBytes())
.timestamp(timestamp)
.send();
producer.close();
client.close();
}
}7.2.3解釋在本例中,我們創(chuàng)建了一個Producer,并通過newMessage()方法創(chuàng)建了一個消息構(gòu)建器。我們指定了一個未來的時間戳,使得消息在10秒后才被標(biāo)記為發(fā)送。這在實現(xiàn)延遲消息處理時非常有用。7.3消息順序保證在分布式系統(tǒng)中,消息的順序處理是一個常見的需求。Pulsar提供了消息順序保證的機制,確保消息按照發(fā)送的順序被消費。7.3.1原理Pulsar通過在Producer和Consumer上設(shè)置相應(yīng)的屬性來實現(xiàn)消息順序。在Producer端,可以設(shè)置blockIfQueueFull屬性為true,以確保消息在隊列滿時被阻塞,直到隊列有空間。在Consumer端,可以設(shè)置subscriptionType為Exclusive,以確保一個Consumer獨占一個topic,從而保證消息順序。7.3.2示例以下是一個使用Pulsar保證消息順序的示例:importorg.apache.pulsar.client.api.Consumer;
importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
importorg.apache.pulsar.client.api.SubscriptionType;
importjava.util.concurrent.TimeUnit;
publicclassOrderedConsumer{
publicstaticvoidmain(String[]args)throwsPulsarClientException,InterruptedException{
PulsarClientclient=PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer<byte[]>consumer=client.newConsumer()
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
while(true){
Message<byte[]>msg=consumer.receive(5,TimeUnit.SECONDS);
if(msg!=null){
System.out.println("Receivedmessage:
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2022年公務(wù)員多省聯(lián)考《申論》真題(四川行政執(zhí)法卷)及答案解析
- 吉林師范大學(xué)《視覺設(shè)計進階》2021-2022學(xué)年第一學(xué)期期末試卷
- 吉林師范大學(xué)《行政法概論》2021-2022學(xué)年第一學(xué)期期末試卷
- 宿舍用品質(zhì)量監(jiān)控及售后方案
- 城市交通規(guī)劃與改善方案
- 2024專柜租賃協(xié)議合同范本
- 餐飲行業(yè)市場推廣協(xié)議書
- 商業(yè)綜合體建設(shè)創(chuàng)優(yōu)策劃方案
- 吉林大學(xué)《水彩人體Ⅱ》2021-2022學(xué)年第一學(xué)期期末試卷
- 吉林大學(xué)《稅法》2021-2022學(xué)年第一學(xué)期期末試卷
- 院前急救與院內(nèi)急診有效銜接工作制度
- 2.1充分發(fā)揮市場在資源配置中的決定性作用(課件) 2024-2025學(xué)年高中政治 必修2 經(jīng)濟與社會
- Unit+5+Fun+Clubs+Section+A++(1a-1d)教學(xué)課件-2024-2025學(xué)年人教新目標(biāo)(2024)七年級英語上冊
- 超聚變 FCIA 考試題庫
- 2024-2025學(xué)年初中地理七年級上冊(2024)晉教版(2024)教學(xué)設(shè)計合集
- 陜煤集團筆試題庫及答案
- 33 《魚我所欲也》對比閱讀-2024-2025中考語文文言文閱讀專項訓(xùn)練(含答案)
- (正式版)HGT 22820-2024 化工安全儀表系統(tǒng)工程設(shè)計規(guī)范
- (高清版)TDT 1075-2023 光伏發(fā)電站工程項目用地控制指標(biāo)
- 《中華民族共同體概論》考試復(fù)習(xí)題庫(含答案)
- 2022-2023學(xué)年武漢市江岸區(qū)七年級英語上學(xué)期期中質(zhì)量檢測卷附答案
評論
0/150
提交評論