




版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)
文檔簡介
《大數(shù)據(jù)技術(shù)與應(yīng)用基礎(chǔ)》21世紀(jì)高等院?!霸朴嬎愫痛髷?shù)據(jù)”人才培養(yǎng)規(guī)劃教材第8章開源集群計算環(huán)境Spark人民郵電出版社能力CAPACITY要求了解如何根據(jù)實際需求選擇合適的算子操作數(shù)據(jù)。理解理解Spark的工作機制。了解使用Spark提供的算子對數(shù)據(jù)進(jìn)行轉(zhuǎn)化。熟悉對源數(shù)據(jù)結(jié)構(gòu)格式進(jìn)行分析。Spark接口實戰(zhàn)Spark簡介Spark編程之RDDSpark實戰(zhàn)案例之統(tǒng)計1000萬人口的平均年齡SparkMLlib實戰(zhàn)(聚類實戰(zhàn))一、Spark簡介簡介Spark是UCBerkeleyAMPlab所開源的類HadoopMapReduce的通用并行框架,Spark擁有HadoopMapReduce的優(yōu)點;但不同于MapReduce的是Spark工作中間輸出結(jié)果可以保存在內(nèi)存中,而不再需要讀寫HDFS。因此Spark能更好地適用于數(shù)據(jù)挖掘與機器學(xué)習(xí)等需要迭代的MapReduce的算法。Spark可以在Hadoop文件系統(tǒng)中并行運行。通過名為Mesos的第三方集群框架可以支持此行為。Spark簡介Spark編程之RDDSpark實戰(zhàn)案例之統(tǒng)計1000萬人口的平均年齡SparkMLlib實戰(zhàn)(聚類實戰(zhàn))Spark接口實戰(zhàn)二、Spark接口實戰(zhàn)環(huán)境要求
IDEA使用和打包(1)啟動IDEA→“WelcometoIntelliJIDEA”→“CreateNewProject”→“Scala”→“Non-SBT”→創(chuàng)建一個名為text的project(注意這里選擇自己安裝的JDK和scala編譯器)→“Finish”。(2)增加開發(fā)包:依次選擇“File”→“ProjectStructure”→“Libraries”→“+”→“java”→選擇/opt/spark/spark-1.5.2/lib/spark-assembly-1.5.2-hadoop2.6.0.jarUbuntu16.04server64JDK1.8.0Hadoop2.6.0Spark1.5.2Scala2.11.7集成開發(fā)環(huán)境:IDEA2016-IC(需安裝scala插件)二、Spark接口實戰(zhàn)IDEA使用和打包
objectWordCount{defmain(args:Array[String]){if(args.length==0){System.err.println("Usage:WordCount<file1>")System.exit(1)
}
valconf=newSparkConf().setAppName("WordCount")valsc=newSparkContext(conf)
sc.textFile(args(0)).flatMap(_.split("")).map(x=>(x,1)).reduceByKey(_+_).take(10).foreach(println)sc.stop()
} }(3)編寫代碼。在src目錄下創(chuàng)建1個名為text的package,并增加一個Scalaclass類型為Object。實現(xiàn)單詞計數(shù)的WordCount代碼,如下所示:packagetext/***Createdbyhadoopon9/1/16.*/importorg.apache.spark.{SparkContext,SparkConf}importorg.apache.spark.SparkContext._
二、Spark接口實戰(zhàn)IDEA使用和打包單擊“OK”按鈕后,選擇“Build”→“BuildArtifacts”→“text”→“build”進(jìn)行打包,經(jīng)過編譯后,程序包放置在ideaProjects/text/out/artifacts/text目錄下,文件名為text.jar。(5)運行。把spark自帶文件README.md上傳到hdfs,命令如下。hadoopfs–put/opt/spark/README.md/user/hadoop(4)生產(chǎn)程序包。生成程序包之前要先建立一個artifacts,依次選擇“File”→“ProjectStructure”→“Artifacts”→“+”→“Jars”→“Frommoudleswithdependencies”,然后隨便選一個Class作為主Class;單擊“OK”按鈕后,對artifacts進(jìn)行配置,修改Name為text,刪除OutputLayout中text.jar中的幾個依賴包,只剩text項目本身;二、Spark接口實戰(zhàn)IDEA使用和打包運行完成后在Web監(jiān)控界面可以看到結(jié)果信息,如下圖Spark接口實戰(zhàn)Spark簡介Spark實戰(zhàn)案例之統(tǒng)計1000萬人口的平均年齡SparkMLlib實戰(zhàn)(聚類實戰(zhàn))Spark編程之RDD三、Spark編程之RDDRDDRDD是Spark中的抽象數(shù)據(jù)結(jié)構(gòu)類型,從編程的角度來看,RDD可以簡單看成是一個數(shù)組。和普通數(shù)組的區(qū)別是,RDD中的數(shù)據(jù)是分區(qū)存儲的,這樣不同分區(qū)的數(shù)據(jù)就可以分布在不同的機器上,同時可以被并行處理。因此,Spark應(yīng)用程序所做的無非是把需要處理的數(shù)據(jù)轉(zhuǎn)換為RDD,然后對RDD進(jìn)行一系列的變換和操作從而得到結(jié)果。下面,將介紹SparkRDD中與Map和Reduce相關(guān)的API。創(chuàng)建RDDRDD可以從普通數(shù)組創(chuàng)建出來,也可以從文件系統(tǒng)或者HDFS中的文件創(chuàng)建出來。例1:從普通數(shù)組創(chuàng)建RDD,里面包含了1到9這9個數(shù)字,它們分別在3個分區(qū)中。scala>vala=sc.parallelize(1to9,3)a:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[1]atparallelizeat<console>:12
三、Spark編程之RDDRDD中與Map和Reduce相關(guān)的API1.mapmap是對RDD中的每個元素都執(zhí)行一個指定的函數(shù)來產(chǎn)生一個新的RDD。任何原RDD中的元素在新RDD中都有且只有一個元素與之對應(yīng)。2.mapPartitionsmapPartitions是map的一個變種。map的輸入函數(shù)是應(yīng)用于RDD中每個元素,而mapPartitions的輸入函數(shù)是應(yīng)用于每個分區(qū),也就是把每個分區(qū)中的內(nèi)容作為整體來處理的。3.mapValuesmapValues顧名思義就是輸入函數(shù)應(yīng)用于RDD中Kev-Value的Value,原RDD中的Key保持不變,與新的Value一起組成新的RDD中的元素。因此,該函數(shù)只適用于元素為KV對的RDD。4.mapWithmapWith是map的另外一個變種,map只需要一個輸入函數(shù),而mapWith有兩個輸入函數(shù)。Spark接口實戰(zhàn)Spark編程之RDDSpark簡介SparkMLlib實戰(zhàn)(聚類實戰(zhàn))Spark實戰(zhàn)案例之統(tǒng)計1000萬人口的平均年齡四、Spark實戰(zhàn)案例之統(tǒng)計1000萬人口的平均年齡案例描述該案例中,我們將假設(shè)我們需要統(tǒng)計一個1000萬人口的所有人的平均年齡,當(dāng)然如果想測試Spark對于大數(shù)據(jù)的處理能力,可以把人口數(shù)放的更大,比如1億人口,當(dāng)然這個取決于測試所用集群的存儲容量。假設(shè)這些年齡信息都存儲在一個文件里,并且該文件的格式如下,第一列是ID,第二列是年齡?,F(xiàn)在我們需要用Scala寫一個生成1000萬人口年齡數(shù)據(jù)的文件,源程序如下:直接創(chuàng)建scala腳本使用上述代碼運行即可獲得隨機生成的1000萬人口年齡數(shù)據(jù)文件。vimSampleAge.scalascalaSampleAge.scala
運行成功后獲得文件sample_age_data.txt。四、Spark實戰(zhàn)案例之統(tǒng)計1000萬人口的平均年齡案例描述importjava.io.FileWriterimportjava.io.Fileimportscala.util.Random
objectSampleAge{
defmain(args:Array[String]){ valwriter=newFileWriter(newFile("/home/hadoop/sample_age_data.txt"),false) valrand=newRandom() for(i<-1to10000000){ writer.write(i+""+rand.nextInt(100)) writer.write(System.getProperty("line.separator")) } writer.flush() writer.close()
}
}四、Spark實戰(zhàn)案例之統(tǒng)計1000萬人口的平均年齡案例分析要計算平均年齡,那么首先需要對源文件對應(yīng)的RDD進(jìn)行處理,也就是將它轉(zhuǎn)化成一個只包含年齡信息的RDD,其次是計算元素個數(shù)即為總?cè)藬?shù),然后是把所有年齡數(shù)加起來,最后平均年齡=總年齡/人數(shù)。對于第一步我們需要使用map算子把源文件對應(yīng)的RDD映射成一個新的只包含年齡數(shù)據(jù)的RDD,很顯然需要對在map算子的傳入函數(shù)中使用split方法,得到數(shù)組后只取第二個元素即為年齡信息;第二步計算數(shù)據(jù)元素總數(shù)需要對于第一步映射的結(jié)果RDD使用count算子;第三步則是使用reduce算子對只包含年齡信息的RDD的所有元素用加法求和;最后使用除法計算平均年齡即可。由于本例輸出結(jié)果很簡單,所以只打印在控制臺即可。四、Spark實戰(zhàn)案例之統(tǒng)計1000萬人口的平均年齡編程實現(xiàn)importorg.apache.spark.SparkConfimportorg.apache.spark.SparkContextobjectAvgAgeCalculator{defmain(args:Array[String]){if(args.length<1){println("Usage:AvgAgeCalculatordatafile")System.exit(1)}valconf=newSparkConf().setAppName("SparkExercise:AverageAgeCalculator")valsc=newSparkContext(conf)valdataFile=sc.textFile(args(0),5);valcount=dataFile.count()valageData=dataFile.map(line=>line.split("")(1))valtotalAge=ageData.map(age=>Integer.parseInt(
String.valueOf(age))).collect().reduce((a,b)=>a+b)println("TotalAge:"+totalAge+";NumberofPeople:"+count)valavgAge:Double=totalAge.toDouble/count.toDoubleprintln("AverageAgeis"+avgAge)}}四、Spark實戰(zhàn)案例之統(tǒng)計1000萬人口的平均年齡提交到集群運行要執(zhí)行本實例的程序,需要將剛剛生成的年齡信息文件上傳到HDFS上運行一下HDFS命令把文件拷貝到HDFS的/user/hadoop目錄。本地文件上傳HDFS命令:hadoopfs-putsample_age_data.txt/user/hadoopAvgAgeCalculator類的執(zhí)行命令如下:spark-submit\--masterspark://00:7077\--nametext\--classspark.AvgAgeCalculator\--executor-memory512m\--total-executor-cores2spark.jarhdfs://master:9000/user/hadoop/sample_age_data.txt四、Spark實戰(zhàn)案例之統(tǒng)計1000萬人口的平均年齡監(jiān)控執(zhí)行狀態(tài)程序正確運行,則可以在控制臺看到如圖所示信息。也可以到SparkWebConsole去查看Job的執(zhí)行狀態(tài),如下圖所示。Spark接口實戰(zhàn)Spark編程之RDDSpark實戰(zhàn)案例之統(tǒng)計1000萬人口的平均年齡Spark簡介SparkMLlib實戰(zhàn)(聚類實戰(zhàn))五、SparkMLlib實戰(zhàn)(聚類實戰(zhàn))算法說明聚類的核心任務(wù)是:將一組目標(biāo)object劃分為若干個簇,每個簇之間的object盡可能相似,簇與簇之間的object盡可能相異。聚類算法是數(shù)據(jù)挖掘中重要的一部分,除了最為簡單的K-Means聚類算法外,比較常見的還有層次法(CURE、CHAMELEON等)、網(wǎng)格算法(STING、WaveCluster等)等等。聚類的目的是找到每個樣本x潛在的類別y,并將同類別y的樣本x放在一起。比如天上的星星,聚類后結(jié)果是一個個星團(tuán),星團(tuán)里面的點相互距離比較近,星團(tuán)間的星星距離就比較遠(yuǎn)。五、SparkMLlib實戰(zhàn)(聚類實戰(zhàn))實例介紹在該實例中將介紹K-Means算法,K-Means屬于基于平方誤差的迭代重分配聚類算法,其核心思想十分簡單:1.隨機選擇K個中心點;2.計算所有點到這K個中心點的距離,選擇距離最近的中心點為其所在的簇;3.簡單地采用算術(shù)平均數(shù)(mean)來重新計算K個簇的中心;4.重復(fù)步驟2和3,直至簇類不再發(fā)生變化或者達(dá)到最大迭代值;輸出結(jié)果。本實例中進(jìn)行如下步驟:1、裝載數(shù)據(jù),數(shù)據(jù)以文本文件方式進(jìn)行存放;2、將數(shù)據(jù)集聚類,設(shè)置2個類和20次迭代,進(jìn)行模型訓(xùn)練形成數(shù)據(jù)模型;3、打印數(shù)據(jù)模型的中心點;4、使用誤差平方之和來評估數(shù)據(jù)模型;5、使用模型測試單點數(shù)據(jù);6、交叉評估1,返回結(jié)果;交叉評估2,返回數(shù)據(jù)集和結(jié)果。五、SparkMLlib實戰(zhàn)(聚類實戰(zhàn))測試數(shù)據(jù)說明該實例使用的數(shù)據(jù)為kmeans_data.txt。在該文件中提供了6個點的空間位置坐標(biāo),使用K-means聚類對這些點進(jìn)行分類。使用的kmeans_data.txt的數(shù)據(jù)如下圖所示:五、SparkMLlib實戰(zhàn)(聚類實戰(zhàn))部分程序源碼://裝載數(shù)據(jù)集valdata=sc.textFile("/user/hadoop/kmeans_data.txt",1)valparsedData=data.map(s=>Vectors.dense(s.split('').map(_.toDouble)))
//將數(shù)據(jù)集聚類,2個類,20次迭代,進(jìn)行模型訓(xùn)練形成數(shù)據(jù)模型valnumClusters=2valnumIterations=20valmodel=KMeans.train(parsedData,numClusters,numIterations)
//打印數(shù)據(jù)模型的中心點println("Clustercenters:")for(c<-model.clusterCenters){println(""+c.toString)}//使用誤差平方之和來評估數(shù)據(jù)模型valcost=puteCost(parsedData)println("WithinSetSumofSquaredErrors="+cost)
五、SparkMLlib實戰(zhàn)(聚類實戰(zhàn))程序源碼:
//使用模型測試單點數(shù)據(jù)println("Vectorsisbelongstoclusters:"+model.predict(Vectors.dense("".split('').map(_.toDouble))))println("Vectors5isbelongstoclusters:"+model.predict(Vectors.dense("5".split('').map(_.toDouble))))println("Vectors888isbelongstoclusters:"+model.predict(Vectors.dense("888".split('').map(_.toDouble))))
//交叉評估1,只返回結(jié)果valtestda
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 醫(yī)藥廠家銷售合同范例
- 化妝品加盟店合同范本
- 農(nóng)村購山地合同范本
- 合同維修合同范本
- 加盟合同范本
- 舉辦展覽合同范本
- 合同范例五十二條
- 個人苗木供銷合同范本
- 會議協(xié)議合同范本模板
- 入股模擬股合同范本
- 新中式養(yǎng)生知識培訓(xùn)課件
- 山東省臨沂市地圖矢量課件模板()
- 學(xué)習(xí)2025年全國教育工作會議心得體會
- 中國中材海外科技發(fā)展有限公司招聘筆試沖刺題2025
- 兩層鋼結(jié)構(gòu)廠房施工方案
- 班級凝聚力主題班會12
- 初中語文“經(jīng)典誦讀與海量閱讀”校本課程實施方案
- Gly-Gly-Leu-生命科學(xué)試劑-MCE
- 翻斗車司機安全培訓(xùn)
- 零售業(yè)的門店形象提升及店面管理方案設(shè)計
- 高速公路40m連續(xù)T梁預(yù)制、架設(shè)施工技術(shù)方案
評論
0/150
提交評論