尚硅谷 Big Data Project — Real-Time Project 2: Daily Active Users (DAU) Requirement
Chapter 1 Real-Time Processing Module

1.1 Module Setup
Add the Scala framework to the module.

1.2 Code Approach
1) Consume the data from Kafka.
2) Use Redis to filter out devices that have already been counted toward today's daily active users.
3) Save each batch's newly added daily-active records to HBase or ES.
4) Query the stored data (ES/HBase) and publish it as a data API for the visualization project to call.

1.3 Code Development 1 — Consuming Kafka

1.3.1 Configuration
1) config.properties
# Kafka configuration
kafka.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092
# Redis configuration
redis.host=hadoop102
redis.port=6379
2) pom.xml

Dependencies:
- com.atguigu.gmall2019.dw : dw-common : 1.0-SNAPSHOT
- org.apache.spark : spark-core_2.11
- org.apache.spark : spark-streaming_2.11
- org.apache.kafka : kafka-clients
- org.apache.spark : spark-streaming-kafka-0-8_2.11
- redis.clients : jedis : 2.9.0
- io.searchbox : jest : 5.3.3
- net.java.dev.jna : jna : 4.5.2
- org.codehaus.janino : commons-compiler : 2.7.8

Build plugin:
- net.alchim31.maven : scala-maven-plugin : 3.2.2 (this plugin compiles the Scala code into class files; its compile/testCompile goals are declared as bound to Maven's compile phase)

1.3.2 Utility Classes

1) MyKafkaUtil

package com.atguigu.utils

import java.util.Properties
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

object MyKafkaUtil {
  def getKafkaStream(ssc: StreamingContext, topics: Set[String]): InputDStream[(String, String)] = {
    val properties: Properties = PropertiesUtil.load("config.properties")
    val kafkaPara = Map(
      "bootstrap.servers" -> properties.getProperty("kafka.broker.list"),
      "group.id" -> "bigdata0408"
    )
    // consume the Kafka data with the Direct approach
    val kafkaDStream: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaPara, topics)
    // return the stream
    kafkaDStream
  }
}

2) PropertiesUtil

import java.io.InputStreamReader
import java.util.Properties

object PropertiesUtil {
  def load(propertieName: String): Properties = {
    val prop = new Properties()
    prop.load(new InputStreamReader(
      Thread.currentThread().getContextClassLoader.getResourceAsStream(propertieName), "UTF-8"))
    prop
  }
}
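As a small usage sketch (illustrative only, not part of the course code), the utility above can be combined with the config.properties keys from section 1.3.1 like this:

// Illustration only: read the keys defined in config.properties (section 1.3.1).
val properties = PropertiesUtil.load("config.properties")
val brokerList = properties.getProperty("kafka.broker.list") // hadoop102:9092,hadoop103:9092,hadoop104:9092
val redisHost  = properties.getProperty("redis.host")        // hadoop102
println(s"brokers = $brokerList, redis = $redisHost")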

3) RedisUtil

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object RedisUtil {
  var jedisPool: JedisPool = null

  def getJedisClient: Jedis = {
    if (jedisPool == null) {
      println("开辟一个连接池")
      val config = PropertiesUtil.load("config.properties")
      val host = config.getProperty("redis.host")
      val port = config.getProperty("redis.port")

      val jedisPoolConfig = new JedisPoolConfig()
      jedisPoolConfig.setMaxTotal(100)             // maximum number of connections
      jedisPoolConfig.setMaxIdle(20)               // maximum idle connections
      jedisPoolConfig.setMinIdle(20)               // minimum idle connections
      jedisPoolConfig.setBlockWhenExhausted(true)  // whether to wait when the pool is exhausted
      jedisPoolConfig.setMaxWaitMillis(500)        // how long to wait when exhausted, in milliseconds
      jedisPoolConfig.setTestOnBorrow(true)        // test each connection when it is borrowed

      jedisPool = new JedisPool(jedisPoolConfig, host, port.toInt)
    }
    // println(s"jedisPool.getNumActive = ${jedisPool.getNumActive}")
    // println("获得一个连接")
    jedisPool.getResource
  }
}

1.3.3 Case Class: StartupLog

case class StartupLog(mid: String, uid: String, appid: String, area: String,
                      os: String, ch: String, logType: String, vs: String,
                      var logDate: String, var logHour: String, var ts: Long)
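For illustration only, the following sketch parses one hand-written startup-log JSON string into the case class with the same fastjson call the business code uses; the field names come from StartupLog, while the sample values are invented for the example.

import com.alibaba.fastjson.JSON

object StartupLogParseSketch {
  def main(args: Array[String]): Unit = {
    // Hand-written sample record: field names follow the StartupLog case class,
    // the values are made up for this demo.
    val log: String =
      """{"mid":"mid_101","uid":"u_32","appid":"gmall2019","area":"beijing","os":"android",
        |"ch":"huawei","logType":"startup","vs":"1.1.2","ts":1567843200000}""".stripMargin
    // Same call as in the business code below.
    val startupLog: StartupLog = JSON.parseObject(log, classOf[StartupLog])
    // logDate and logHour are not in the raw log; the business code derives them from ts.
    println(startupLog.mid + " @ " + startupLog.ts)
  }
}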

1.3.4 Business Class: Consuming Kafka

import org.apache.phoenix.spark._

object RealtimeStartupApp {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("gmall2019")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(10))

    val startupStream: InputDStream[ConsumerRecord[String, String]] =
      MyKafkaUtil.getKafkaStream(ssc, Set(GmallConstants.KAFKA_TOPIC_STARTUP))

    // startupStream.map(_.value()).foreachRDD { rdd =>
    //   println(rdd.collect().mkString("\n"))
    // }

    val startupLogDstream: DStream[StartupLog] = startupStream.map(_.value()).map { log =>
      // println(s"log = $log")
      val startUpLog: StartupLog = JSON.parseObject(log, classOf[StartupLog])
      startUpLog
    }
  }
}

1.4 Code Development 2 — Deduplication

1.4.1 Flow Chart

1.4.2 Designing the Redis Key-Value
key: dau:2019-01-22 (one set per day)
value: device id (mid)
A short usage sketch of this key follows below.
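To make the key design concrete, here is a small illustrative sketch (not part of the course code) of how a Redis set under the dau:<date> key answers the "already counted today?" question; it reuses the RedisUtil pool defined above, and the mid value is made up.

import java.text.SimpleDateFormat
import java.util.Date
import redis.clients.jedis.Jedis

object DauRedisSketch {
  def main(args: Array[String]): Unit = {
    val jedis: Jedis = RedisUtil.getJedisClient
    val key = "dau:" + new SimpleDateFormat("yyyy-MM-dd").format(new Date())
    val mid = "mid_101" // hypothetical device id

    // sadd returns 1 the first time a member is added and 0 if it is already in the set,
    // which is exactly the per-day duplicate check the business code performs.
    println(jedis.sadd(key, mid))      // 1 -> first visit today
    println(jedis.sadd(key, mid))      // 0 -> duplicate
    println(jedis.sismember(key, mid)) // true
    println(jedis.smembers(key))       // the full mid list for the day

    jedis.close()
  }
}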

1.4.3 Business Code

import java.util
import java.text.SimpleDateFormat
import java.util.Date

import com.alibaba.fastjson.JSON
import com.atguigu.gmall.constant.GmallConstants
import com.atguigu.gmall2019.realtime.bean.StartupLog
import com.atguigu.gmall2019.realtime.util.{MyKafkaUtil, RedisUtil}
import org.apache.hadoop.conf.Configuration
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis
import org.apache.phoenix.spark._

object DauApp {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("dau_app")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // 1. consume Kafka
    val inputDstream: InputDStream[ConsumerRecord[String, String]] =
      MyKafkaUtil.getKafkaStream(ssc, Set(GmallConstants.KAFKA_TOPIC_STARTUP))

    // 2. convert the stream into the case class and fill in the two time fields
    val startuplogDstream: DStream[StartupLog] = inputDstream.map { record =>
      val jsonStr: String = record.value()
      val startupLog: StartupLog = JSON.parseObject(jsonStr, classOf[StartupLog])
      val dateTimeStr: String = new SimpleDateFormat("yyyy-MM-dd HH").format(new Date(startupLog.ts))
      val dateArr: Array[String] = dateTimeStr.split(" ")
      startupLog.logDate = dateArr(0)
      startupLog.logHour = dateArr(1)
      startupLog
    }
    startuplogDstream.cache()

    // 3. filter with the DAU mid list in Redis: keep only records whose mid is not yet in the list
    val filteredDstream: DStream[StartupLog] = startuplogDstream.transform { rdd =>
      // executed on the driver, once per batch interval
      val jedis: Jedis = RedisUtil.getJedisClient
      val dateStr: String = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
      val key = "dau:" + dateStr
      val dauMidSet: util.Set[String] = jedis.smembers(key)
      jedis.close()
      val dauMidBC: Broadcast[util.Set[String]] = ssc.sparkContext.broadcast(dauMidSet)
      println("过滤前：" + rdd.count())
      val filteredRDD: RDD[StartupLog] = rdd.filter { startuplog =>
        // executed on the executors
        val dauMidSet: util.Set[String] = dauMidBC.value
        !dauMidSet.contains(startuplog.mid)
      }
      println("过滤后：" + filteredRDD.count())
      filteredRDD
    }

    // 4. dedup within the batch: group by mid and keep the first record of each group
    val groupbyMidDstream: DStream[(String, Iterable[StartupLog])] =
      filteredDstream.map(startuplog => (startuplog.mid, startuplog)).groupByKey()
    val distictDstream: DStream[StartupLog] = groupbyMidDstream.flatMap { case (mid, startupLogItr) =>
      startupLogItr.toList.take(1)
    }

    // 5. save today's visited device ids (mid) to Redis
    //    key type: set, key: dau:2019-xx-xx, value: mid
    distictDstream.foreachRDD { rdd =>
      // driver
      rdd.foreachPartition { startuplogItr =>
        val jedis: Jedis = RedisUtil.getJedisClient // executor
        for (startuplog <- startuplogItr) {
          val key = "dau:" + startuplog.logDate
          jedis.sadd(key, startuplog.mid)
          println(startuplog)
        }
        jedis.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

1.5 Code Development 3 — Saving to HBase

1.5.1 Phoenix: the SQL Layer over HBase
For technical details, see 尚硅谷大数据技术之Phoenix.

1.5.2 Create the Table with Phoenix

create table GMALL2019_DAU (
    mid varchar,
    uid varchar,
    appid varchar,
    area varchar,
    os varchar,
    ch varchar,
    type varchar,
    vs varchar,
    logDate varchar,
    logHour varchar,
    ts bigint
    CONSTRAINT dau_pk PRIMARY KEY (mid, logDate));

1.5.3 Additional pom.xml Dependencies
- org.apache.phoenix : phoenix-spark : 4.14.2-HBase-1.3
- org.apache.spark : spark-sql_2.11

1.5.4 Business Code for Saving

// write the data to HBase through Phoenix
distictDstream.foreachRDD { rdd =>
  rdd.saveToPhoenix(
    "GMALL2019_DAU",
    Seq("MID", "UID", "APPID", "AREA", "OS", "CH", "TYPE", "VS", "LOGDATE", "LOGHOUR", "TS"),
    new Configuration,
    Some("hadoop102,hadoop103,hadoop104:2181"))
}
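As a quick sanity check that the batches are landing in HBase, the following illustrative sketch (not part of the course code) runs a count for one day through Phoenix's JDBC driver, using the same ZooKeeper quorum URL that the publisher project configures later; the date value is just the example date from Chapter 2.

import java.sql.DriverManager

object DauQuerySketch {
  def main(args: Array[String]): Unit = {
    // Phoenix thick-client JDBC URL: the ZooKeeper quorum of the HBase cluster.
    val conn = DriverManager.getConnection("jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181")
    val stmt = conn.createStatement()
    // Count how many distinct devices were written for the given day.
    val rs = stmt.executeQuery("select count(*) from GMALL2019_DAU where LOGDATE = '2019-09-06'")
    while (rs.next()) {
      println("dau total = " + rs.getLong(1))
    }
    rs.close(); stmt.close(); conn.close()
  }
}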

Chapter 2 DAU Data Query API

2.1 Access Paths
Total: http://localhost:8070/realtime-total?date=2019-09-06
Hourly breakdown: http://localhost:8070/realtime-hours?id=dau&date=2019-09-06

2.2 Required Data Formats
Total:
[{"id":"dau","name":"新增日活","value":1200},
 {"id":"new_mid","name":"新增设备","value":233}]
Hourly breakdown:
{"yesterday":{"11":383,"12":123,"17":88,"19":200},
 "today":{"12":38,"13":1233,"17":123,"19":688}}

2.3 Setting Up the Publisher Project
(Screenshots: the module is created with Spring Initializr — Group com.atguigu.gmall2019.dw, Artifact dw-publisher, a Maven/Java project — and the Spring Web and JDBC-related starters are selected as dependencies.)

2.4 Configuration Files

2.4.1 pom.xml
Properties:
- java.version = 1.8
Dependencies:
- org.springframework.boot : spring-boot-starter-web
- com.atguigu.gmall2019.dw : dw-common : 1.0-SNAPSHOT
- org.springframework.boot : spring-boot-starter-test (scope: test)
- org.mybatis.spring.boot : mybatis-spring-boot-starter : 1.3.4
- org.springframework.boot : spring-boot-starter-jdbc
- org.apache.phoenix : phoenix-core : 4.14.2-HBase-1.3
- com.google.guava : guava : 20.0
Build plugin:
- org.springframework.boot : spring-boot-maven-plugin

2.4.2 application.properties
server.port=8070
logging.level.root=error
spring.datasource.driver-class-name=org.apache.phoenix.jdbc.PhoenixDriver
spring.datasource.url=jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181
spring.datasource.data-username=
spring.datasource.data-password=
#mybatis
#mybatis.typeAliasesPackage=com.example.phoenix.entity
mybatis.mapperLocations=classpath:mapper/*.xml
mybatis.configuration.map-underscore-to-camel-case=true

2.5 Code Implementation
Controller layer — PublisherController: publishes the API over the web.
Service layer — PublisherService: interface for the data/business queries.
PublisherServiceImpl: implementation class for the business queries.
Data layer — DauMapper: interface for the data-layer queries.
DauMapper.xml: configuration implementing the data-layer queries.
Main program — GmallPublisherApplication: add the mapper package scan.

2.5.1 GmallPublisherApplication: Add the Mapper Package Scan

@SpringBootApplication
@MapperScan(basePackages = "com.atguigu.gmallXXXXXXX.publisher.mapper")
public class Gmall2019PublisherApplication {

    public static void main(String[] args) {
        SpringApplication.run(Gmall2019PublisherApplication.class, args);
    }
}

2.5.2 Controller Layer

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall2019.dw.publisher.service.PublisherService;
import org.apache.commons.lang.time.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

@RestController
public class PublisherController {

    @Autowired
    PublisherService publisherService;

    @GetMapping("realtime-total")
    public String realtimeTotal(@RequestParam("date") String date) {
        List<Map> list = new ArrayList<>();
        // total daily active users
        int dauTotal = publisherService.getDauTotal(date);
        Map dauMap = new HashMap();
        dauMap.put("id", "dau");
        dauMap.put("name", "新增日活");
        dauMap.put("value", dauTotal);
        list.add(dauMap);
        // newly added users
        int newMidTotal = publisherService.getNewMidTotal(date);
        Map newMidMap = new HashMap();
        newMidMap.put("id", "new_mid");
        newMidMap.put("name", "新增用户");
        newMidMap.put("value", newMidTotal);
        list.add(newMidMap);
        return JSON.toJSONString(list);
    }

    @GetMapping("realtime-hours")
    public String realtimeHourDate(@RequestParam("id") String id, @RequestParam("date") String date) {
        if ("dau".equals(id)) {
            Map dauHoursToday = publisherService.getDauHours(date);
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("today", dauHoursToday);
            String yesterdayDateString = "";
            try {
                Date dateToday = new SimpleDateFormat("yyyy-MM-dd").parse(date);
                Date dateYesterday = DateUtils.addDays(dateToday, -1);
                yesterdayDateString = new SimpleDateFormat("yyyy-MM-dd").format(dateYesterday);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            Map dauHoursYesterday = publisherService.getDauHours(yesterdayDateString);
            jsonObject.put("yesterday", dauHoursYesterday);
            return jsonObject.toJSONString();
        }
        if ("new_order_totalamount".equals(id)) {
            String newOrderTotalamountJson = publisherService.getNewOrderTotalAmountHours(d
