Hadoop大數(shù)據(jù)平臺構(gòu)建與應(yīng)用(工作手冊式)(微課版) 案例 9.2 廣告日志數(shù)據(jù)實時傳輸_第1頁
Hadoop大數(shù)據(jù)平臺構(gòu)建與應(yīng)用(工作手冊式)(微課版) 案例 9.2 廣告日志數(shù)據(jù)實時傳輸_第2頁
Hadoop大數(shù)據(jù)平臺構(gòu)建與應(yīng)用(工作手冊式)(微課版) 案例 9.2 廣告日志數(shù)據(jù)實時傳輸_第3頁
Hadoop大數(shù)據(jù)平臺構(gòu)建與應(yīng)用(工作手冊式)(微課版) 案例 9.2 廣告日志數(shù)據(jù)實時傳輸_第4頁
Hadoop大數(shù)據(jù)平臺構(gòu)建與應(yīng)用(工作手冊式)(微課版) 案例 9.2 廣告日志數(shù)據(jù)實時傳輸_第5頁
已閱讀5頁,還剩1頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)

文檔簡介

廣告日志數(shù)據(jù)實時傳輸【任務(wù)描述】Flume作為日志收集系統(tǒng),可以從不同的數(shù)據(jù)源將數(shù)據(jù)源源不斷收集,但Flume不會持久地保存數(shù)據(jù),需要使用Sink將數(shù)據(jù)存儲到外部存儲系統(tǒng),如HDFS、HBase、Kafka等。Flume與HDFS、HBase的結(jié)合一般用于離線批處理。而Flume與Kafka的整合一般用于數(shù)據(jù)實時流處理,通過Flume的Agent代理收集日志數(shù)據(jù),再由Flume的Sink將數(shù)據(jù)傳送到Kafka集群,完成數(shù)據(jù)的生產(chǎn)流程,最后交給Storm、Flink、SparkStreaming等進(jìn)行實時消費計算。本案例將基于項目8中的case_data_new.csv廣告日志數(shù)據(jù),使用Flume和Kafka整合實現(xiàn)廣告日志數(shù)據(jù)的實時傳輸。首先用腳本模擬實時生成的日志數(shù)據(jù)并存入MySQL,再使用Flume實時監(jiān)視MySQL中增加的數(shù)據(jù),采集到Kafka集群的主題中,并啟動消費者消費主題數(shù)據(jù),最終實現(xiàn)數(shù)據(jù)的實時傳輸?!救蝿?wù)分析】廣告日志數(shù)據(jù)的實時傳輸?shù)膶崿F(xiàn)步驟如下。腳本定時抽取數(shù)據(jù)到指定目錄,模擬日志文件產(chǎn)生,并將其存入MySQL表中。創(chuàng)建Kafka主題,開啟消費者以消費數(shù)據(jù)。編寫conf采集配置文件,將存入MySQL中的數(shù)據(jù)傳入Kafka主題?!救蝿?wù)實施】創(chuàng)建腳本文件在master節(jié)點下運行“mysql-uroot-pPassword123$”進(jìn)入MySQL數(shù)據(jù)庫中,創(chuàng)建數(shù)據(jù)庫kafka,并在kafka下創(chuàng)建表用于存儲數(shù)據(jù)。如REF_Ref100155267\h代碼91所示。代碼STYLEREF1\s9SEQ代碼\*ARABIC\s11創(chuàng)建數(shù)據(jù)表createdatabasekafka;usekafka;createtablecase_data(`rank`int,dtint,cookievarchar(200),ipvarchar(200),idfavarchar(200),imeivarchar(200),androidvarchar(200),openudidvarchar(200),macvarchar(200),timestampsint,campint,creativeidint,mobile_osint,mobile_typevarchar(200),app_key_md5varchar(200),app_name_md5varchar(200),placementidvarchar(200),useragentvarchar(200),mediaidvarchar(200),os_typevarchar(200),born_timeint);//開啟MySQL的local_infile服務(wù)setgloballocal_infile=1;打開一個新的master終端,運行“vi/data/datamysql.sh”創(chuàng)建一個腳本文件,如REF_Ref100155277\h代碼92所示。腳本內(nèi)容為一個whiletrue循環(huán),每分鐘在case_data_new.csv隨機(jī)提取100條數(shù)據(jù),存入“/data/datamysql/mysqltmp.txt”文件中,再將文件中的數(shù)據(jù)存入MySQL數(shù)據(jù)庫中。代碼STYLEREF1\s9SEQ代碼\*ARABIC\s12腳本datamysql.sh#!/bin/bashwhiletruedotime=$(date"+%Y%m%d_%H%M%S")shuf-n100/opt/case_data_new.csv>/data/datamysql/mysqltmp.txtmysql-uroot-pPassword123$--local-infile-e"useKafka;loaddatalocalinfile'/data/datamysql/mysqltmp.txt'intotablecase_datafieldsterminatedby','OPTIONALLYENCLOSEDBY'\"';"sleep60done腳本創(chuàng)建完成后,賦予腳本權(quán)限,然后將其啟動,如REF_Ref100155288\h代碼93所示。啟動成功后可能會發(fā)出如REF_Ref100155569\h圖91所示的警報信息,表示在命令行中直接輸入密碼賬戶信息是不安全的,該警報信息是在MySQL5.6版本后有的,并不影響運行結(jié)果,可以選擇忽視。代碼STYLEREF1\s9SEQ代碼\*ARABIC\s13關(guān)于腳本的命令//腳本權(quán)限chmod777/data/data2mysql.sh//腳本啟動命令sh/data/datamysql.sh&//腳本中斷命令psaux|grep"datamysql.sh"|grep-vgrep|cut-c9-15|xargskill-9圖STYLEREF1\s9SEQ圖\*ARABIC\s11執(zhí)行腳本文件在MySQL數(shù)據(jù)庫中查看數(shù)據(jù)是否成功存入表中,如REF_Ref100155298\h代碼94所示。結(jié)果如REF_Ref100155537\h圖92所示??梢钥闯鲆呀?jīng)有數(shù)據(jù)存入表中,并正在實時更新中。代碼STYLEREF1\s9SEQ代碼\*ARABIC\s14查看數(shù)據(jù)是否存入//進(jìn)入數(shù)據(jù)庫kafkausekafka;//查看表中有幾行selectcount(*)fromcase_data;圖STYLEREF1\s9SEQ圖\*ARABIC\s12數(shù)據(jù)已存入創(chuàng)建Kafka主題分別在slave1、slave2中開啟ZooKeeper、Kafka集群。在slave1節(jié)點創(chuàng)建一個Kafka主題RealTime,設(shè)置2個副本,2個分區(qū)。創(chuàng)建成功后,開啟消費者消費,如REF_Ref100155306\h代碼95所示。代碼STYLEREF1\s9SEQ代碼\*ARABIC\s15創(chuàng)建Kafka主題并開啟消費//創(chuàng)建RealTime主題kafka-topics.sh-create--topicRealTime--bootstrap-serverslave1:9092,slave2:9092--partitions2--replication-factor2//開啟消費者kafka-console-consumer.sh--topicRealTime--bootstrap-serverslave1:9092,slave2:9092目前,該消費者并沒有在指定主題中消費到數(shù)據(jù)。Flume采集日志在Flume的conf目錄下創(chuàng)建一個“datamysql.conf”文件,實現(xiàn)從MySQL中采集數(shù)據(jù),并傳入RealTime主題中,如REF_Ref100155318\h代碼96所示。代碼STYLEREF1\s9SEQ代碼\*ARABIC\s16Flume腳本datamysql.confagent.sources=sql-sourceagent.sinks=k1agent.channels=chagent.sources.sql-source.type=org.keedio.flume.source.SQLSourceagent.sources.sql-source.hibernate.connection.url=jdbc:mysql://81:3306/kafka?&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=GMTagent.sources.sql-source.hibernate.connection.user=rootagent.sources.sql-source.hibernate.connection.password=Password123$agent.sources.sql-source.hibernate.dialect=org.hibernate.dialect.MySQLDialectagent.sources.sql-source.hibernate.driver_class=com.mysql.cj.jdbc.Driveragent.sources.sql-source.hibernate.connection.autocommit=trueagent.sources.sql-source.table=case_dataagent.sources.sql-source.columns.to.select=*agent.sources.sql-source.run.query.delay=10000agent.sources.sql-source.status.file.path=/var/lib/=sql-source.statusagent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSinkagent.sinks.k1.topic=RealTimeagent.sinks.k1.brokerList=slave1:9092,slave2:9092agent.sinks.k1.batchsize=200agent.sinks.kafkaSink.requiredAcks=1agent.sinks.k1.serializer.class=kafka.serializer.StringEncoderagent.sinks.kafkaSink.zookeeperConnect=slave1:2181,slave2:2181agent.channels.ch.type=memoryagent.channels.ch.capacity=10000agent.channels.ch.transactionCapacity=10000agent.channels.hbaseC.keep-alive=20agent.sources.sql-source.channels=chagent.sinks.k1.channel=ch啟動FlumeAgent命令開始采集MySQL中的數(shù)據(jù),如REF_Ref100155329\h代碼97所示。切換到Kafka消費者的終端,可以看到主題上已經(jīng)有數(shù)據(jù)被消費者消費,如REF_Ref100155585\h圖93所示。觀察消費者終端,可以看到消費者每過一分鐘,就會有新數(shù)據(jù)消費,因為腳本文件一直在模擬用戶產(chǎn)生數(shù)據(jù),而Flume在實時采集并傳入到Kafka主題上。代碼STYLEREF1\s9SEQ代碼\*ARABIC\s17執(zhí)行Flume腳本flume-ngagent-nagent-f/usr/local/src/flume/conf/datamysql.conf-c/usr/local/src/flume/conf/-Dflume.root.logger=INFO,console圖STYLEREF1\s9

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論