版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)
文檔簡介
修訂記錄課程編碼適用產(chǎn)品產(chǎn)品版本課程版本ISSUEModule14Streaming應用開發(fā)FusionInsightFusionInsightHDV100R002C60V1.0開發(fā)/優(yōu)化者時間審核人開發(fā)類型(新開發(fā)/優(yōu)化)本頁不打印FusionInsightHD
Streaming應用開發(fā)目標學完本課程后,您將能夠:了解Storm基本業(yè)務(wù)開發(fā)流程熟悉Storm常用API接口使用熟悉Storm業(yè)務(wù)設(shè)計基本原則了解Streaming二次開發(fā)環(huán)境了解CQL開發(fā)流程及使用目錄Streaming應用場景Streaming應用開發(fā)流程CQL應用開發(fā)流程應用開發(fā)案例分析常用開發(fā)接口示例CQL語法示例Streaming的定義Streaming基于開源Storm,是一個分布式、實時計算框架。事件驅(qū)動連續(xù)查詢數(shù)據(jù)不存儲,先計算實時響應,低延遲EventDataQueriesAlertsActionsNowaiting;Resultsdeliveredin-flightQueriesMemoryDisk傳統(tǒng)數(shù)據(jù)庫技術(shù)
數(shù)據(jù)先存儲,再查詢處理
Hadoop技術(shù)
數(shù)據(jù)先寫入文件系統(tǒng)進行批處理Streaming架構(gòu)回顧Client提交拓撲SupervisorWorkerSupervisorNimbusWorkerWorkerExecutorExecutor下載Jar包啟動WorkerZookeeperZookeeperZookeeper監(jiān)控心跳,分配任務(wù)獲取分配任務(wù)上報心跳Streaming的適用場景Streaming主要應用于以下幾種對響應時延有嚴格要求的場景:
實時分析:如實時日志處理、交通流量分析等實時統(tǒng)計:如網(wǎng)站的實時訪問統(tǒng)計、排序等實時推薦:如實時廣告定位、事件營銷等目錄Streaming應用場景Streaming應用開發(fā)流程CQL應用開發(fā)流程應用開發(fā)案例分析常用開發(fā)接口示例CQL語法示例2Streaming應用開發(fā)流程準備開發(fā)環(huán)境下載并導入樣例工程根據(jù)場景開發(fā)拓撲打包Eclipse代碼打包完整業(yè)務(wù)提交拓撲查看運行結(jié)果制定業(yè)務(wù)目標2Streaming應用開發(fā)流程制定業(yè)務(wù)目標數(shù)據(jù)源?Spout從哪里獲取數(shù)據(jù),例如Kafka、TCP或者MQ等。結(jié)果輸出到哪里?寫入Kafka、HDFS或者Redis等。拓撲結(jié)構(gòu)設(shè)計。Spout、Bolt如何組織??煽啃砸??是否帶Acker?2Streaming應用開發(fā)流程準備開發(fā)環(huán)境準備項說明操作系統(tǒng)Windows系統(tǒng),推薦Windows7以上版本。安裝JDK開發(fā)環(huán)境的基本配置。版本要求:1.7或者1.8。安裝和配置Eclipse用于開發(fā)Streaming拓撲的工具。網(wǎng)絡(luò)確??蛻舳伺cStreaming服務(wù)主機在網(wǎng)絡(luò)上互通。2Streaming應用開發(fā)流程下載并導入Streaming樣例工程1、下載并解壓Streaming客戶端壓縮包2、在FusionInsightManager頁面新建用戶,用于登陸與操作3、下載用戶的認證憑據(jù)文件4、導入客戶端中“storm-examples”樣例工程到Eclipse開發(fā)環(huán)境5、配置認證憑據(jù)文件到樣例工程的“src/main/resources”下2Streaming應用開發(fā)流程2Streaming應用開發(fā)流程根據(jù)場景開發(fā)拓撲梳理業(yè)務(wù)場景根據(jù)功能實現(xiàn)Spout/Bolt熟悉Storm的API,調(diào)用相應的API構(gòu)造拓撲業(yè)務(wù)代碼開發(fā)完成后,參考樣例代碼WordCountTopology.java選擇提交方式,如Local、Remote和CMD等。2Streaming應用開發(fā)流程打包Eclipse代碼業(yè)務(wù)開發(fā)完成后,在開發(fā)環(huán)境Eclipse工程上,右擊選擇“Export”,在“Export”面板中選擇“JarFile”,將工程中的“src/main/java”源碼打包到指定路徑,如“D:\\tmp\\example.jar”。2Streaming應用開發(fā)流程打包完整業(yè)務(wù)將Eclipse打出的example.jar和業(yè)務(wù)中引入的外部jar包和相關(guān)配置文件整合到同一目錄下,并打出最終的業(yè)務(wù)jar包在Eclipse工程的tools目錄下找到打包工具:“streaming-jartool.bat”來完成打包2Streaming應用開發(fā)流程提交拓撲當前streaming支持三種方式提交拓撲Linux命令行提交—CMD模式:將打出的業(yè)務(wù)jar包上傳至已安裝streaming客戶端的linux環(huán)境上,使用stormjar<jarFile><className><topoName>命令提交拓撲。在提交之前請使用已申請的安全用戶執(zhí)行kinit<userName>進行安全認證。Eclipse遠程提交——Remote模式:在提交前需要在本地進行安全準備,適配代碼中的業(yè)務(wù)jar包路徑和安全參數(shù),然后右鍵->單擊“Runas>JavaApplication”提交拓撲。本地模式提交——Local模式:該模式下需要拷貝業(yè)務(wù)所需的外部jar包及配置問價到本地工程并且加入到classpath中,然后右鍵->單擊“Runas>JavaApplication”提交拓撲。本地模式一般用來測試。2Streaming應用開發(fā)流程查看運行結(jié)果登錄FusionInsightManager系統(tǒng),選擇“服務(wù)管理>Streaming”,點擊進入StreamingWebUI,在StormUI中點擊應用名稱,查看應用程序運行情況。目錄Streaming應用場景Streaming應用開發(fā)流程CQL應用開發(fā)流程應用開發(fā)案例分析常用開發(fā)接口示例CQL語法示例3CQL應用開發(fā)流程Shell提交下載并安裝Streaming客戶端到Linux客戶端主機在FusionInsightManager頁面新建用戶,用于登陸與操作使用新建的用戶執(zhí)行kinit進行安全登錄進入安裝好的Streaming客戶端“streaming-cql-1.0/bin”目錄,執(zhí)行“cql”進入CQLShell在Shell中使用CQL語句定義并提交拓撲3CQL應用開發(fā)流程3CQL應用開發(fā)流程Eclipse遠程提交下載并安裝Streaming客戶端在FusionInsightManager頁面新建用戶,用于登陸與操作下載用戶的憑據(jù)文件導入客戶端中“cql-examples”樣例工程到Eclipse開發(fā)環(huán)境配置認證憑據(jù)文件到樣例工程的“src/main/resources”下在本地開發(fā)CQL應用并保存到“src/main/resources”下,如“example.cql”適配CQLExample.java中的安全參數(shù)和待執(zhí)行CQL文件地址右鍵->RunAs->JavaApplication執(zhí)行CQLExample.java目錄Streaming應用場景Streaming應用開發(fā)流程CQL應用開發(fā)流程應用開發(fā)案例分析常用開發(fā)接口示例CQL語法示例4應用開發(fā)案例分析固定時間窗口內(nèi)TopN統(tǒng)計-業(yè)務(wù)目標datadata…TCPClientStormClusterTop1張三10次Top2李四9次Top3王五7次...Top10XX0次統(tǒng)計最近10分鐘內(nèi)訪問游客的Top10,每10秒輸出一次統(tǒng)計結(jié)果4應用開發(fā)案例分析固定時間窗口內(nèi)TopN統(tǒng)計-Spout設(shè)計功能要求收據(jù)接收啟動TCPServer數(shù)據(jù)反序列化需要對接收的數(shù)據(jù)進行反序列化處理數(shù)據(jù)拆分將反序列化后的數(shù)據(jù)根據(jù)預先設(shè)計好的schema進行拆分數(shù)據(jù)篩選對拆分后的數(shù)據(jù)進行篩選,篩選出關(guān)鍵信息數(shù)據(jù)緩存將提取的關(guān)鍵數(shù)(username)據(jù)緩存到固定大小的隊列中數(shù)據(jù)發(fā)送從緩存中取出數(shù)據(jù)發(fā)送4應用開發(fā)案例分析固定時間窗口內(nèi)TopN統(tǒng)計-TimerSpout設(shè)計功能要求發(fā)送時間戳每10秒發(fā)送一次時間戳4應用開發(fā)案例分析固定時間窗口內(nèi)TopN統(tǒng)計-CountingBolt設(shè)計功能要求窗口定義維護一個10min的窗口,窗口中緩存<username,count>,且只保留count值Top10數(shù)據(jù)刷新窗口判斷當前接收到的數(shù)據(jù)是不是時間戳,如果不是則刷新窗口中的數(shù)據(jù)并重新計算排序發(fā)送數(shù)據(jù)判斷當前接收到的數(shù)據(jù)是不是時間戳,如果是則將當前窗口內(nèi)容發(fā)送給下一跳4應用開發(fā)案例分析固定時間窗口內(nèi)TopN統(tǒng)計-GatherBolt設(shè)計功能要求數(shù)據(jù)匯聚將接收到的同一批次的數(shù)據(jù)匯總,根據(jù)時間戳頻率,每10秒完成一次匯聚結(jié)果處理將匯聚后的數(shù)據(jù)重新排序并保留Top10數(shù)據(jù)存儲將處理后的最終Top10結(jié)果存入Kakfa/Redis/DB等4應用開發(fā)案例分析固定時間窗口內(nèi)TopN統(tǒng)計-拓撲設(shè)計SpoutTimerSpoutCountBoltCountBoltCountBoltGatherBolt4應用開發(fā)案例分析固定時間窗口內(nèi)TopN統(tǒng)計-分組方式設(shè)計分組方式功能介紹fieldsGrouping(字段分組)按照消息的哈希值分組發(fā)送給目標Bolt的TaskglobalGrouping(全局分組)所有消息都發(fā)送給目標Bolt的固定一個TaskshuffleGrouping(隨機分組)消息發(fā)送給目標Bolt的隨機一個tasklocalOrShuffleGrouping(本地或者隨機分組)如果目標Bolt在同一工作進程存在一個或多個Task,數(shù)據(jù)會隨機分配給這些Task。否則,該分組方式與隨機分組方式相同allGrouping(廣播分組)消息群發(fā)給目標Bolt的所有TaskdirectGrouping(直接分組)由數(shù)據(jù)生產(chǎn)者決定數(shù)據(jù)發(fā)送給目標Bolt的哪一個Task。需在發(fā)送時使用emitDirect(taskID,tuple)接口指定TaskIDpartialKeyGrouping(局部字段分組)更均衡的字段分組noneGrouping(不分組)當前和隨機分組相同4應用開發(fā)案例分析固定時間窗口內(nèi)TopN統(tǒng)計-分組方式設(shè)計SpoutCountBoltCountBoltCountBolt張三,張三,王五張三,李四張三,王五GatherBolt張三2王五1張三1李四1張三1王五1張三2張三1張三1李四1王五1王五1Spout和CountBolt之間采用隨機分組方式結(jié)果錯誤!4應用開發(fā)案例分析固定時間窗口內(nèi)TopN統(tǒng)計-分組方式設(shè)計SpoutCountBoltCountBoltCountBolt張三,張三,張三李四,李四王五,王五GatherBolt張三3李四2王五2張三3李四2王五2Spout和CountBolt之間采用字段分組方式,以username為關(guān)鍵字結(jié)果正確!4應用開發(fā)案例分析固定時間窗口內(nèi)TopN統(tǒng)計-分組方式設(shè)計TimerSpoutCountBoltCountBoltCountBoltGatherBoltallGroupingglobalGrouping需要根據(jù)場景選擇合適的分組方式,才能獲得預期的結(jié)果目錄Streaming應用場景Streaming應用開發(fā)流程CQL應用開發(fā)流程應用開發(fā)案例分析常用開發(fā)接口示例CQL語法示例5常用接口示例Storm提供接口:REST接口REST(RepresentationalStateTransfer)表述性狀態(tài)轉(zhuǎn)移接口Thrift接口由Nimbus提供。Thrift是一個基于靜態(tài)代碼生成的跨語言的RPC協(xié)議棧實現(xiàn),它可以生成包括C++,Java,Python,Ruby,PHP等主流語言的代碼,這些代碼實現(xiàn)了RPC的協(xié)議層和傳輸層功能,從而讓用戶可以集中精力于服務(wù)的調(diào)用和實現(xiàn)5常用接口示例-Spout接口5常用接口示例-Spout接口publicclassRandomSentenceSpoutextendsBaseRichSpout{SpoutOutputCollector_collector;Random_rand;
publicvoidopen(Mapconf,TopologyContextcontext,SpoutOutputCollectorcollector){_collector=collector;_rand=newRandom();}
publicvoidnextTuple(){String[]sentences=newString[]{"thecowjumpedoverthemoon","anappleadaykeepsthedoctoraway"};Stringsentence=sentences[_rand.nextInt(sentences.length)];_collector.emit(newValues(sentence));}publicvoidack(Objectid){}
publicvoidfail(Objectid){}
publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){declarer.declare(newFields("word"));}}open方法(Spout中第一個被調(diào)用的方法)中提供一個SpoutOutputcollector用來發(fā)射tuplenextTuple方法中從指定字符串列表中隨機選擇一個字符串,并發(fā)送出去,該方法被周期性調(diào)用declareOutputFields定義一個叫做”word”的字段的tupleStorm在檢測到一個Tuple被整個Topology成功處理的時候調(diào)用ack,否則調(diào)用fail5常用接口示例-Spout接口Spout的Ack開關(guān)不啟用Ack:
在nextTuple()方法中使用不帶ack的接口發(fā)送消息: collector.emit(newValues(sentence))啟用Ack:構(gòu)造一個全局唯一的meaasgeId如:StringmeaasgeId=UUID.randomUUID().toString();在nextTuple()方法中使用帶ack的接口發(fā)送消息:collector.emit(newValues(sentence),messageID);5常用接口示例-Bolt接口5常用接口示例-Bolt接口classSplitSentenceimplementsIRichBolt{privateOutputCollectorcollector;publicvoidprepare(Mapconf,TopologyContextcontext,OutputCollectorcollector){this.collector=collector;}publicvoidexecute(Tupletuple){Stringsentence=tuple.getString(0);for(Stringword:sentence.split("")){collector.emit(newValues(word));}}publicvoidcleanup(){}publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){declarer.declare(newFields("word"));}}prepare方法在bolt開始處理消息之前調(diào)用,提供一個Outputcollector用來發(fā)射tupleexecute方法中從bolt的一個輸入接收tuple.并獲取tuple的第一個字段,即字符串內(nèi)容,以空格拆分,并發(fā)送出去declareOutputFields定義一個tuple,包含有”word”字段cleanup在bolt被銷毀的時候調(diào)用5常用接口示例-Bolt接口Bolt的Ack開關(guān)不啟用Ack:
在execute(Tupletuple)方法中使用如下的接口發(fā)送消息: collector.emit(newValues(sentence))啟用Ack:方法1:在execute(Tupletuple)方法中使用如下接口發(fā)送消息,錨定上一條tuple: collector.emit(tuple,newValues(sentence));在emit執(zhí)行成功后執(zhí)行collector.ack()方法,或者emit執(zhí)行失敗后執(zhí)行collector.fail()方法,向Acker應答此消息的發(fā)送結(jié)果。方法2:Bolt繼承BaseBasicBolt,該父類會自動錨定tuple并且自動應答,客戶端只需要調(diào)用collector.emit(newValues(sentence))接口發(fā)送消息就可以了。5常用接口示例-創(chuàng)建拓撲publicstaticvoidmain(String[]args)throwsException{TopologyBuilderbuilder=newTopologyBuilder();builder.setSpout("spout",newRandomSentenceSpout(),5);builder.setBolt("split",newSplitSentence(),8).shuffleGrouping("spout");builder.setBolt("count",newWordCount(),12).fieldsGrouping("split",newFields("word"));Configconf=newConfig();conf.setDebug(true);conf.setNumWorkers(3);conf.setNumAckers(1);StormSubmitter.submitTopologyWithProgressBar(args[0],conf,builder.createTopology());}使用TopologyBuilder容器來定義拓撲。setSpout()方法設(shè)置Spout和其并發(fā)度。setBolt()方法設(shè)置Bolt,并在setBolt()方法的返回值上設(shè)置分組方式。初始化Config對象,并且設(shè)置客戶端參數(shù)。storm提供了大量客戶端參數(shù)供用戶設(shè)置,比如:TOPOLOGY_WORKERS(setNumWorkers)用來設(shè)置拓撲的worker數(shù)量。 TOPOLOGY_ACKER_EXECUTORS(setNumAckers)用來設(shè)置Acker的數(shù)量,設(shè)置為0表示不開啟Acker。StormSubmitter.submitTopology(topology-name,conf,topology)接口來提交拓撲。目錄Streaming應用場景Streaming應用開發(fā)流程CQL應用開發(fā)流程應用開發(fā)案例分析常用開發(fā)接口示例CQL語法示例6CQL語法示例-創(chuàng)建輸入流CREATEINPUTSTREAMexample(eventIdINT,eventDescSTRING)COMMENT"thisisaexampleofcreateinputstream."SERDESimpleSerDePROPERTIES(separator="|")SOURCETCPClientInputPROPERTIES(server="",port="9999")PARALLEL2;6CQL語法示例-JoinINSERT
INTOSTREAMrsSELECT*FROMS1[RANGE20SECONDSBATCH]JOINS2[RANGEUNBOUNDED]ONs1.id=s2.idWHEREs1.id>5;INSERT
INTOSTREAMrsSELECT*FROMS1[ROWS10SLIDE]LEFT
JOINS2[rangetodayts]ONs1.id=s2.id;6CQL語法示例-窗口語法名稱說明S[ROWSN1BATCH]長度跳動窗窗口內(nèi)最大保存N1個事件,當有新事件產(chǎn)生的時候,窗口內(nèi)每攢滿N1個事件,就過期依次。同時過期的所有事件處于同一批次。S[RANGET1SLIDE]時間滑動窗窗口內(nèi)保存最近T1時間范圍內(nèi)的數(shù)據(jù),T1是一個時間單位,可以加入Seconds等時間單位。窗口內(nèi)的事件依次過期。每個過期事件的批次都不同。S[ROWSN1SLIDEPARTITIONBYEXP1]分組長度滑動窗同長度滑動窗,但是加入了分組的概念,事件歸屬于不同的分組,每個分組的長度為N1,逐個過期。S[RANGET1SLIDETRIGGERBYEXP1]事件驅(qū)動時間滑動窗窗口內(nèi)保存最近T1時間單位的數(shù)據(jù),exp1是一個返回值為時間類型的表達式,每次產(chǎn)生數(shù)據(jù)之后,都會和窗口內(nèi)的數(shù)據(jù)做對比,然后將大于T1時間單位的數(shù)據(jù)吐出,每次只吐出一個數(shù)據(jù)?!?CQL語法示例-窗口--按照type對窗口內(nèi)數(shù)據(jù)進行分組,每組容量為10SELECT*FROMtransformEvent[ROWS10SLIDEPARTITIONBY
TYPE];--時間排序窗,一般用來解決數(shù)據(jù)亂序問題SELECT*FROMtransformEvent[RANGE1000MILLISECONDSSORTBYdte];--時間驅(qū)動滑動窗INSERT
INTOSTREAMrssum(OrderPrice),avg(OrderPrice),count(OrderPrice)
FROMtransformEvent[RANGE10SECONDSSLIDETRIGGERbyTSEXCLUDEnow];--保存周期為一個自然天的分組窗INSERT
INTO
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
- 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025年藝術(shù)品私人展覽策劃與運營合同3篇
- 2025年度個人門面房出租合同(含家具贈送及經(jīng)營指導服務(wù)協(xié)議)3篇
- 2025年旅游服務(wù)售后保障及投訴處理協(xié)議3篇
- 二零二五年度集資房購房合同解除及終止協(xié)議3篇
- 2025年度個人股權(quán)激勵方案設(shè)計與轉(zhuǎn)讓合同3篇
- 2025年校車租賃與駕駛員健康管理合同3篇
- 陽臺土豆打頂施工方案
- 2025年度個人教育培訓貸款合同及課程安排4篇
- 鉆井工程課程設(shè)計英文
- 2024年學校人事檔案管理制度
- 割接方案的要點、難點及采取的相應措施
- (一模)株洲市2025屆高三教學質(zhì)量統(tǒng)一檢測 英語試卷
- DB11∕T 1028-2021 民用建筑節(jié)能門窗工程技術(shù)標準
- (初級)航空油料計量統(tǒng)計員技能鑒定理論考試題庫(含答案)
- 執(zhí)業(yè)藥師勞動合同范本
- 2024年高考英語復習(新高考專用)完形填空之詞匯復現(xiàn)
- 【京東物流配送模式探析及發(fā)展對策探究開題報告文獻綜述4100字】
- 施工現(xiàn)場工程令
- 藥物經(jīng)濟學評價模型構(gòu)建
- Daniel-Defoe-Robinson-Crusoe-笛福和魯濱遜漂流記全英文PPT
- 第一章威爾遜公共行政管理理論
評論
0/150
提交評論