Apache Storm技術(shù)參考手冊_第1頁
Apache Storm技術(shù)參考手冊_第2頁
Apache Storm技術(shù)參考手冊_第3頁
Apache Storm技術(shù)參考手冊_第4頁
Apache Storm技術(shù)參考手冊_第5頁
已閱讀5頁,還剩107頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

目錄

簡介.............................................................................4

什么是ApacheStorm?.................................................................................................................4

ApacheStormvsHadoop...............................................................................................................4

使用ApacheStorm的例子.....................................................5

ApacheStorm優(yōu)勢............................................................5

Storm的核心組件.............................................................5

Topology...........................................................................................................................................8

Storm集群安裝..............................................................9

StormHelloWorld...........................................................................................................................9

環(huán)境準(zhǔn)備................................................................9

具體流程................................................................10

核心概念........................................................................35

拓撲........................................................................36

任務(wù)........................................................................37

進程........................................................................37

流分組......................................................................37

隨機分組...............................................................37

字段分組...............................................................38

全局分組...............................................................39

所有分組...............................................................40

集群架構(gòu)........................................................................40

工作流程........................................................................42

分布式消息系統(tǒng).................................................................44

什么是分布式消息系統(tǒng)?.....................................................44

Thrift協(xié)議...................................................................45

安裝............................................................................46

步驟1-驗證Java安裝.......................................................46

步驟1.1-下載JDK.............................................................................................................46

步驟1.2-解壓文件......................................................47

步驟1.3-移動到opt文件夾.............................................47

步驟1.4-設(shè)置路徑......................................................47

步驟1.5-Java替代項....................................................48

步驟1.6-Java安裝驗證..................................................48

第2步-ZooKeeper框架安裝.................................................48

步驟2.1-下載ZooKeeper.................................................................................................48

步驟2.2-解壓tar文件...................................................48

步驟2.3-創(chuàng)建配置文件..................................................49

步驟2.4-啟動ZooKeeper服務(wù)器..........................................49

步驟2.5?啟動CLI..............................................................................................................49

步驟2.6-停止ZooKeeper服務(wù)器..........................................50

第3步-ApacheStorm框架安裝...............................................51

步驟3.1-下載Storm.........................................................................................................51

步驟3.2??解壓tar文件...................................................51

步驟3.3-打開配置文件..................................................51

步驟3.4-啟動Nimbus.......................................................................................................52

步驟3.5-啟動Supervisor..................................................................................................52

步驟3.6-啟動UI...............................................................................................................52

工作實例........................................................................53

場景-移動呼叫日志分析器..................................................53

Spout創(chuàng)建..................................................................53

open.......................................................................................................................................54

nextTuple...............................................................................................................................54

close.......................................................................................................................................55

declareOutputFields.............................................................................................................55

ack..........................................................................................................................................55

fail...........................................................................................................................................55

FakeCallLogReaderSpout......................................................................................................56

編碼-FakeCallLogReaderSpout.java.................................................................................56

Bolt創(chuàng)建....................................................................60

Prepare...................................................................................................................................60

execute...................................................................................................................................60

cleanup...................................................................................................................................61

declareOutputFields.............................................................................................................61

呼叫日志創(chuàng)建者bolt...................................................................................................................61

編碼-CallLogCreatorBoltjava............................................................................................62

呼叫日志計數(shù)器Bolt...................................................................................................................64

編碼-CallLogCounterBolt.java..........................................................................................64

創(chuàng)建拓撲....................................................................66

本地集群....................................................................67

編碼-LogAnalyserStorm.java.............................................................................................67

構(gòu)建和運行應(yīng)用程序.........................................................69

輸出....................................................................69

非JVM語言.................................................................70

Python綁定.............................................................70

Trident....................................................................................................................................................71

Trident拓撲.................................................................72

TridentTuples................................................................................................................................72

TridentSpout.................................................................................................................................73

Trident操作.................................................................73

過濾....................................................................74

函數(shù)....................................................................乃

聚合....................................................................76

分組....................................................................78

合并和連接.............................................................79

狀態(tài)維護....................................................................79

分布式RPC...................................................................................................................................80

什么時候使用Trident?.............................................................................................................80

Trident的工作實例...........................................................80

格式化呼叫信息.........................................................81

編碼:FormatCall.java.........................................................................................................81

CSVSplit.................................................................................................................................81

編碼:CSVSplit.java.............................................................................................................82

日志分析器.............................................................82

編碼:LogAnalyserTrident.java..........................................................................................83

構(gòu)建和運行應(yīng)用程序.........................................................86

輸出....................................................................87

在Twitter上的應(yīng)用..............................................................88

Twitter............................................................................................................................................88

Spout創(chuàng)建..............................................................88

編碼:TwitterSampleSpout.java........................................................................................89

Hashtag閱讀器spout..................................................................................................................94

編碼:HashtagReaderBok.java..........................................................................................94

Hashtag計數(shù)器spout.................................................................................................................96

編碼:HashtagCounterBolt.java.........................................................................................96

提交拓撲....................................................................99

編碼:TwitterHashtagStorm.java.......................................................................................99

構(gòu)建和運行應(yīng)用程序........................................................101

輸出...................................................................101

在雅虎財經(jīng)上的應(yīng)用............................................................102

Spout創(chuàng)建.................................................................103

編碼:YahooFinanceSpout.java........................................................................................103

Bolt倉ij建...................................................................106

編碼:PriceCutOffBolt.java...............................................................................................106

提交拓撲...................................................................109

編碼:YahooFinanceStorm.java........................................................................................109

構(gòu)建和運行應(yīng)用程序........................................................110

輸出...................................................................111

應(yīng)用程序.......................................................................111

Klout.............................................................................................................................................Ill

天氣頻道...................................................................111

電信業(yè).....................................................................112

簡介

什么是ApacheStorm?

ApacheStorm是Twitter開源的分布式實時大數(shù)據(jù)處理框架,最早開源于

github,從0.9.1版本之后,歸于Apache社區(qū),被業(yè)界稱為實時版Hadoop。

Storm設(shè)計用于在容錯和水平可擴展方法中處理大量數(shù)據(jù)。它是一個流數(shù)據(jù)框

架,具有最高的攝取率。雖然Storm是無狀態(tài)的,它通過ApacheZooKeeper管

理分布式環(huán)境和集群狀態(tài)。它很簡單,您可以并行地對實時數(shù)據(jù)執(zhí)行各種操

作。

隨著越來越多的場景對Hadoop的MapReduce高延遲無法容忍,比如網(wǎng)站統(tǒng)

計、推薦系統(tǒng)、預(yù)警系統(tǒng)、金融系統(tǒng)(高頻交易、股票)等等,大數(shù)據(jù)實時處理

解決方案(流計算)的應(yīng)用E趨廣泛,目前已是分布式技術(shù)領(lǐng)域最新爆發(fā)點,

而Storm更是流計算技術(shù)中的佼佼者和主流。

ApacheStorm繼續(xù)成為實時數(shù)據(jù)分析的領(lǐng)導(dǎo)者。Storm易于設(shè)置和操作,并且

它保證每個消息將通過拓撲至少處理一次。

ApacheStormvsHadoop

基本上Hadoop和Storm框架用于分析大數(shù)據(jù)。兩者互補,在某些方面有所不

同。ApacheSiorm執(zhí)行除持久性之外的所有操作,而Hadoop在所有方面都很

好,但滯后于實時計算。下表比較了Stoi*m和Hadoop的屬在。

StormHadoop

實時流處理批量處理

無狀態(tài)有狀態(tài)

主/從架構(gòu)與基于ZooKeeper的協(xié)調(diào)。主節(jié)點稱為具有/不具有基于ZooKeeper的協(xié)調(diào)

nimbus,從屬節(jié)點是主管。的主-從結(jié)構(gòu)。主節(jié)點是作業(yè)跟蹤

器,從節(jié)點是任務(wù)跟蹤器,

Storm流過程在集群上每秒可以訪問數(shù)萬條消息。Hadoop分布式文件系統(tǒng)(HDFS)使

用MapReduce框架來處理大量的數(shù)

據(jù),需要幾分鐘或幾小時。

Storm拓撲運行直到用戶關(guān)閉或意外的不可恢復(fù)故MapReduce作業(yè)按順序執(zhí)行并最終

障。完成。

兩者都是分布式和容錯的

如果nimbus/supervisor死機,重新啟動使它從它停如果JobTracker死機,所有正在運行

止的地方繼續(xù),因此沒有什么受到影響。的作業(yè)都會丟失。

使用ApacheStorm的例子

ApacheStorm對于實時大數(shù)據(jù)流處理非常有名。因此,大多數(shù)公司都將Storm

用作其系統(tǒng)的一個組成部分。一些值得注意的例子如下?

Twitter-Twitter正在使用ApacheStorm作為其“發(fā)布商分析產(chǎn)品”?!鞍l(fā)布

商分析產(chǎn)品”處理Twiiter平臺中的每個tweets和點擊。ApacheStorm與

Twitter基礎(chǔ)架構(gòu)深度集成。

NaviSite-NaviSite正在使用Storm進行事件日志監(jiān)控/審計系統(tǒng)。系統(tǒng)中生成

的每個日志都將通過Slorm。Storm將根據(jù)配置的正則表達式集檢查消息,如果

存在匹配,那么該特定消息將保存到數(shù)據(jù)庫。

Wego-Wego是位于新加坡的旅行元搜索引擎。旅行相關(guān)數(shù)據(jù)來自世界各地的

許多來源,時間不同。Storm幫助Wego搜索實時數(shù)據(jù),解決并發(fā)問題,并為最

終用戶找到最佳匹配。

ApacheStorm優(yōu)勢

下面是ApacheStorm提供的好處列表:

?Storm是開源的,強大的,用戶友好的。它可以用于小公司和大公司。

?Storm是容錯的,靈活的,可靠的,并且支持任何編程語言。

?允許實時流處理。

?Storm是令人難以置信的快,因為它具有巨大的處理數(shù)據(jù)的力量。

?Slorm可以通過線性增加資源來保持性能,即使在負載增加的情況下。它是高

度可擴展的。

?Storm在幾秒鐘或幾分鐘內(nèi)執(zhí)行數(shù)據(jù)刷新和端到端傳送響應(yīng)取決于問題。它具

有非常低的延遲。

?Storm有操作智能。

?Storm提供保證的數(shù)據(jù)處理,即使群集中的任何連接的節(jié)點死或消息丟失。

Storm的核心組件

?Nimbus:即Storm的Master,負責(zé)資源分配和任務(wù)調(diào)度。一個Storm集群只有一

Nimbuso

?Supervisor:即Storm的Slave,負責(zé)接收Nimbus分配的任務(wù),管理所有Worke

r,一個Supervisor節(jié)點中包含多個Worker進程。

?Worker:工作進程,每個工作進程中都有多個Task.

■Task:任務(wù),在Storm集群中每個Spout和Bolt都由若干個任務(wù)(tasks)來執(zhí)

行。每個任務(wù)都與一個執(zhí)行線程相對應(yīng)。

?Topology:計算拓撲,Storm的拓撲是對實時計算應(yīng)用邏輯的封裝,它的作用與

MapReduce的任務(wù)(Job)很相似,區(qū)別在于MapReduce的一個Job在得到結(jié)

果之后總會結(jié)束,而拓撲會一直在集群中運行,直到你手動去終止它。拓撲還可以理

解成由一系列通過數(shù)據(jù)流(StreamGrouping)相互關(guān)聯(lián)的Spout和Bolt組成的

的拓撲結(jié)構(gòu)。

?Stream:數(shù)據(jù)流(Streams)是Storm中最核心的抽象概念。一個數(shù)據(jù)流指的是在

分布式環(huán)境中并行創(chuàng)建、處理的一組元組(tuple)的無界序列。數(shù)據(jù)流可以由一種

能夠表述數(shù)據(jù)流中元組的域(fields)的模式來定義。

?Spout:數(shù)據(jù)源(Spout)是拓撲中數(shù)據(jù)流的來源。一般Spout會從一個外部的數(shù)

據(jù)源讀取元組然后將他們發(fā)送到拓撲中。根據(jù)需求的不同,Spout既可以定義為可靠

的數(shù)據(jù)源,也可以定義為不可靠的數(shù)據(jù)源。一個可靠的Spout能夠在它發(fā)送的元組

處理失敗時重新發(fā)送該元組,以確保所有的元組都能得到正確的處理;相對應(yīng)的,不

可靠的Spout就不會在元組發(fā)送之后對元組進行任何其他的處理。一個Spout可

以發(fā)送多個數(shù)據(jù)流。

?Bolt:拓撲中所有的數(shù)據(jù)處理均是由Bolt完成的。通過數(shù)據(jù)過濾(filtering\函數(shù)

處理(functions\聚合(aggregations\聯(lián)結(jié)(joins\數(shù)據(jù)庫交互等功能,Bolt

幾乎能夠完成任]可一種數(shù)據(jù)處理需求。一個Bolt可以實現(xiàn)簡鑿的數(shù)據(jù)流轉(zhuǎn)換,而更

復(fù)雜的數(shù)據(jù)流變換通常需要使用多個Bolt并通過多個步驟完成。

?Streamgrouping:為拓撲中的每個Bolt的確定輸入數(shù)據(jù)流是定義一個拓撲的重

要環(huán)節(jié)。數(shù)據(jù)流分組定義了在Bolt的不同任務(wù)(tasks)中劃分數(shù)據(jù)流的方式。在S

torm中有八種內(nèi)置的數(shù)據(jù)流分組方式。

?Reliability:可靠性。Storm可以通過拓撲來確保每個發(fā)送的元組都能得到正確處

理。通過跟蹤由Spout發(fā)出的每個元組構(gòu)成的元組樹可以確定元組是否已經(jīng)完成處

理。每個拓撲都有一個"消息延時"參數(shù),如果Storm在延時時間內(nèi)沒有檢測到元

組是否處理完成,就會將該元組標(biāo)記為處理失敗,并會在稍后重新發(fā)送該元組。

Storm程序再Storm集群中運行的示例圖如下:

Topology

為什么把Topology單獨提出來呢,因為Topology是我們開發(fā)程序主要的用的組件。

Topology和MapReduce很相像。

M叩Reduce是M叩進行獲取數(shù)據(jù),Reduce進行處理數(shù)據(jù)。

而Topology則是使用Spout獲取數(shù)據(jù),Bolt來進行計算。

總的來說就是一個Topology由一個或者多個的Spout和Bolt組成

具體流程是怎么走,可以通過查看下面這張圖來進行了解。

示例圖:

Stream

^Stream2

.----Stream!

Stream"

0一-一

'、~Stream------Stream

Stream

g

stream—

注:圖片來源/api/tutorials/storm/52o

圖片有三種模式,解釋如下:

第一種比較簡單,就是由一個Spout獲取物g,然后交給一個Bolt進行處理;

第二種稍微復(fù)雜點,由一個Spout獲取數(shù)據(jù),然后交給一個Bolt進行處理一部分,然后

在交給下一個Bolt進行處理其他部分。

第三種則上瞰復(fù)雜,一個Spout可以同時發(fā)送數(shù)據(jù)到多個Bolt,而一個Bolt也可以接受

多個Spout或多個Bolt,最終形成多個數(shù)據(jù)流。但是這種數(shù)據(jù)流必須是有方向的,有起點

和終點,不然會造成死循環(huán),數(shù)據(jù)永遠也處理不完。就是Spout發(fā)給Boltl,Boltl發(fā)給

Bolt2,Bolt2又發(fā)給了Boltl,最終形成了一環(huán)狀。

Storm集群安裝

之前已經(jīng)寫過了,這里就不在說明了。

博客itetlhhttp:〃/2018/01/26/pancm70/

StormHelloWorld

前面講了一些Storm概念,可能在理解上不太清楚,那么這里我們就用一個HelloWorld

代碼示例來體驗下Storm運作的流程吧。

環(huán)境準(zhǔn)備

在進行代碼開發(fā)之前,首先得做好相關(guān)的準(zhǔn)備。

本項目是使用Maven構(gòu)建的,使用Storm的版本為1.1.1。

Maven的相關(guān)依賴如下:

<!一-storm相關(guān)jar—>

<dependency>

<groupld>org.apache.storm</groupld>

<artifactId>storm-core</artifactId>

<version>l.1.l</version>

<scope>provided</scope>

</dependency>

具體流程

在寫代碼的時候,我們先來明確要用Storm做什么。

那么第一個程序,就簡單的輸出下信息。

具體步驟如下:

1.啟動topology,設(shè)置好Spout和Bolt.

2.將Spout獲取的數(shù)據(jù)傳遞給Bolt,

3.Bolt接受Spout的數(shù)據(jù)進行打印。

Spout

那么首先開始編寫Spout類。一股是實現(xiàn)IRichSpout或繼承BaseRichSpout該類,然

后實現(xiàn)該方法。

這里我們繼承BaseRichSpout這個類,該類需要實現(xiàn)這幾個主要的方法:

一、open

open()方法中是在ISpout接口中定義,在Spout組件初始化時被調(diào)用。

有三個參數(shù),它們的作用分別是:

1.Storm配置的Map;

2.topology中組件的信息;

3.發(fā)射tuple的方法;

代碼示例:

?Override

publicvoidopen(Mapmap,TopologyContextargl,

SpoutOutputCollectorcollector){

System,out.printlnC^open:imap.get("test"));

this,collector=collector;

)

二、nextTuple

nextTuple。方法是Spout實現(xiàn)的核心。

也就是主要執(zhí)行方法,用于輸出信息,通過collector,emit方法發(fā)射。

這里我們的數(shù)據(jù)信息已經(jīng)寫死了,所以這里我們就直接將數(shù)據(jù)進行發(fā)送。

這里設(shè)置只發(fā)送兩次。

代碼示例:

@Override

publicvoidnextTuple(){

if(count<=2){

System.out.printin(〃第〃+count+”次開始發(fā)送數(shù)

據(jù)???〃);

this,collector,emit(newValues(message));

count++;

三、declareOutputFields

declareOutputFields是在IComponent接口中定義,用于聲明數(shù)據(jù)格式。

即輸出的一個Tuple中,包含幾個字段。

因為這里我們只發(fā)射一個,所以就指定一個。如果是多個,則用逗號隔開。

代碼示例:

?Override

publicvoiddeclareOutputFields(OutputFieldsDeclarer

declarer){

System.out.printin("定義格式...;

declarer,declare(newFields(field));

)

四、ack

ack是在ISpout接口中定義,用于表示Tuple處理成功。

代碼示例:

?Override

publicvoidack(Objectobj){

System.out.println("ack:"+obj);

五、fail

fail是在ISpout接口中定義,用于表示Tuple處理失敗。

代碼示例:

@Override

publicvoidfail(Objectobj){

System,out.printin("失敗:〃+obj);

)

六、close

dose是在ISpout接口中定義,用于表示Topology停止。

代碼示例:

?Override

publicvoidclose(){

System,out.printin("關(guān)閉...;

)

至于還有其他的,這里就不在一列舉了。

Bolt

Bolt是用于處理數(shù)據(jù)的組件,主要是由execute方法來進行實現(xiàn)。一般來說需要實

現(xiàn)IRichBolt或繼承BaseRichBolt該類,然后實現(xiàn)其方法。

需要實現(xiàn)方法如下:

一、prepare

在Bolt啟動前執(zhí)行,提供Bolt啟動環(huán)境配置的入口。

參數(shù)基本和Sqout-Wo

一般對于不可序列化的對象進行實例化。

這里的我們就簡單的打印下

?Override

publicvoidprepare(Mapmap,TopologyContextargl,

OutputCollectorcollector){

Systpm.out.print1n(^prpparp:zz+map.gpt(,,tpst,/));

this,collector=collector;

)

注:如果是可以序列化的對象,那么最好是使用構(gòu)造函數(shù)。

二、execute

execute。方法是Bolt實現(xiàn)的核心。

也就是執(zhí)行方法,每次Bolt從流凌收f丁閱的tuple,都會調(diào)用這個方法。

從tuple中獲取消息可以使用tuple.getStringO和tuple.getStringByFieldO;這

兩個方法。個人推薦第二種,可以通過field來指定接收的消息。

注:如果繼承的是IRichBolt,則需要手動ack。這里就不用了,BaseRichBolt會自動幫我

們應(yīng)答。

代碼示例:

?Override

publicvoidexecute(Tupletuple){

//Stringmsg=tuple.getString(O);

Stringmsg=tuple.getStringByField("test");

〃這里我們就不做消息的處理,只打印

System.out.printin(,zBolt第〃+count+”接受的消息:"+msg);

count++;

/**

*

*沒次調(diào)用處理一個輸入的tuple,所有的tuple都必須在一定時間

內(nèi)應(yīng)答。

*可以是ack或者fail。否則,spout就會重發(fā)tuple。

*/

//collector,ack(tuple);

)

三、declareOutputFields

和Spout的一樣。

因為到了這里就不再輸出了,所以就什么都沒寫。

?Override

publicvoiddeclareOutputFields(OutputFieldsDeclarerargO)

(

)

cleanup

cleanup是IBolt接口中定義,用于釋放bolt占用的資源。

Storm在終止一個bolt之前會調(diào)用這個方法。

因為這里沒有什么資源需要釋放,所以就簡單的打印一句就行了。

?Override

publicvoidcleanup(){

System.out.printin("資源釋放〃);

)

Topology

這里我們就是用main方法進行提交topology。

不過在提交topology之前,需要進行相應(yīng)的設(shè)置。

這里我就不一細說了,代碼的注釋已經(jīng)很詳細了。

代碼示例:

importorg.apache.storm.Config;

importorg.apache,storm.LocalCluster;

importorg.apache,storm.StormSubmitter;

importorg.apache,storm,topology.TopologyBuilder;

/**

*

*Title:App

*Description:

*storm測試

*Version:1.0.0

*@authorpancm

*@date2018年3月6日

*/

publicclassApp{

privatestaticfinalStringstrl="testl”;

privatestaticfinalStringstr2="test2〃;

publicstaticvoidmain(String[]args)(

//TODOAuto-generatedmethodstub

〃定義一個拓撲

TopologyBuilderbuilder=newTopologyBuilder();

〃設(shè)置一個Executeor(線程),默認一個

builder.setSpout(strl,newTestSpout());

〃設(shè)置一個Executeor(線程),和一個task

builder.setBolt(str2,new

TestBolt(),1).setNumTasks(l).shuffleGrouping(strl);

Configconf=newConfigO;

conf,put("test","test");

try(

〃運行拓撲

if(args!=null&&args.length>0){//有參數(shù)時,表示向集群提交

作業(yè),并把第一個參數(shù)當(dāng)做topology名稱

System.out.printin(〃遠程模式〃);

StormSubmitter.submitTopology(args[0],conf,

builder.createTopology());

}else{〃沒有參數(shù)時,本地提交

〃啟動本地模式

System,out.printIn(〃本地模式〃);

LocalClustercluster=newLocalCluster();

cluster,submitTopology11177,conf,

builder.createTopology());

Thread,sleep(10000);

//關(guān)閉本地集群

cluster,shutdown();

)

}catch(Exceptione)(

e.printStackTraceO;

)

)

}

運行該方法,輸出結(jié)果如下:

本地模式

定義格式.??

open:test

第1次開始發(fā)送數(shù)據(jù)...

第2次開始發(fā)送數(shù)據(jù)...

prepare:test

Bolt第1接受的消息:這是個測試消息!

Bolt第2接受的消息:這是個測試消息!

資源釋放

關(guān)閉...

到這里,是不是基本上對Storm的運作有些了解了呢。

這個demo達到了上述的三種模式圖中的第一種,一個Spout傳輸數(shù)據(jù),一個Bolt處理

雌。

那么如果我們想達到第二種模式呢,那又該如何做呢?

假如我們想統(tǒng)計下在一段文本中的單詞出現(xiàn)頻率的話,我們只需執(zhí)行一下步驟就可以了。

.首先將中的消息進行更改為數(shù)組,并依次將消息發(fā)送到

1SpoutmessageTestBolto

2.然后TestBolt將獲取的數(shù)據(jù)進行分割,將分割的數(shù)據(jù)發(fā)送到TestBolt2。

3,TestBolt2對數(shù)據(jù)進行統(tǒng)計,在程序關(guān)閉的時候進行打印。

4.Topology成功配置并且啟動之后,等待20秒左右,關(guān)閉程序,然后得到輸出的結(jié)果。

代碼示例如下:

Spout

用于發(fā)送消息。

importjava.util.Map;

importorg.apache,storm,spout.SpoutOutputCollector;

importorg.apache,storm,task.TopologyContext;

importorg.apache,storm,topology.OutputFie1dsDec1arer;

importorg.apache,storm,topology,base.BaseRichSpout;

importorg.apache,storm,tuple.Fields;

importorg.apache,storm,tuple.Values;

/**

*

*Title:TestSpout

*Description:

*發(fā)送信息

*Version:1.0.0

*?authorpancm

*?date2018年3月6日

*/

publicclassTestSpoutextendsBaseRichSpout{

privatestaticfinallongserialVersionUID=

225243592780939490L;

privateSpoutOutputCollectorcollector;

privatestaticfinalStringfield二〃word”;

privateintcount=l;

privateStringf]message={

“Mynicknameisxuwujing",

“Myblogaddressishttp://www.panchengming.com//7,

“Myinterestisplayinggames”

};

/**

*open。方法中是在ISpout接口中定義,在Spout組件初始化時被調(diào)

用。

*有三個參數(shù):

*1.Storm配置的Map;

*2.topology中組件的信息;

*3.發(fā)射tuple的方法;

*/

?Override

publicvoidopen(Mapmap,TopologyContextargl,

SpoutOutputCollectorcollector){

System,out.printIn(^open:z,+map.get("test"));

this,collector=collector;

)

/**

*ncxtTupleO方法是Spout實現(xiàn)的核心。

*也就是主要執(zhí)行方法,用于輸出信息,通過collector.emit方法發(fā)射。

*/

?Override

publicvoidnextTupleO{

if(count<=message.length){

System,out.printin("第〃+count+”次開始發(fā)送數(shù)

據(jù)…〃);

this,collector,emit(newValues(message[count-

1]));

count++;

/**

*declarcOutputFields是在[Component接口中定義,用于聲明數(shù)據(jù)格

式。

*即輸出的一個Tuple中,包含幾個字段。

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論