版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡介
模塊七大數(shù)據(jù)日志分析綜合項(xiàng)目案例7.1項(xiàng)目目的當(dāng)今時(shí)代,數(shù)據(jù)與我們息息相關(guān),我們每天都會(huì)接觸到各種軟件、瀏覽各種網(wǎng)站,在生活上,也不僅僅是接觸信息,其實(shí)也是在生產(chǎn)很多信息,留下很多的數(shù)據(jù),只是人們可能沒有察覺而已。本模塊將會(huì)總結(jié)前面模塊所學(xué)習(xí)的大部分組件,設(shè)計(jì)成了綜合項(xiàng)目案例,讓大家對大數(shù)據(jù)的認(rèn)識提到一個(gè)新的高度,并且熟悉生產(chǎn)上的開發(fā)流程。7.2項(xiàng)目意義本項(xiàng)目案例的日志指的是用戶行為日志,用戶行為日志可以類比于網(wǎng)站或者app的眼睛,開發(fā)人員可以從中了解到用戶的主要來源、喜歡的內(nèi)容、用戶的訪問設(shè)備等等。也可以將此比喻為網(wǎng)站或者app的神經(jīng),通過用戶行為日志的分析,可以清楚網(wǎng)站或者app的優(yōu)缺點(diǎn),了解用戶使用過程中遇到的各種問題以及反饋,進(jìn)而有利于優(yōu)化自己的網(wǎng)站或者app,提升用戶的體驗(yàn)。此外,通過日志分析,還可以通過用戶的行為日志,挖掘出有價(jià)值的信息,將信息進(jìn)行歸類,劃分主要的傾向人群,有利于實(shí)現(xiàn)業(yè)務(wù)需求。7.3項(xiàng)目背景此處用戶每次訪問網(wǎng)站或者app時(shí),都會(huì)留下很多的行為數(shù)據(jù),這些行為包括訪問、瀏覽、搜索、點(diǎn)擊等等。每一個(gè)行為動(dòng)作所產(chǎn)生的數(shù)據(jù)都可以被后臺采集到。比如說點(diǎn)擊的URL、從哪個(gè)URL跳轉(zhuǎn)過來的(referer)、頁面上的停留時(shí)間等等。當(dāng)有了數(shù)據(jù)之后,就可以進(jìn)行大數(shù)據(jù)的分析統(tǒng)計(jì)等等工作了。7.4項(xiàng)目架構(gòu)先來了解一下本次項(xiàng)目的架構(gòu),再來總結(jié)數(shù)據(jù)處理的流程,如圖7-1所示。圖7-1項(xiàng)目架構(gòu)流程圖當(dāng)訪問網(wǎng)站或者使用app的時(shí)候,都會(huì)產(chǎn)生許多日志信息,存放到日志服務(wù)器里面。框架圖中WebServer指的是網(wǎng)站或者app的后臺,而本實(shí)訓(xùn)將日志信息直接存放到服務(wù)器的/home/access.log路徑下,然后通過Flume對采集到的信息進(jìn)行路由,此處路由分兩條主線,一條是直接將數(shù)據(jù)采集到HDFS,讓MapReduce對HDFS上的數(shù)據(jù)進(jìn)行清洗或者離線分析,分析完后再將結(jié)果存放到傳統(tǒng)數(shù)據(jù)庫中,此處是使用MySQL。Flume路由的另一條主線是與Kafka整合,將消費(fèi)的數(shù)據(jù)存儲到由HBase中,當(dāng)然此處的Kafka也可以與SparkStreaming、Storm、Flink等組件整合,實(shí)現(xiàn)實(shí)時(shí)流處理主線。Kafka與HBase整合完后,HBase可以與傳統(tǒng)的業(yè)務(wù)系統(tǒng)整合,也可以與其他組件整合,如圖7-1中將HBase與Hive進(jìn)行整合,目的是實(shí)現(xiàn)通過類SQL對HBase中的數(shù)據(jù)進(jìn)行高效的分析。最后,這兩條主線可以與ECharts整合,對數(shù)據(jù)進(jìn)行可視化。1.數(shù)據(jù)處理流程綜上所述,可以將數(shù)據(jù)處理流程歸結(jié)為五大步驟:數(shù)據(jù)采集->數(shù)據(jù)清洗->數(shù)據(jù)分析->數(shù)據(jù)入庫->數(shù)據(jù)可視化1)數(shù)據(jù)采集可以使用Flume對數(shù)據(jù)進(jìn)行采集,將web日志寫入到HDFS、Kafka或者HBase等等中。2)數(shù)據(jù)清洗可以使用MapReduce、Spark、Hive、Flink或者其他的一些分布式計(jì)算框架,對數(shù)據(jù)進(jìn)行清洗,先過濾掉沒有意義的數(shù)據(jù),如臟數(shù)據(jù)或者與業(yè)務(wù)不相關(guān)的數(shù)據(jù)等等,清洗完之后的數(shù)據(jù)可以存放在HDFS或者Hive、SparkSQL等等中。3)數(shù)據(jù)處理按照需求對相應(yīng)業(yè)務(wù)進(jìn)行統(tǒng)計(jì)和分析,可以使用數(shù)據(jù)清洗時(shí)的計(jì)算框架。4)數(shù)據(jù)處理結(jié)果入庫處理的結(jié)果可以存放到RDBMS、NoSQL等數(shù)據(jù)庫中。5)數(shù)據(jù)可視化當(dāng)數(shù)據(jù)入庫之后,可以開發(fā)各種各樣的圖形化界面對分析結(jié)果進(jìn)行展示,比如說餅圖、柱狀圖、地圖、折線圖等等,可以借助的工具有ECharts、DataV、HUE、Zeppelin、Kibana等。7.5項(xiàng)目需求當(dāng)獲取到了數(shù)據(jù),可以從中挖掘出一些價(jià)值,想要挖掘什么價(jià)值,取決于業(yè)務(wù)能力水平,而能否實(shí)現(xiàn),則取決于技術(shù)本身的能力以及所擁有的數(shù)據(jù)維度有多廣有多完善,而實(shí)現(xiàn)的難度則與數(shù)據(jù)的質(zhì)量息息相關(guān)。本次項(xiàng)目的數(shù)據(jù)采用模擬的方式生成,自定義的數(shù)據(jù)有ip、時(shí)間、訪問的URL、跳轉(zhuǎn)過來的網(wǎng)址、狀態(tài)碼。主要有5個(gè)字段,當(dāng)然,此數(shù)據(jù)可以自行修改自行生成,也可以拿真實(shí)的數(shù)據(jù)來操作。基于數(shù)據(jù),可以實(shí)現(xiàn)的業(yè)務(wù)場景有非常多,自己可以嘗試去多挖掘。比如說,統(tǒng)計(jì)哪三個(gè)省份的用戶訪問網(wǎng)站最頻繁?統(tǒng)計(jì)訪問網(wǎng)站最頻繁的時(shí)間段是哪個(gè)?統(tǒng)計(jì)過去10個(gè)小時(shí)內(nèi),用戶的訪問量有多少?還有很多,都可以實(shí)現(xiàn)。為了更好地與前面模塊的內(nèi)容銜接,也為了降低學(xué)習(xí)的難度,本次項(xiàng)目的業(yè)務(wù)需求是統(tǒng)計(jì)每天的用戶訪問量。7.6業(yè)務(wù)實(shí)現(xiàn)1.準(zhǔn)備工作需要準(zhǔn)備好開發(fā)工具和所需要的軟件的安裝包,前面的實(shí)訓(xùn)已經(jīng)準(zhǔn)備好了。所以此處不再做過多說明。在實(shí)操的時(shí)候,應(yīng)確保各軟件的版本與本書一致,不一致也應(yīng)該確保大版本保持一致;如不相同,遇到問題,請先自行搜索與自己版本相關(guān)的解決方案。2.效果提前預(yù)覽 項(xiàng)目的最終效果如圖7-2所示。圖7-2項(xiàng)目展示效果圖說明:具體的次數(shù)每個(gè)人會(huì)不相同。3.實(shí)現(xiàn)步驟接下來將一步一步來實(shí)現(xiàn),主要分為以下七大步驟:步驟一、模擬日志生產(chǎn)步驟二、編寫Flume配置文件步驟三、Flume整合Kafka步驟四、Flume與HDFS、Kafka整合步驟五、Kafka與HBase整合步驟六、MapReduce分析HDFS上的數(shù)據(jù)并寫入到MySQL步驟七、ECharts與MySQL整合實(shí)現(xiàn)數(shù)據(jù)可視化1)模擬日志生產(chǎn)①新建一個(gè)名稱為logstat的項(xiàng)目,關(guān)鍵設(shè)置選項(xiàng)如圖7-3所示。圖7-3新建項(xiàng)目項(xiàng)目新建好后,界面如圖7-4所示。圖7-4界面總覽接著,在java目錄里面新建包c(diǎn)om.bigdata.hadoop.generate操作如圖7-5、圖7-6所示。圖7-5新建Package圖7-6給新建包命名新建GenerateLog類,里面編寫模擬日志生成的主程序。操作過程如圖7-7、圖7-8所示。圖7-7新建Class圖7-8給新建類命名②編寫代碼packagecom.bigdata.hadoop.generate;importjava.io.File;importjava.io.FileOutputStream;importjava.io.IOException;importjava.text.DateFormat;importjava.text.SimpleDateFormat;importjava.util.Calendar;importjava.util.Date;importjava.util.Random;importjava.util.concurrent.TimeUnit;publicclassGenerateLog{//一、數(shù)據(jù)定義//1、url地址publicstaticString[]urlPaths={"article/102.html","article/103.html","article/104.html","article/105.html","article/106.html","article/107.html","article/108.html","article/109.html","video/322","tag/list"};//2、ip數(shù)字publicstaticString[]ipSplices={"102","71","145","33","67","54","164","121"};//3、http網(wǎng)址publicstaticString[]httpReferers={"/s?wd=%s","/web?query=%s","/search?q=%s","/search?p=%s"};//4、搜索關(guān)鍵字publicstaticString[]searchKeyword={"復(fù)制粘貼玩大數(shù)據(jù)",
"網(wǎng)站用戶行為分析",
"Elasticsearch的安裝",
"Kafka的安裝及發(fā)布訂閱消息系統(tǒng)",
"window7系統(tǒng)上Centos7的安裝",
"學(xué)習(xí)大數(shù)據(jù)常用Linux命令",
"Docker搭建Spark集群"};//5、狀態(tài)碼publicstaticString[]statusCodes={"200","404","500"};//二、隨機(jī)生成數(shù)據(jù)//1、隨機(jī)生成ippublicstaticStringsampleIp(){intipNum;Stringip="";for(inti=0;i<4;i++){ipNum=newRandom().nextInt(ipSplices.length);ip+="."+ipSplices[ipNum];}returnip.substring(1);}//2、隨機(jī)生成時(shí)間publicstaticStringformatTime(){DateFormatdateFormat=newSimpleDateFormat("yyyy-MM-ddHH:mm:ss");Calendarcalendar=Calendar.getInstance();//獲取當(dāng)前時(shí)間DatecurrentDate=calendar.getTime();//設(shè)置一個(gè)起始時(shí)間(七天前)calendar.add(Calendar.DATE,-7);DatestartDate=calendar.getTime();//獲取七天內(nèi)的一個(gè)隨機(jī)時(shí)間longdateTime=startDate.getTime()+(long)(newRandom().nextDouble()*(currentDate.getTime()-startDate.getTime()));returndateFormat.format(dateTime);}//3、隨機(jī)生成urlpublicstaticStringsampleUrl(){inturlNum=newRandom().nextInt(urlPaths.length);returnurlPaths[urlNum];}//4、隨機(jī)生成檢索publicstaticStringsampleReferer(){Randomrandom=newRandom();intrefNum=random.nextInt(httpReferers.length);intqueryNum=random.nextInt(searchKeyword.length);if(random.nextDouble()<0.2){return"-";}Stringquery_str=searchKeyword[queryNum];StringreferQuery=String.format(httpReferers[refNum],query_str);returnreferQuery;}//5、隨機(jī)生成狀態(tài)碼publicstaticStringsampleStatusCode(){intcodeNum=newRandom().nextInt(statusCodes.length);returnstatusCodes[codeNum];}//6、生成日志方法//輸出日志格式:02,2022-11-1409:06:00,"GET/tag/listHTTP/1.1",https:///search?p=復(fù)制粘貼玩大數(shù)據(jù),404publicstaticStringgenerateLog(){Stringip=sampleIp();StringnewTime=formatTime();Stringurl=sampleUrl();Stringreferer=sampleReferer();Stringcode=sampleStatusCode();Stringlog=ip+","+newTime+","+"\"GET/"+url+"HTTP/1.1\""+","+referer+","+code;System.out.println(log);returnlog;}//三、主類publicstaticvoidmain(String[]args)throwsIOException,InterruptedException{//dest:生成日志的路徑//Stringdest="/home/access.log";Stringdest="access.log";Filefile=newFile(dest);//num:每次生成條數(shù)//sleepTime:多久生成一次intnum,sleepTime;if(args.length==2){num=Integer.valueOf(args[0]);sleepTime=Integer.valueOf(args[1]);}else{num=50;sleepTime=10;}while(true){for(inti=0;i<num;i++){Stringcontent=generateLog()+"\n";FileOutputStreamfos=newFileOutputStream(file,true);fos.write(content.getBytes());fos.close();}TimeUnit.SECONDS.sleep(sleepTime);}}}③代碼解釋Stringdest="/home/access.log";若解開注釋,使用此行代碼,則表示模擬生產(chǎn)的日志所存儲的路徑/home/access.log,需要注意的是,此為服務(wù)器上的路徑,而且是root用戶才具有寫權(quán)限,如果不是root用戶請修改成其他可寫路徑。在Windows系統(tǒng)的編輯器開發(fā)的時(shí)候,可以改成Windows上的路徑來測試一下生成的日志是否為自己想要的,如D:\\access.log,表示生成的日志在D:\\access.log。若代碼中直接使用access.log,表示直接生成文件在項(xiàng)目目錄下。當(dāng)執(zhí)行此類時(shí),日志就會(huì)不斷生成到所設(shè)置的路徑,而日志格式如圖7-9所示。45,2022-11-0920:13:38,"GET/article/104.htmlHTTP/1.1",/search?q=學(xué)習(xí)大數(shù)據(jù)常用Linux命令,20064,2022-11-1011:39:16,"GET/article/104.htmlHTTP/1.1",-,20021,2022-11-1405:51:41,"GET/article/104.htmlHTTP/1.1",/search?p=Elasticsearch的安裝,4043,2022-11-1208:45:44,"GET/video/322HTTP/1.1",/search?q=window7系統(tǒng)上Centos7的安裝,4044,2022-11-1412:22:46,"GET/article/102.htmlHTTP/1.1",/web?query=Elasticsearch的安裝,50064,2022-11-0804:31:54,"GET/article/104.htmlHTTP/1.1",/s?wd=網(wǎng)站用戶行為分析,20002,2022-11-1020:13:50,"GET/article/104.htmlHTTP/1.1",/web?query=Kafka的安裝及發(fā)布訂閱消息系統(tǒng),200圖7-9日志格式生成隨機(jī)時(shí)間,為了展示效果美觀,本實(shí)訓(xùn)模擬生成七天的數(shù)據(jù),在實(shí)際操作過程中,可以不模擬七天,直接返回當(dāng)天日期即可。此外,還可以在執(zhí)行的時(shí)候添加參數(shù),第一個(gè)參數(shù)為一個(gè)批次生成的條數(shù),第二個(gè)參數(shù)為多少秒生成一次,如不設(shè)置,則默認(rèn)是每10秒生成50條。④測試生成日志接下來可以先在Windows本地測試運(yùn)行,觀察運(yùn)行結(jié)果是否有問題。注意目前所設(shè)置的路徑為:access.log。如圖7-10所示。圖7-10設(shè)置路徑并執(zhí)行點(diǎn)擊執(zhí)行按鈕,稍等一小會(huì),可以發(fā)現(xiàn)項(xiàng)目目錄下有日志文件access.log生成了,如圖7-11所示;控制臺也有顯示,如圖7-12所示。圖7-11查看文件日志圖7-12控制臺中查看結(jié)果⑤打包接下來可以將代碼打包到服務(wù)器上執(zhí)行,使生成的日志在服務(wù)器的/home路徑下。此時(shí)需要注釋掉Windows路徑,修改為Linux服務(wù)器的路徑,如圖7-13所示。圖7-13修改日志路徑此外,因?yàn)榉?wù)器上的JDK版本是8,而在Windows的版本為jdk11的話,需要設(shè)置一下打包的項(xiàng)目語言級別才能兼容。點(diǎn)擊“ProjectStructure”→“Project”,在“LanguageLevel”選擇服務(wù)器上相應(yīng)的語言級別,JDK8對應(yīng)的是8級別,如圖7-14所示。圖7-14選擇對應(yīng)的語言級別此外,還需要修改一下pom.xml文件中編譯代碼的JDK版本,此處修改為8,默認(rèn)是11。如圖7-15所示。圖7-15設(shè)置編譯的JDK版本接著就可以將代碼進(jìn)行打包了,先點(diǎn)擊編輯器右側(cè)欄的“Maven”,再依次找到“package”,如圖7-16所示。圖7-16打包項(xiàng)目 雙擊“package”按鈕,則可以對項(xiàng)目進(jìn)行打包,如打包成功,控制臺將會(huì)顯示構(gòu)建成功的標(biāo)志。如圖7-17所示。圖7-17打包成功的標(biāo)志打包完成后,發(fā)現(xiàn)項(xiàng)目里多了target文件夾,相應(yīng)的jar包也生成了。如圖7-18所示。圖7-18查看jar包文件此時(shí),將此jar包上傳到master節(jié)點(diǎn)的/root/jars文件夾(沒有此目錄則新建創(chuàng)建)。如圖7-19所示。圖7-19查看上傳路徑⑥執(zhí)行并查看結(jié)果(如果需要添加參數(shù),則在后面添加上即可),任意路徑執(zhí)行都可以:java-cp/root/jars/logstat-1.0-SNAPSHOT.jarcom.bigdata.hadoop.generate.GenerateLog 操作結(jié)果如圖7-20所示。圖7-20執(zhí)行模擬生成日志代碼此時(shí)發(fā)現(xiàn)終端上一直有日志顯示,此時(shí)打開一個(gè)新的終端窗口,進(jìn)入到生成日志的目錄查看:cd/homewc-laccess.log 操作結(jié)果如圖7-21所示。圖7-21查看日志行數(shù)來查看生成的日志條數(shù),目前為700條(實(shí)操結(jié)果會(huì)有差異,日志還在實(shí)時(shí)產(chǎn)生)。查看前10條數(shù)據(jù):head-n10access.log查看結(jié)果如圖7-22所示。圖7-22查看日志前10條數(shù)據(jù)此時(shí)先切換終端,結(jié)束之前產(chǎn)生日志的程序,按CTRL+C則可停止。再查看一下日志數(shù),發(fā)現(xiàn)有1300條了。通過命令來查看文件的大?。篸u-haccess.log目前大小為156K(大概是0.12K每條日志)。操作結(jié)果如圖7-23所示。圖7-23查看日志行數(shù)與大小至此,模擬日志生成步驟就已經(jīng)實(shí)現(xiàn)了。實(shí)際生產(chǎn)上,應(yīng)該是有一個(gè)專門的服務(wù)器來生產(chǎn)和存儲日志的,比如說日志服務(wù)器,每當(dāng)用戶訪問Web服務(wù)器上的網(wǎng)站時(shí),都會(huì)有不同的日志產(chǎn)生。此處不做過多介紹,如需要了解,可以自行參考Web開發(fā)相關(guān)的資料。2)編寫Flume配置文件接下來,需要使用Flume來采集日志,在前面模塊已經(jīng)介紹過了,此處有個(gè)不一樣的地方是此處不僅僅是采集到HDFS上,同時(shí)還采集到Kafka上去。①新建配置文件kafka-hdfs.conf在master節(jié)點(diǎn)執(zhí)行:cd/opt/software/apache-flume-1.10.1-bin/confvimkafka-hdfs.conf添加內(nèi)容:#agent1agent1.channels=channel1channel2agent1.sources=source1agent1.sinks=sink1sink2#agent1execSourceagent1.sources.source1.type=mand=tail-n+0-F/home/access.log#agent1memoryChannelagent1.channels.channel1.type=memoryagent1.channels.channel1.capacity=1000agent1.channels.channel1.transactionCapacity=100#agent1fileChannelagent1.channels.channel2.type=fileagent1.channels.channel2.checkpointDir=/opt/software/apache-flume-1.10.1-bin/fchannel/spool/checkpointagent1.channels.channel2.dataDirs=/opt/software/apache-flume-1.10.1-bin/fchannel/spool/dataagent1.channels.channel2.capacity=100000agent1.channels.channel2.transactionCapacity=6000agent1.channels.channel2.checkpointInterval=60000#agent1hdfsSinkagent1.sinks.sink1.type=hdfsagent1.sinks.sink1.hdfs.path=hdfs://master:8020/user/flume/events/%Y-%m-%dagent1.sinks.sink1.hdfs.filePrefix=eventsagent1.sinks.sink1.hdfs.rollInterval=600agent1.sinks.sink1.hdfs.rollSize=268435456agent1.sinks.sink1.hdfs.rollCount=0agent1.sinks.sink1.hdfs.idleTimeout=3600agent1.sinks.sink1.hdfs.writeFormat=Textagent1.sinks.sink1.hdfs.inUseSuffix=.txtagent1.sinks.sink1.hdfs.fileType=DataStreamagent1.sinks.sink1.hdfs.useLocalTimeStamp=true#agent1kafkaSinkagent1.sinks.sink2.type=org.apache.flume.sink.kafka.KafkaSinkagent1.sinks.sink2.topic=kafkatopicagent1.sinks.sink2.brokerList=master:9092,slave1:9092,slave2:9092agent1.sinks.sink2.requiredAcks=1agent1.sinks.sink2.batchSize=20#將source和sink綁定到channelagent1.sources.source1.channels=channel1channel2agent1.sinks.sink1.channel=channel2agent1.sinks.sink2.channel=channel1 此處額外添加了一個(gè)fileChannel和kafkaSink。寫好配置文件之后,可以先不啟動(dòng),等后面的組件整合完成再聯(lián)調(diào)也可以。3)Flume整合Kafka注意到上面Flume的配置文件里,Kafka的Topic取名為kafkatopic,所以在Kakfa里需要新建一個(gè)kafkatopic的Topic。①新建Kafka的Topic啟動(dòng)ZooKeeper(三臺服務(wù)器都要執(zhí)行):zkServer.shstart啟動(dòng)Kafka(三臺服務(wù)器都要執(zhí)行):kafka-server-start.sh-daemon$KAFKA_HOME/config/perties此時(shí)查看一下各節(jié)點(diǎn)的進(jìn)程情況。如圖7-24所示。~/shell/jps_all.sh圖7-24查看三臺節(jié)點(diǎn)的進(jìn)程情況接下來新建名稱為kafkatopic的Topic:kafka-topics.sh--create--replication-factor3--partitions5--topickafkatopic--bootstrap-servermaster:9092,slave1:9092,slave2:9092 創(chuàng)建結(jié)果如圖7-25所示。圖7-25新建Topic創(chuàng)建好后,可以查看一下Topic的詳情,觀察是否正常:kafka-topics.sh--describe--topickafkatopic--bootstrap-servermaster:9092,slave1:9092,slave2:9092如圖7-26所示,表示節(jié)點(diǎn)正常。圖7-26查看Topic詳情②構(gòu)建Kafka業(yè)務(wù)代碼結(jié)構(gòu)繼續(xù)回到IDEA編輯器里編寫代碼,因?yàn)樯a(chǎn)者來源于Flume。所以,此處不需要自己編寫生產(chǎn)者,但是在測試的時(shí)候,自己應(yīng)該養(yǎng)成編寫測試類的習(xí)慣,編寫生產(chǎn)者來調(diào)試,模擬生產(chǎn)一些數(shù)據(jù),此處省略調(diào)試過程,只實(shí)現(xiàn)了消費(fèi)者端。新建包名(注意位置和包名)如圖7-27所示。圖7-27新建包如圖7-28所示,新建CustomConsumer類:圖7-28新建CustomConsumer類③引入Kafka所需要的pom.xml依賴因?yàn)轫?xiàng)目里要用到打包的相關(guān)插件,所以一起將其加進(jìn)來,加粗字體為新增的內(nèi)容。目前完整的pom.xml文件參考如下:<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="/POM/4.0.0"xmlns:xsi="/2001/XMLSchema-instance"xsi:schemaLocation="/POM/4.0.0/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.bigdata.hadoop</groupId><artifactId>logstat</artifactId><packaging>pom</packaging><version>1.0-SNAPSHOT</version><properties><piler.source>8</piler.source><piler.target>8</piler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><kafka.version>3.3.1</kafka.version></properties><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-nop</artifactId><version>1.7.36</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version><configuration><source>1.8</source><target>1.8</target><testExcludes><testExclude>/src/test/**</testExclude></testExcludes><encoding>utf-8</encoding></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><!--thisisusedforinheritancemerges--><phase>package</phase><!--指定在打包節(jié)點(diǎn)執(zhí)行jar包合并操作--><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project> 添加好后,需要導(dǎo)入依賴。右擊pom.xml文件,選擇“Maven”,選擇“Reloadproject”。如圖7-29所示。圖7-29導(dǎo)入kafka依賴kafka-clients是進(jìn)行Kafka編程所需要導(dǎo)入的依賴,slf4j-nop是為了解決運(yùn)行時(shí)報(bào)警告而引入的依賴。等加載完,則可以繼續(xù)操作。④定義配置項(xiàng)由于編寫代碼過程中會(huì)用到相關(guān)的配置類,此時(shí)可以定義一個(gè)專門類來存放,在hadoop包下新建property包,并且在此包中新建MyProperties類。如圖7-30所示。圖7-30新建MyProperties類在MyProperties類中添加配置項(xiàng):packageperty;publicclassMyProperties{//Kafka相關(guān)配置項(xiàng)publicstaticfinalStringZK="31:2181";publicstaticfinalStringTOPIC="kafkatopic";publicstaticfinalStringBROKER_SERVER="31:9092";publicstaticfinalStringGROUP_ID="group1";}⑤編寫CustomConsumer類代碼packagecom.bigdata.hadoop.kafka;importcom.bigdata.hadoop.hbase.HBaseDAO;importperty.MyProperties;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importjava.time.Duration;importjava.util.Arrays;importjava.util.Properties;publicclassCustomConsumer{publicstaticvoidmain(String[]args){Propertiesprops=newProperties();//Kafka集群props.put("bootstrap.servers",MyProperties.BROKER_SERVER);//消費(fèi)者組,只要group.id相同,就屬于同一個(gè)消費(fèi)者組props.put("group.id",MyProperties.GROUP_ID);//關(guān)閉自動(dòng)提交offsetprops.put("mit","false");//設(shè)置key和value的反序列化方式props.put("key.deserializer","mon.serialization.StringDeserializer");props.put("value.deserializer","mon.serialization.StringDeserializer");KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);//消費(fèi)者訂閱主題consumer.subscribe(Arrays.asList(MyProperties.TOPIC));while(true){//消費(fèi)者拉取數(shù)據(jù)ConsumerRecords<String,String>records=consumer.poll(Duration.ofMillis(100));for(ConsumerRecord<String,String>record:records){//System.out.println("offset=%d,key=%s,value=%s%n",record.offset(),record.key(),record.value());Stringmessage=record.value();System.out.println("Receive:"+message);}//同步提交,當(dāng)前線程會(huì)阻塞直到offset提交成功mitSync();}}}⑥打包項(xiàng)目到集群并執(zhí)行: 重新打包Maven項(xiàng)目,稍等片刻,等打包完會(huì)發(fā)現(xiàn)多了一個(gè)以-jar-with-dependencies結(jié)尾的jar包,將此jar包上傳到master服務(wù)器的~/jars路徑,并執(zhí)行:java-cp/root/jars/logstat-1.0-SNAPSHOT-jar-with-dependencies.jarcom.bigdata.hadoop.kafka.CustomConsumer可以發(fā)現(xiàn)沒有數(shù)據(jù)輸出,因?yàn)镕lume還沒有啟動(dòng)。 操作結(jié)果如圖7-31所示。圖7-31執(zhí)行項(xiàng)目代碼此時(shí)再切換終端,查看發(fā)現(xiàn)多了一個(gè)CustomConsumer進(jìn)程,如圖7-32所示。圖7-32切換終端并查看進(jìn)程4)Flume與HDFS、Kafka整合①先啟動(dòng)HDFS和YARN:start-all.sh啟動(dòng)完成后查看各節(jié)點(diǎn)進(jìn)程,如圖7-33所示則表示各服務(wù)都正常。~/shell/jps_all.sh圖7-33查看三臺節(jié)點(diǎn)的進(jìn)程情況②啟動(dòng)Flume(注意目前的執(zhí)行路徑為$FLUME_HOME/conf):flume-ngagent--conf$FLUME_HOME/conf--conf-file$FLUME_HOME/conf/kafka-hdfs.conf--nameagent1Dflume.root.logger=DEBUG,console 操作結(jié)果如圖7-34所示。圖7-34啟動(dòng)Flume打開一個(gè)新的終端窗口,查看HDFS上的數(shù)據(jù),發(fā)現(xiàn)Flume的數(shù)據(jù)已經(jīng)生產(chǎn)到了HDFS上了(配置文件里配置的HDFS路徑為/user/flume/events)hdfsdfs-ls/user/flume/events/ 操作結(jié)果如圖7-35所示。圖7-35查看HDFS上是否有數(shù)據(jù)查看處于執(zhí)行狀態(tài)的CustomConsumer程序終端,也有數(shù)據(jù)顯示。如圖7-36所示。圖7-36啟動(dòng)CustomConsumer程序的終端情況此時(shí),將生成日志的程序啟動(dòng):java-cp/root/jars/logstat-1.0-SNAPSHOT.jarcom.bigdata.hadoop.generate.GenerateLog啟動(dòng)后,再觀察啟動(dòng)CustomConsumer程序的終端窗口,其實(shí)也是會(huì)有數(shù)據(jù)不斷打印出來的。至此,F(xiàn)lume、Kafka、HDFS就已經(jīng)整合好了。5)Kafka與HBase整合①引入HBase所需要的pom.xml依賴(注意位置)<hbase.version>2.5.0</hbase.version><dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>${hbase.version}</version></dependency> 操作結(jié)果如圖7-37所示。圖7-37引入HBase相關(guān)依賴②在MyProperties配置類中添加編寫HBase代碼相關(guān)配置//HBase相關(guān)配置項(xiàng)publicstaticfinalStringZK_NODE="/hbase";publicstaticfinalStringTABLENAME="loginfo";publicstaticfinalIntegerPARTITION_NUM=100;③編寫HBaseDAO類代碼在hadoop包下新建hbase包,新建HBaseDAO類,結(jié)構(gòu)如圖7-38所示。圖7-38新建HBase包和HBaseDAO類HBaseDAO類完整代碼如下:packagecom.bigdata.hadoop.hbase;importperty.MyProperties;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.hbase.*;importorg.apache.hadoop.hbase.client.*;importjava.io.IOException;import.MalformedURLException;import.URL;importjava.text.DecimalFormat;publicclassHBaseDAO{privateTabletable=null;privateTableNametableName=null;//HBase的分區(qū)個(gè)數(shù)privateintpartitonsNum=0;//一、初始化publicHBaseDAO(){//1、配置項(xiàng)Configurationconfiguration=HBaseConfiguration.create();Connectionconnection=null;configuration.set("hbase.zookeeper.quorum",MyProperties.ZK);configuration.set("zookeeper.znode.parent",MyProperties.ZK_NODE);partitonsNum=MyProperties.PARTITION_NUM;try{//2、獲取連接connection=ConnectionFactory.createConnection(configuration);//3、獲取HBaseAdmin對象Adminadmin=connection.getAdmin();Stringtbl=MyProperties.TABLENAME;TableNametableName=TableName.valueOf(tbl);//4、表不存在時(shí)創(chuàng)建表if(!admin.tableExists(tableName)){//創(chuàng)建表描述對象TableDescriptorBuildertableDescriptor=TableDescriptorBuilder.newBuilder(tableName);//列簇1ColumnFamilyDescriptorfamilyColumn1=ColumnFamilyDescriptorBuilder.newBuilder("c1".getBytes()).build();//列簇2ColumnFamilyDescriptorfamilyColumn2=ColumnFamilyDescriptorBuilder.newBuilder("c2".getBytes()).build();tableDescriptor.setColumnFamily(familyColumn1);tableDescriptor.setColumnFamily(familyColumn2);//用HBaseAdmin對象創(chuàng)建表admin.createTable(tableDescriptor.build());}//5、獲取表table=connection.getTable(tableName);//6、關(guān)閉HBaseAdmin對象admin.close();}catch(IOExceptione){e.printStackTrace();}}//二、向表put數(shù)據(jù)//日志初始格式:02,2022-11-1409:06:00,"GET/tag/listHTTP/1.1",/search?p=復(fù)制粘貼玩大數(shù)據(jù),404//想要輸出的結(jié)果:02,20221114090600,/tag/list,,404//Stringlog=ip+"\t"+newTime+"\t"+"\"GET/"+url+"HTTP/1.1\""+"\t"+referer+"\t"+code;//ip:02//date:2022-11-1409:06:00//actionSource:"GET/tag/listHTTP/1.1"http://refererSource:/search?p=復(fù)制粘貼玩大數(shù)據(jù)//code:404publicvoidput(Stringlog){//1、切割日志信息String[]arr=log.split(",");Stringip=arr[0];Stringdate=arr[1];StringactionSource=arr[2];StringrefererSource=arr[3];Stringcode=arr[4];System.out.println("原始數(shù)據(jù):"+ip+","+date+","+actionSource+","+refererSource+","+code);//2、轉(zhuǎn)換格式為以獲取的想要的結(jié)果StringdateFormat=date.replace("-","").replace(":","").replace("","");//刪除“-”、空格和“:”String[]actionArr=actionSource.split("");//刪除空格Stringaction=actionArr[1];Stringreferer=null;if(!refererSource.equals("-")){String[]refererArr=refererSource.split("\\?");try{URLurl=newURL(refererArr[0]);referer=url.getHost();}catch(MalformedURLExceptione){e.printStackTrace();}}else{referer="-";}//3、計(jì)算出該日志所在的Region區(qū)域號StringhashCode=getHashCode(ip,dateFormat);//4、拼接HBase的RowKeyStringrowKey=hashCode+","+ip+","+dateFormat+","+code;//測試輸出結(jié)果的結(jié)果:02,20221114090600,/tag/list,,404System.out.println("轉(zhuǎn)化后數(shù)據(jù):"+ip+","+dateFormat+","+action+","+referer+","+code);//5、創(chuàng)建Put對象Putput=newPut(rowKey.getBytes());//在列簇中添加相應(yīng)的列put.addColumn("c1".getBytes(),"ip".getBytes(),ip.getBytes());put.addColumn("c1".getBytes(),"date".getBytes(),dateFormat.getBytes());put.addColumn("c1".getBytes(),"action".getBytes(),action.getBytes());put.addColumn("c1".getBytes(),"referer".getBytes(),referer.getBytes());put.addColumn("c1".getBytes(),"code".getBytes(),code.getBytes());//put數(shù)據(jù)到表中try{table.put(put);}catch(IOExceptione){e.printStackTrace();}}//三、計(jì)算出該日志所在的Region區(qū)域號實(shí)現(xiàn)方法privateStringgetHashCode(Stringip,StringdateFormat){//1、此處取ip地址最后5位StringipNum=ip.replace(".","");intlen=ipNum.length();Stringnumber=ipNum.substring(len-4);//2、取出年份和月份Stringdate=dateFormat.substring(0,6);//3、隨機(jī)數(shù)取哈希值intcode=(Integer.parseInt(date)^Integer.parseInt(number))%partitonsNum;//4、格式化后返回DecimalFormatdf=newDecimalFormat();df.applyPattern("00");returndf.format(code);}}④測試代碼在hbase包下簡單寫一個(gè)HBaseDAOTest類,在里面編寫一個(gè)main方法,調(diào)用HBaseDAO執(zhí)行插入一條日志的操作。完整代碼如下:packagecom.bigdata.hadoop.hbase;publicclassHBaseDAOTest{publicstaticvoidmain(String[]args){//定義一條數(shù)據(jù)Stringlog="02,2022-11-1409:06:00,\"GET/tag/listHTTP/1.1\",/search?p=復(fù)制粘貼玩大數(shù)據(jù),404";//插入數(shù)據(jù)測試HBaseDAOhBaseDAO=newHBaseDAO();hBaseDAO.put(log);}}啟動(dòng)HBase集群:start-hbase.sh執(zhí)行main方法,執(zhí)行結(jié)束后進(jìn)入HBaseShell操作頁面,可用查看到已經(jīng)通過代碼新建了表,如圖7-39所示。hbaseshelllist圖7-39查看表查看loginfo表數(shù)據(jù),如圖7-40所示。scan"loginfo"圖7-40查看表數(shù)據(jù)可以看到,已經(jīng)將日志數(shù)據(jù)插入到了HBase表中,說明測試成功。測試成功后,我們需要?jiǎng)h除一下測試數(shù)據(jù),先disable表,再drop表。如圖7-41所示。disable'loginfo'drop'loginfo'圖7-41刪除表⑤CustomConsumer類調(diào)用HBaseDAO測試插入數(shù)據(jù)到HBase成功后,此時(shí)就可以將Kafka與HBase結(jié)合起來了。所以,此時(shí)可以在Kafka的消費(fèi)者端調(diào)用HBaseDAO相關(guān)代碼,直接在CustomConsumer的main方法里調(diào)用即可,在while循環(huán)里調(diào)用put方法將數(shù)據(jù)寫入到HBase中,代碼如圖7-42、7-43所示。HBaseDAOhbaseDao=newHBaseDAO();hbaseDao.put(message);圖7-42構(gòu)建HBaseDAO對象圖7-43CustomConsumer類調(diào)用HBaseDAO⑥打包到集群并執(zhí)行重新打包新程序并上傳到服務(wù)器上,執(zhí)行CustomConsumer程序。java-cp/root/jars/logstat-1.0-SNAPSHOT-jar-with-dependencies.jarcom.bigdata.hadoop.kafka.CustomConsumer切換終端,查看HBase的表,可以看到生成了loginfo表,但此時(shí)還沒有數(shù)據(jù)。如圖7-44所示。圖7-44查看HBase的表此時(shí)需要將Flume和模擬生成日志程序啟動(dòng)好。打開一個(gè)新終端,執(zhí)行:flume-ngagent--conf$FLUME_HOME/conf--conf-file$FLUME_HOME/conf/kafka-hdfs.conf--nameagent1Dflume.root.logger=DEBUG,console打開一個(gè)新終端,啟動(dòng)模擬生成日志程序:java-cp/root/jars/logstat-1.0-SNAPSHOT.jarcom.bigdata.hadoop.generate.GenerateLog查看表數(shù)據(jù):scan"loginfo" 查看結(jié)果如圖7-45所示。圖7-45查看HBase表信息并且可以發(fā)現(xiàn),loginfo表中的數(shù)據(jù),是一直在增加的。因?yàn)槭菍?shí)時(shí)插入的。 至此,模擬日志生成,F(xiàn)lume實(shí)時(shí)采集日志,將采集的日志發(fā)送給Kafka,并且將數(shù)據(jù)寫到到HBase的流程就跑通了。6)MapReduce分析HDFS上的數(shù)據(jù)并寫入到MySQL接下來就要對數(shù)據(jù)進(jìn)行分析了,為了簡化操作流程,本項(xiàng)目不考慮其他因素,直接將IP作為用戶的訪問量,實(shí)際開發(fā)上還要考慮很多因素的,而且也已經(jīng)將數(shù)據(jù)模擬得非常工整。也就是說,默認(rèn)一個(gè)IP就是一個(gè)用戶,前面已經(jīng)提及,本項(xiàng)目業(yè)務(wù)需求是統(tǒng)計(jì)每天用戶的訪問量,需要將統(tǒng)計(jì)結(jié)果寫入到MySQL里。此過程對學(xué)生所掌握的基礎(chǔ)要求比較高,知識點(diǎn)比較多,如果接觸SpringBoot和JS、HTML的話會(huì)比較容易上手。①M(fèi)ySQL準(zhǔn)備工作打開一個(gè)新終端,先登錄MySQL:mysql-uroot-p123456新建數(shù)據(jù)庫logstat:createdatabaselogstat;uselogstat;創(chuàng)建統(tǒng)計(jì)結(jié)果相應(yīng)的表day_log_access_topn_stat:createtableday_log_access_topn_stat(dayvarchar(8)notnull,timesbigint(10)notnull,primarykey(day));②引入HDFS所需要的pom.xml依賴(注意位置)<hadoop.version>3.3.4</hadoop.version><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency> 加入依賴位置如圖7-46所示。圖7-46引入編寫Hadoop程序所需的依賴編寫好pom.xml文件后,記得Reload一下項(xiàng)目,將依賴加載到項(xiàng)目中。③編寫代碼準(zhǔn)備好后,新建mapreduce包和LogStat2MySQL類,如圖7-47所示。圖7-47新建包和類在MyProperties類中加入MySQL配置項(xiàng)。//MySQL相關(guān)配置項(xiàng)publicstaticfinalStringDRIVER="com.mysql.cj.jdbc.Driver";publicstaticfinalStringURL="jdbc:mysql://master:3306/logstat?useUnicode=true&characterEncoding=UTF8";publicstaticfinalStringUSERNAME="root";publicstaticfinalStringPASSWORD="123456";編寫LogStat2MySQL類packagecom.bigdata.hadoop.mapreduce;importjava.io.DataInput;importjava.io.DataOutput;importjava.io.IOException;importjava.sql.PreparedStatement;importjava.sql.ResultSet;importjava.sql.SQLException;importperty.MyProperties;importorg.apache.hadoop.io.Writable;importorg.apache.hadoop.mapred.lib.db.DBWritable;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.FileSystem;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.db.DBConfiguration;importorg.apache.hadoop.mapreduce.lib.db.DBOutputFormat;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;publicclassLogStat2MySQL{//一、與MySQL整合publicstaticclassTblsWritableimplementsWritable,DBWritable{Stringday;inttimes;publicTblsWritable(){}publicTblsWritable(Stringday,inttimes){this.day=day;this.times=times;}publicvoidwrite(PreparedStatementstatement)throwsSQLException{statement.setString(1,this.day);statement.setInt(2,this.times);}publicvoidreadFields(ResultSetresultSet)throwsSQLException{this.day=resultSet.getString(1);this.times=resultSet.getInt(2);}publicvoidwrit
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2024年04月上海中國建設(shè)銀行上海市分行春季校園招考筆試歷年參考題庫附帶答案詳解
- 2024年03月重慶銀行合川支行招考大堂經(jīng)理崗人員筆試歷年參考題庫附帶答案詳解
- 2024年中國設(shè)備人孔市場調(diào)查研究報(bào)告
- 2024年03月海南旅投股權(quán)投資基金管理有限公司2024年招考3名工作人員筆試歷年參考題庫附帶答案詳解
- 2024年03月江蘇溧水農(nóng)村商業(yè)銀行2024年度春季校園招考12名工作人員筆試歷年參考題庫附帶答案詳解
- 2024年03月徽商銀行蕪湖分行招考筆試歷年參考題庫附帶答案詳解
- 2024年離婚雙方協(xié)商一致協(xié)議書
- 2024年03月中國郵政儲蓄銀行總行招考信息科技工作人員筆試歷年參考題庫附帶答案詳解
- 四川2025年應(yīng)急管理部四川消防研究所招聘博士11人筆試歷年典型考點(diǎn)(頻考版試卷)附帶答案詳解
- 四川2024年四川幼兒師范高等??茖W(xué)校引進(jìn)高層次人才12人筆試歷年典型考點(diǎn)(頻考版試卷)附帶答案詳解
- 內(nèi)審和管理評審培訓(xùn)課件
- 2024年湖北省公務(wù)員錄用考試《行測》真題及答案解析
- 自然辯證法習(xí)題及答案
- 特色農(nóng)產(chǎn)品超市方案
- 2024國有企業(yè)與民營企業(yè)之間的混合所有制改革合同
- 二次函數(shù)的幾何性質(zhì)(于特)(1)名師公開課獲獎(jiǎng)?wù)n件百校聯(lián)賽一等獎(jiǎng)?wù)n件
- GB/T 30595-2024建筑保溫用擠塑聚苯板(XPS)系統(tǒng)材料
- 2024年人教版八年級地理上冊期末考試卷(附答案)
- 醫(yī)學(xué)免疫學(xué)-醫(yī)學(xué)檢驗(yàn)專業(yè)學(xué)習(xí)通超星期末考試答案章節(jié)答案2024年
- 《稻草人》閱讀題及答案
- 獨(dú)立基礎(chǔ)土方開挖施工方案
評論
0/150
提交評論