Teacher's Hadoop Eight-Day Complete Mastery Video Course - weekend110 - Day 7

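Storm 0.7.0 introduced a new feature, transactional topologies. It provides messaging semantics that ensure tuples can be replayed safely.

NOTE: Transactional topologies are an abstraction built on top of standard Storm spouts and bolts.

NOTE: A batch passes through two phases, the processing phase and the commit phase. Together they make up a Storm transaction.

NOTE: Storm uses ZooKeeper to store transaction metadata. By default this is the same ZooKeeper the topology uses. To point to a different one, override the two configuration keys transactional.zookeeper.servers and transactional.zookeeper.port.

To see how transactions work, we will build an analytics tool. It reads tweets from a Redis database, processes them through several bolts, and finally stores the results as lists in another Redis database. The processing flow is shown in Figure 8-1.

[Figure 8-1]

As you can see, TweetsTransactionalSpout connects to the tweets database and emits batches of tuples into the topology. Two bolts, UserSplitterBolt and HashtagSplitterBolt, receive tuples from the spout. UserSplitterBolt parses each tweet, looks for users (words that start with @), and emits them on a custom stream named users. HashtagSplitterBolt parses each tweet, looks for words that start with #, and emits them on a custom stream named hashtags. A third bolt, UserHashtagJoinBolt, receives both streams and counts how many times each hashtag appears in tweets that mention a user. To count and emit its results, this bolt is a BaseBatchBolt. A last bolt, RedisCommiterCommiterBolt, receives the streams of the other three bolts, counts everything, and, once a batch has been fully processed, saves all the results to Redis. It is a special kind of bolt called a committer, explained later in this chapter.

The topology is built with TransactionalTopologyBuilder:

    TransactionalTopologyBuilder builder =
        new TransactionalTopologyBuilder("test", "spout", new TweetsTransactionalSpout());

    builder.setBolt("users-splitter", new UserSplitterBolt(), 4)
        .shuffleGrouping("spout");
    builder.setBolt("hashtag-splitter", new HashtagSplitterBolt(), 4)
        .shuffleGrouping("spout");
    // Tuples of the same tweet must reach the same join task.
    builder.setBolt("user-hashtag-merger", new UserHashtagJoinBolt(), 4)
        .fieldsGrouping("users-splitter", "users", new Fields("tweet_id"))
        .fieldsGrouping("hashtag-splitter", "hashtags", new Fields("tweet_id"));
    builder.setBolt("redis-commiter", new RedisCommiterCommiterBolt())
        .globalGrouping("users-splitter", "users")
        .globalGrouping("hashtag-splitter", "hashtags")
        .globalGrouping("user-hashtag-merger");

The spout is declared like this:

    public class TweetsTransactionalSpout extends BaseTransactionalSpout<TransactionMetadata> {

As the class definition shows, TweetsTransactionalSpout extends the generic BaseTransactionalSpout. The type parameter is the transaction metadata, which in this example is defined as:

    public class TransactionMetadata implements Serializable {
        private static final long serialVersionUID = 1L;

        long from;     // ID of the first tweet in the batch
        int quantity;  // number of tweets in the batch

        public TransactionMetadata(long from, int quantity) {
            this.from = from;
            this.quantity = quantity;
        }
    }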
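The spout implements the following three methods:

    public ITransactionalSpout.Coordinator<TransactionMetadata> getCoordinator(
            Map conf, TopologyContext context) {
        return new TweetsTransactionalSpoutCoordinator();
    }

    public ITransactionalSpout.Emitter<TransactionMetadata> getEmitter(
            Map conf, TopologyContext context) {
        return new TweetsTransactionalSpoutEmitter();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("txid", "tweet_id", "tweet"));
    }

The getCoordinator method tells Storm which class coordinates the generation of batches. getEmitter returns the class responsible for reading batches from the source and emitting them into the topology.

The RQ class wraps the Redis operations used by the example:

    import java.util.List;
    import redis.clients.jedis.Jedis;

    public class RQ {
        public static final String NEXT_READ = "NEXT_READ";
        public static final String NEXT_WRITE = "NEXT_WRITE";

        Jedis jedis;

        public RQ() {
            jedis = new Jedis("localhost");
        }

        // Number of tweets written but not yet read, counted from `current`.
        public long getAvailableToRead(long current) {
            return getNextWrite() - current;
        }

        public long getNextRead() {
            String sNextRead = jedis.get(NEXT_READ);
            if (sNextRead == null) {
                return 1; // the sequence starts at 1
            }
            return Long.valueOf(sNextRead);
        }

        public long getNextWrite() {
            return Long.valueOf(jedis.get(NEXT_WRITE));
        }

        public void close() {
            jedis.disconnect();
        }

        public void setNextRead(long nextRead) {
            jedis.set(NEXT_READ, "" + nextRead);
        }

        // Tweets are stored under consecutive numeric keys.
        public List<String> getMessages(long from, int quantity) {
            String[] keys = new String[quantity];
            for (int i = 0; i < quantity; i++) {
                keys[i] = "" + (i + from);
            }
            return jedis.mget(keys);
        }
    }

The coordinator:

    public static class TweetsTransactionalSpoutCoordinator
            implements ITransactionalSpout.Coordinator<TransactionMetadata> {
        TransactionMetadata lastTransactionMetadata;
        RQ rq = new RQ();
        long nextRead = 0;

        public TweetsTransactionalSpoutCoordinator() {
            nextRead = rq.getNextRead();
        }

        public TransactionMetadata initializeTransaction(
                BigInteger txid, TransactionMetadata prevMetadata) {
            long quantity = rq.getAvailableToRead(nextRead);
            quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;
            TransactionMetadata ret = new TransactionMetadata(nextRead, (int) quantity);
            nextRead += quantity;
            return ret;
        }

        public boolean isReady() {
            return rq.getAvailableToRead(nextRead) > 0;
        }

        public void close() {
            rq.close();
        }
    }

Note that there is only one coordinator instance in the whole topology. When it is created, it reads from Redis a sequence number, starting at 1, that identifies the next tweet to read.

The isReady method should return true or false according to whether the source is ready to be read. In this example it fetches the number of tweets written and compares it with the number already read. The difference is the number of tweets available; if it is greater than 0, there are unread tweets.

In initializeTransaction, once the number of readable tweets is known, a new TransactionMetadata is created that holds the first tweet to read and the number of tweets to read (translator's note: the quantity field). As soon as the metadata object is returned, Storm stores it together with the txid in ZooKeeper. This guarantees that if something fails, Storm can use the emitter (translator's note: Emitter, see below) to resend the batch.

The emitter is implemented as follows:

    public static class TweetsTransactionalSpoutEmitter
            implements ITransactionalSpout.Emitter<TransactionMetadata> {
        RQ rq = new RQ();

        public TweetsTransactionalSpoutEmitter() {
        }

        public void emitBatch(TransactionAttempt tx,
                TransactionMetadata coordinatorMeta, BatchOutputCollector collector) {
            // Advance the read cursor stored in Redis.
            rq.setNextRead(coordinatorMeta.from + coordinatorMeta.quantity);
            List<String> messages =
                rq.getMessages(coordinatorMeta.from, coordinatorMeta.quantity);
            long tweetId = coordinatorMeta.from;
            for (String message : messages) {
                collector.emit(new Values(tx, "" + tweetId, message));
                tweetId++;
            }
        }

        public void cleanupBefore(BigInteger txid) {
        }

        public void close() {
            rq.close();
        }
    }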
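The division of labor between coordinator and emitter can be tried out without Storm or Redis. The following is a minimal sketch under stated assumptions: the class BatchPlanSketch, its nested Meta class, and the cap of 30 are hypothetical names and values chosen for illustration, and the in-memory counters stand in for the NEXT_READ and NEXT_WRITE keys.

```java
import java.util.ArrayList;
import java.util.List;

// Storm-free sketch; every name in this class is hypothetical.
final class BatchPlanSketch {
    static final int MAX_TRANSACTION_SIZE = 30; // assumed cap, chosen for illustration

    // Mirrors TransactionMetadata: start of the batch and number of items.
    static final class Meta {
        final long from;
        final int quantity;
        Meta(long from, int quantity) { this.from = from; this.quantity = quantity; }
    }

    private long nextRead = 1;     // next unread item; the sequence starts at 1
    private final long nextWrite;  // position the producer will write next

    BatchPlanSketch(long nextWrite) { this.nextWrite = nextWrite; }

    // Coordinator role: is there anything left to read?
    boolean isReady() { return nextWrite - nextRead > 0; }

    // Coordinator role: fix the extent of the next batch and advance the cursor.
    Meta initializeTransaction() {
        long quantity = Math.min(nextWrite - nextRead, MAX_TRANSACTION_SIZE);
        Meta meta = new Meta(nextRead, (int) quantity);
        nextRead += quantity;
        return meta;
    }

    // Emitter role: the same metadata always yields the same item IDs.
    static List<Long> emitBatch(Meta meta) {
        List<Long> ids = new ArrayList<>();
        for (long id = meta.from; id < meta.from + meta.quantity; id++) {
            ids.add(id);
        }
        return ids;
    }

    public static void main(String[] args) {
        BatchPlanSketch plan = new BatchPlanSketch(71); // items 1..70 are waiting
        while (plan.isReady()) {
            Meta m = plan.initializeTransaction();
            System.out.println("from=" + m.from + " quantity=" + m.quantity);
        }
    }
}
```

With 70 items waiting, the sketch plans three batches: 30 items starting at 1, 30 starting at 31, and 10 starting at 61. Because emitBatch depends only on the metadata, calling it again with the same Meta reproduces the batch, which is the property a replay relies on.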
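Emitters read from the data source and emit tuples to a stream. An emitter must always be able to emit the same batch for the same transaction ID and transaction metadata. That way, if something fails while a batch is being processed, Storm can replay the same transaction ID and metadata through the emitter and be sure the batch is repeated. Storm increments the attempt count in the TransactionAttempt object (translator's note: the attempt id), so you can tell that a batch is a replay.

The important method here is emitBatch. It uses the metadata passed in as a parameter to fetch tweets from Redis, advances the sequence in Redis that tracks how many tweets have been read, and emits the tweets into the topology.

UserSplitterBolt is implemented as follows:

    public class UserSplitterBolt implements IBasicBolt {
        private static final long serialVersionUID = 1L;

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream("users", new Fields("txid", "tweet_id", "user"));
        }

        public Map<String, Object> getComponentConfiguration() {
            return null;
        }

        public void prepare(Map stormConf, TopologyContext context) {
        }

        public void execute(Tuple input, BasicOutputCollector collector) {
            String tweet = input.getStringByField("tweet");
            String tweetId = input.getStringByField("tweet_id");
            StringTokenizer strTok = new StringTokenizer(tweet, " ");
            TransactionAttempt tx = (TransactionAttempt) input.getValueByField("txid");
            HashSet<String> users = new HashSet<String>();

            while (strTok.hasMoreTokens()) {
                String user = strTok.nextToken();
                // Make sure this is a real user and is not repeated in this tweet.
                if (user.startsWith("@") && !users.contains(user)) {
                    collector.emit("users", new Values(tx, tweetId, user));
                    users.add(user);
                }
            }
        }

        public void cleanup() {
        }
    }

HashtagSplitterBolt works the same way for hashtags:

    public class HashtagSplitterBolt implements IBasicBolt {
        private static final long serialVersionUID = 1L;

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream("hashtags", new Fields("txid", "tweet_id", "hashtag"));
        }

        public Map<String, Object> getComponentConfiguration() {
            return null;
        }

        public void prepare(Map stormConf, TopologyContext context) {
        }

        public void execute(Tuple input, BasicOutputCollector collector) {
            String tweet = input.getStringByField("tweet");
            String tweetId = input.getStringByField("tweet_id");
            StringTokenizer strTok = new StringTokenizer(tweet, " ");
            TransactionAttempt tx = (TransactionAttempt) input.getValueByField("txid");
            HashSet<String> words = new HashSet<String>();

            while (strTok.hasMoreTokens()) {
                String word = strTok.nextToken();
                if (word.startsWith("#") && !words.contains(word)) {
                    collector.emit("hashtags", new Values(tx, tweetId, word));
                    words.add(word);
                }
            }
        }

        public void cleanup() {
        }
    }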
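The tokenizing step shared by the two splitter bolts can be exercised on its own. The sketch below is an illustration, not the chapter's code: TokenSplitSketch and tokensWithPrefix are hypothetical names, and a LinkedHashSet is swapped in for the HashSet so that the result keeps first-seen order.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.StringTokenizer;

// Hypothetical helper that isolates the parsing done inside execute().
final class TokenSplitSketch {
    // Returns the distinct space-separated tokens of `text` that start with `prefix`.
    static Set<String> tokensWithPrefix(String text, String prefix) {
        Set<String> found = new LinkedHashSet<>(); // keeps first-seen order, drops repeats
        StringTokenizer tok = new StringTokenizer(text, " ");
        while (tok.hasMoreTokens()) {
            String word = tok.nextToken();
            if (word.startsWith(prefix)) {
                found.add(word);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        String tweet = "@ann meets @bob and @ann at #storm #storm";
        System.out.println(tokensWithPrefix(tweet, "@"));
        System.out.println(tokensWithPrefix(tweet, "#"));
    }
}
```

The set is what keeps a user or hashtag that appears twice in one tweet from being emitted twice, so the downstream counts reflect tweets rather than raw mentions.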
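Now for UserHashtagJoinBolt. The first thing to note is that it is a BaseBatchBolt. This means that execute processes the tuples as they arrive, and once the batch is complete Storm calls the finishBatch method.

    public void execute(Tuple tuple) {
        String source = tuple.getSourceStreamId();
        String tweetId = tuple.getStringByField("tweet_id");

        if ("hashtags".equals(source)) {
            String hashtag = tuple.getStringByField("hashtag");
            add(tweetHashtags, tweetId, hashtag);
        } else if ("users".equals(source)) {
            String user = tuple.getStringByField("user");
            add(userTweets, user, tweetId);
        }
    }

The two streams are joined over the whole batch; when the batch is complete, the finishBatch method is called.

    public void finishBatch() {
        for (String user : userTweets.keySet()) {
            Set<String> tweets = getUserTweets(user);
            HashMap<String, Integer> hashtagsCounter = new HashMap<String, Integer>();
            for (String tweet : tweets) {
                Set<String> hashtags = getTweetHashtags(tweet);
                if (hashtags != null) {
                    for (String hashtag : hashtags) {
                        Integer count = hashtagsCounter.get(hashtag);
                        if (count == null) {
                            count = 0;
                        }
                        count++;
                        hashtagsCounter.put(hashtag, count);
                    }
                }
            }
            // One tuple per user/hashtag pair with its number of occurrences.
            for (String hashtag : hashtagsCounter.keySet()) {
                int count = hashtagsCounter.get(hashtag);
                collector.emit(new Values(id, user, hashtag, count));
            }
        }
    }

You can find and download the complete code online. (Translator's note: /storm-book/examples-ch08-transactional-topologies; this repository contains no code ...)

Committer bolts are a special kind of batch bolt. They either implement ICommitter or are registered through TransactionalTopologyBuilder by calling setCommitterBolt. The main difference from other batch bolts is that the finishBatch method of a committer bolt runs only when the batch is ready to be committed. If transaction ID 1 and transaction ID 2 are being processed at the same time, ID 2 runs finishBatch only after ID 1 has run its finishBatch without any error.

    public class RedisCommiterCommiterBolt extends BaseTransactionalBolt implements ICommitter {
        public static final String LAST_COMMITED_TRANSACTION_FIELD = "LAST_COMMIT";

        TransactionAttempt id;
        BatchOutputCollector collector;
        Jedis jedis;

        public void prepare(Map conf, TopologyContext context,
                BatchOutputCollector collector, TransactionAttempt id) {
            this.id = id;
            this.collector = collector;
            this.jedis = new Jedis("localhost");
        }

        HashMap<String, Long> hashtags = new HashMap<String, Long>();
        HashMap<String, Long> users = new HashMap<String, Long>();
        HashMap<String, Long> usersHashtags = new HashMap<String, Long>();

        private void count(HashMap<String, Long> map, String key, int count) {
            Long value = map.get(key);
            if (value == null) {
                value = (long) 0;
            }
            value += count;
            map.put(key, value);
        }

        public void execute(Tuple tuple) {
            String origin = tuple.getSourceComponent();
            if ("users-splitter".equals(origin)) {
                String user = tuple.getStringByField("user");
                count(users, user, 1);
            } else if ("hashtag-splitter".equals(origin)) {
                String hashtag = tuple.getStringByField("hashtag");
                count(hashtags, hashtag, 1);
            } else if ("user-hashtag-merger".equals(origin)) {
                String hashtag = tuple.getStringByField("hashtag");
                String user = tuple.getStringByField("user");
                String key = user + ":" + hashtag;
                Integer count = tuple.getIntegerByField("count");
                count(usersHashtags, key, count);
            }
        }

        public void finishBatch() {
            String lastCommitedTransaction = jedis.get(LAST_COMMITED_TRANSACTION_FIELD);
            String currentTransaction = "" + id.getTransactionId();

            // Skip a batch that has already been committed.
            if (currentTransaction.equals(lastCommitedTransaction)) {
                return;
            }

            Transaction multi = jedis.multi();
            multi.set(LAST_COMMITED_TRANSACTION_FIELD, currentTransaction);

            Set<String> keys = hashtags.keySet();
            for (String hashtag : keys) {
                Long count = hashtags.get(hashtag);
                multi.hincrBy("hashtags", hashtag, count);
            }

            keys = users.keySet();
            for (String user : keys) {
                Long count = users.get(user);
                multi.hincrBy("users", user, count);
            }

            keys = usersHashtags.keySet();
            for (String key : keys) {
                Long count = usersHashtags.get(key);
                multi.hincrBy("users_hashtags", key, count);
            }

            multi.exec();
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }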
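The committer's guard against double counting, which remembers the last committed transaction ID and skips a batch that carries the same ID, can be sketched without Redis. This is an illustration under stated assumptions: IdempotentCommitSketch and its methods are hypothetical names, and an in-memory map stands in for the Redis hash. In the real bolt the counts and the stored ID are updated together inside one Redis MULTI/EXEC block, which this single-threaded sketch does not model.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the committer's Redis state.
final class IdempotentCommitSketch {
    private final Map<String, Long> store = new HashMap<>(); // plays the role of a Redis hash
    private String lastCommitted;                            // plays the role of the stored txid

    // Adds the batch counts to the store unless `txid` was the last one committed.
    // Returns true when the counts were applied, false when the batch was a replay.
    boolean commit(String txid, Map<String, Long> batchCounts) {
        if (txid.equals(lastCommitted)) {
            return false; // already committed, applying it again would overcount
        }
        for (Map.Entry<String, Long> e : batchCounts.entrySet()) {
            store.merge(e.getKey(), e.getValue(), Long::sum);
        }
        lastCommitted = txid;
        return true;
    }

    long get(String key) {
        return store.getOrDefault(key, 0L);
    }

    public static void main(String[] args) {
        IdempotentCommitSketch db = new IdempotentCommitSketch();
        Map<String, Long> batch = new HashMap<>();
        batch.put("#storm", 2L);
        System.out.println(db.commit("1", batch)); // first delivery is applied
        System.out.println(db.commit("1", batch)); // replay of the same txid is skipped
        System.out.println(db.get("#storm"));
    }
}
```

Comparing against only the last committed ID is enough here because committers run finishBatch strictly in transaction order, so a replay can only ever repeat the most recent commit.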
This is all straightforward, but one detail in finishBatch matters a great deal: under LAST_COMMITED_TRANSACTION_FIELD, the bolt stores the ID of the last transaction it committed. Why do this? Remember that if a transaction fails, Storm replays it as many times as necessary. If you cannot tell whether you have already processed a transaction, you will overcount, and the transactional topology loses its purpose. So store the last committed transaction ID and check it before committing.

Continuing the example, there could be several Redis databases with the tweets spread across them. Storm supports this case through IPartitionedTransactionalSpout. Next, modify TweetsTransactionalSpout so that it can handle partitioned data. First extend BasePartitionedTransactionalSpout, which implements IPartitionedTransactionalSpout.

    public class TweetsPartitionedTransactionalSpout
            extends BasePartitionedTransactionalSpout<TransactionMetadata> {
        ...
    }

Then tell Storm which class is the coordinator:

    public static class TweetsPartitionedTransactionalCoordinator implements Coordinator {
        public int numPartitions() {
            return 4;
        }

        public boolean isReady() {
            return true;
        }

        public void close() {
        }
    }

In this example the coordinator is very simple. The numPartitions method tells Storm how many partitions there are. Note also that it returns no metadata: in an IPartitionedTransactionalSpout, the metadata is managed directly by the emitter.

    public static class TweetsPartitionedTransactionalEmitter
            implements Emitter<TransactionMetadata> {
        PartitionedRQ rq = new PartitionedRQ();

        public TransactionMetadata emitPartitionBatchNew(TransactionAttempt tx,
                BatchOutputCollector collector, int partition,
                TransactionMetadata lastPartitionMeta) {
            long nextRead;

            if (lastPartitionMeta == null) {
                nextRead = rq.getNextRead(partition);
            } else {
                nextRead = lastPartitionMeta.from + lastPartitionMeta.quantity;
                rq.setNextRead(partition, nextRead); // move the cursor
            }

            long quantity = rq.getAvailableToRead(partition, nextRead);
            quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;
            TransactionMetadata metadata = new TransactionMetadata(nextRead, (int) quantity);

            emitPartitionBatch(tx, collector, partition, metadata);
            return metadata;
        }

        public void emitPartitionBatch(TransactionAttempt tx,
                BatchOutputCollector collector, int partition,
                TransactionMetadata partitionMeta) {
            if (partitionMeta.quantity <= 0) {
                return;
            }

            List<String> messages =
                rq.getMessages(partition, partitionMeta.from, partitionMeta.quantity);
            long tweetId = partitionMeta.from;
            for (String msg : messages) {
                collector.emit(new Values(tx, "" + tweetId, msg));
                tweetId++;
            }
        }

        public void close() {
        }
    }
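How emitPartitionBatchNew derives each new batch from the previous metadata of the same partition can be shown in isolation. The sketch below is a simplification under stated assumptions: PartitionCursorSketch, Meta, and nextBatch are hypothetical names, the cap of 30 is an assumed value, and plain parameters replace the PartitionedRQ lookups.

```java
// Hypothetical, Storm-free model of the per-partition cursor arithmetic.
final class PartitionCursorSketch {
    static final int MAX_TRANSACTION_SIZE = 30; // assumed cap, chosen for illustration

    static final class Meta {
        final long from;
        final int quantity;
        Meta(long from, int quantity) { this.from = from; this.quantity = quantity; }
    }

    // last:           metadata of the previous batch of this partition, or null for the first batch
    // storedNextRead: cursor persisted for this partition, used only when last is null
    // nextWrite:      first position of this partition that has not been written yet
    static Meta nextBatch(Meta last, long storedNextRead, long nextWrite) {
        long nextRead = (last == null) ? storedNextRead : last.from + last.quantity;
        long available = Math.max(0L, nextWrite - nextRead);
        int quantity = (int) Math.min(available, MAX_TRANSACTION_SIZE);
        return new Meta(nextRead, quantity);
    }

    public static void main(String[] args) {
        Meta first = nextBatch(null, 1, 50);   // partition holds items 1..49
        Meta second = nextBatch(first, 1, 50);
        Meta third = nextBatch(second, 1, 50);
        System.out.println(first.from + "/" + first.quantity);
        System.out.println(second.from + "/" + second.quantity);
        System.out.println(third.from + "/" + third.quantity);
    }
}
```

Each partition carries its own chain of metadata, so the same function can be applied independently per partition while all partitions share one transaction ID. A batch with quantity 0 corresponds to the early return in emitPartitionBatch.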
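There are two important methods here, emitPartitionBatchNew and emitPartitionBatch. In emitPartitionBatchNew, Storm passes in the partition parameter, which tells you which partition to read the batch from. The method decides which tweets to read, builds the corresponding metadata, calls emitPartitionBatch, and returns the metadata, which is stored immediately in ZooKeeper. Storm sends the same transaction ID for every partition, because one transaction spans all partitions. emitPartitionBatch reads the tweets from the partition and emits the batch into the topology. If a batch fails, Storm calls emitPartitionBatch with the stored metadata to repeat the batch.

So far it has been assumed that a batch can always be replayed identically for the same transaction ID. That is not always possible. Because emitting at a different moment may yield different tuples for the same transaction ID, you have to keep the previous state so that you can reset to it and continue from there.

For example, suppose you are counting tweets and have counted 5 so far. In the latest transaction, ID 321, you count 8 more. You keep the following three values: previousCount=5, currentCount=13, and lastTransactionId=321. If transaction ID 321 is emitted again and this time yields 4 tuples instead of 8, the committer detects that the transaction ID is the same, resets to the previousCount of 5, adds the 4 new tuples, and updates currentCount to 9.

    public static class TweetsOpaquePartitionedTransactionalSpoutCoordinator implements IOpaquePartitionedTransactionalSpout.Coordinator ...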
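The bookkeeping in the counting example above (previousCount, currentCount, lastTransactionId) can be written down as a small state machine. This is a minimal sketch, not Storm's implementation: the class OpaqueCounterSketch and its apply method are hypothetical, and only the three field names come from the text.

```java
// Hypothetical counter that tolerates replays carrying different tuples.
final class OpaqueCounterSketch {
    long previousCount;          // value before the last applied transaction
    long currentCount;           // value after the last applied transaction
    long lastTransactionId = -1; // -1 means nothing has been applied yet

    // Applies `delta` new items counted in transaction `txid`.
    void apply(long txid, long delta) {
        if (txid == lastTransactionId) {
            // Replay of the last transaction, possibly with different tuples:
            // drop its earlier contribution and recount from previousCount.
            currentCount = previousCount + delta;
        } else {
            previousCount = currentCount;
            currentCount += delta;
            lastTransactionId = txid;
        }
    }

    public static void main(String[] args) {
        OpaqueCounterSketch counter = new OpaqueCounterSketch();
        counter.apply(320, 5); // five tweets counted so far
        counter.apply(321, 8); // previousCount=5, currentCount=13
        counter.apply(321, 4); // replay with four tuples: currentCount=9
        System.out.println(counter.previousCount + " " + counter.currentCount);
    }
}
```

Keeping previousCount is what separates this from the plain committer check: instead of skipping a repeated transaction ID, the counter undoes the earlier attempt and applies the new one.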
