




版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認(rèn)領(lǐng)
文檔簡介
Spark簡介
Spark簡介Spark是什么Spark是一個快速且通用的集群計算平臺。集群計算把一臺電腦無法解決的問題,放到多臺電腦組成的集群上進行解決,這就是集群計算。Spark是什么Spark是一個快速且通用的集群計算平臺。Spark的特點Spark是快速的很多任務(wù)能夠秒級完成,對于一些特定的工作,Spark比Mapreduce快10-20倍。Spark擴充了流行的Mapreduce計算模型,使Spark更高效地支持更多類型的計算,包括交互式查詢,和流處理。速度快的另一個主要原因就是,能夠在內(nèi)存中計算。Spark的特點Spark是快速的Spark的特點Spark是通用的Spark的設(shè)計,容納了之前很多獨立的,分布式系統(tǒng)所擁有的功能。獨立的分布式系統(tǒng)包括:批處理,迭代式計算,交互查詢和流處理等。并且,由之前需要維護不同的集群,到現(xiàn)在只需要維護一個Spark集群。Spark的特點Spark是通用的Spark的特點Spark是高度開放的Spark提供了Python,Java,Scala,SQL的API和豐富的內(nèi)置庫。同時,Spark和其它的大數(shù)據(jù)工具整合的很好。尤其,Spark能夠運行在Hadoop集群上面,能夠訪問Hadoop數(shù)據(jù)。Spark的特點Spark是高度開放的Spark的組件Spark包括多個緊密集成的組件。Spark的組件Spark的組件緊密集成的優(yōu)點:如果Spark底層優(yōu)化了,那么基于Spark底層的組件,也得到了相應(yīng)的優(yōu)化。例如,Spark底層增加了一個優(yōu)化算法,那么Spark的SQL和機器學(xué)習(xí)包也會自動的優(yōu)化。緊密集成,節(jié)省了各個組件組合使用時的部署,測試等時間。當(dāng)向Spark增加新的組件時,其它的組件,可以立刻享用新組件的功能。無縫連接不同的處理模型。Spark的組件緊密集成的優(yōu)點:Spark的組件SparkCore:包含Spark的基本功能,包含任務(wù)調(diào)度,內(nèi)存管理,容錯機制等。SparkCore內(nèi)部定義了RDDs(resilientdistributeddatasets,彈性分布式數(shù)據(jù)集)。RDDs代表橫跨很多工作節(jié)點的數(shù)據(jù)集合,RDDs可以被并行的處理。SparkCore提供了很多APIs來創(chuàng)建和操作這些集合(RDDs)。Spark的組件SparkCore:Spark的組件SparkSQL:是Spark處理結(jié)構(gòu)化數(shù)據(jù)的庫。它支持通過SQL查詢數(shù)據(jù),就像HQL(HiveSQL)一樣,并且支持很多數(shù)據(jù)源,像Hive表,JSON等。SparkSQL是在Spark1.0版本中新加的。Shark是一種較老的基于Spark的SQL項目,它是基于Hive修改的,它現(xiàn)在已經(jīng)被SparkSQL替代了。Spark的組件SparkSQL:Spark的組件SparkStreaming:是實時數(shù)據(jù)流處理組件,類似Storm。SparkStreaming提供了API來操作實時流數(shù)據(jù)。Spark的組件SparkStreaming:Spark的組件MLlib:Spark有一個包含通用機器學(xué)習(xí)功能的包,就是MLlib(machinelearninglib)。MLlib包含了分類,聚類,回歸,協(xié)同過濾算法,還包括模型評估,和數(shù)據(jù)導(dǎo)入。它還提供了一些低級的機器學(xué)習(xí)原語,包括通用梯度下降優(yōu)化算法。MLlib提供的上面這些方法,都支持集群上的橫向擴展。Spark的組件MLlib:Spark的組件Graphx:是處理圖的庫(例如,社交網(wǎng)絡(luò)圖),并進行圖的并行計算。就像SparkStreaming和SparkSQL一樣,Graphx也繼承了SparkRDDAPI,同時允許創(chuàng)建有向圖。Graphx提供了各種圖的操作,例如subgraph和mapVertices,也包含了常用的圖算法,例如PangeRank等。Spark的組件Graphx:Spark的組件ClusterManagers:ClusterManagers就是集群管理。Spark能夠運行在很多clustermanagers上面,包括HadoopYARN,ApacheMesos和Spark自帶的單獨調(diào)度器。如果你把Spark安裝在了裸機上面,單獨調(diào)度器能夠提供簡單的方式,讓你開始Spark之旅。如果你已經(jīng)有了HadoopYarn或者Mesos集群,那么,Spark對這些集群管理工具的支持,使你的Spark應(yīng)用程序能夠在這些集群上面運行。Spark的組件ClusterManagers:Spark的歷史Spark誕生于2009年,那時候它是,加州大學(xué)伯克利分校RAD實驗室的一個研究項目,后來到了AMP實驗室。Spark最初是基于HadoopMapreduce的,后來發(fā)現(xiàn)Mapreduce在迭代式計算和交互式上是低效的。因此Spark進行了改進,引入了內(nèi)存存儲和高容錯機制。關(guān)于Spark的研究論文在學(xué)術(shù)會議上發(fā)表,并且在它被創(chuàng)建的2009年不久之后,對于一些特定的工作,Spark比Mapreduce快10-20倍。2010年3月份Spark開源。2011年,AMP實驗室開始在Spark上面開發(fā)高級組件,像Shark(HiveonSpark),SparkStreaming。2013年轉(zhuǎn)移到了Apache下,現(xiàn)在已經(jīng)是頂級項目了。2014年5月份Spark1.0發(fā)布。Spark的歷史Spark誕生于2009年,那時候它是,加州Spark運行環(huán)境Spark是Scala寫的,運行在JVM上。所以運行環(huán)境是Java6或者以上。如果想要使用PythonAPI,需要安裝Python解釋器2.6版本或者以上。目前Spark(1.2.0版本)與Python3不兼容。Spark運行環(huán)境Spark是Scala寫的,運行在JVSpark下載下載地址:/downloads.html,選擇Pre-builtforHadoop2.4andlater這個包,點擊直接下載,這會下載一個spark-1.2.0-bin-hadoop2.4.tgz的壓縮包搭建Spark不需要Hadoop,如果你有hadoop集群或者hdfs,你可以下載相應(yīng)的版本。解壓:tar-zxvfspark-1.2.0-bin-hadoop2.4.tgzSpark下載下載地址:http://spark.apachSpark目錄README.md 開始Spark之旅的簡單介紹。bin 包含用來和Spark交互的可執(zhí)行文件,如Sparkshell。core,streaming,python,… 包含主要組件的源代碼。examples 包含一些有用的單機Sparkjob。你可以研究和運行這些例子,來學(xué)習(xí)SparkAPI。Spark目錄README.mdSpark的ShellsSpark的shell使你能夠處理分布在集群上的數(shù)據(jù)(這些數(shù)據(jù)可以是分布在硬盤上或者內(nèi)存中)。Spark可以把數(shù)據(jù)加載到工作節(jié)點的內(nèi)存中,因此,許多分布式處理(甚至是分布式的1T數(shù)據(jù)的處理)都可以在幾秒內(nèi)完成。上面的特性,使迭代式計算,實時查詢、分析一般能夠在shells中完成。Spark提供了Pythonshells和Scalashells。Spark的ShellsSpark的shell使你能夠處理分Spark的Shells打開Spark的PythonShell:到Spark目錄,Spark的PythonShell也叫做PySparkShellbin/pyspark打開PySparkShell之后的界面Spark的Shells打開Spark的PythonSheSpark的Shells打開Spark的ScalaShell:到Spark目錄bin/pysparkbin/spark-shell打開Scala版本的shell打開之后的界面Spark的Shells打開Spark的ScalaShelSpark的Shells例子:scala>vallines=sc.textFile("../../testfile/helloSpark")//創(chuàng)建一個叫l(wèi)ines的RDDlines:org.apache.spark.rdd.RDD[String]=../../testfile/helloSparkMappedRDD[1]attextFileat<console>:12scala>lines.count()//對這個RDD中的行數(shù)進行計數(shù) res0:Long=2scala>lines.first()//文件中的第一行 res1:String=hellospark修改日志級別:conf/pertieslog4j.rootCategory=WARN,consoleSpark的Shells例子:Spark的核心概念Driverprogram:包含程序的main()方法,RDDs的定義和操作。(在上面的例子中,driverprogram就是SparkShell它本身了)它管理很多節(jié)點,我們稱作executors。count()操作解釋(每個executor計算文件的一部分,最后合并)。Spark的核心概念Driverprogram:Spark的核心概念SparkContext:Driverprograms通過一個SparkContext對象訪問Spark,SparkContext對象代表和一個集群的連接。在Shell中SparkContext自動創(chuàng)建好了,就是sc,例子:sc變量>>>sc<pyspark.context.SparkContextobjectat0x1025b8f90>Spark的核心概念SparkContext:Spark的核心概念RDDs:在Spark中,我們通過分布式集合(distributedcollections,也就是RDDs)來進行計算,這些分布式集合,并行的分布在整個集群中。RDDs是Spark分發(fā)數(shù)據(jù)和計算的基礎(chǔ)抽象類。用SparkContext創(chuàng)建RDDs上面例子中使用sc.textFile()創(chuàng)建了一個RDD,叫l(wèi)ines,它是從我們的本機文本文件中創(chuàng)建的,這個RDD代表了一個文本文件的每一行。我們可以在RDD上面進行各種并行化的操作,例如計算數(shù)據(jù)集中元素的個數(shù)或者打印出第一行。Spark的核心概念RDDs:Spark的核心概念向Spark傳遞函數(shù):向Spark傳遞函數(shù)是Spark的一個常用功能,許多SparkAPI是圍繞它展開的。例子:filteringscala>vallines=sc.textFile("../../testfile/helloSpark")lines:spark.RDD[String]=MappedRDD[...]scala>valworldLines=lines.filter(line=>line.contains("world"))pythonLines:spark.RDD[String]=FilteredRDD[...]scala>worldLines.collect()Spark的核心概念向Spark傳遞函數(shù):Spark的核心概念向Spark傳遞函數(shù):上面例子中的=>語法是Scala中定義函數(shù)的便捷方法。你也可以先定義函數(shù)再引用:例子:defhasWorld(line:String):Boolean={line.contains("world")}worldLines=lines.filter(hasWorld)像filter這樣的基于函數(shù)的操作,也是在集群上并行執(zhí)行的。Spark的核心概念向Spark傳遞函數(shù):Spark的核心概念向Spark傳遞函數(shù):需要注意的地方:如果你傳遞的函數(shù)是一個對象的成員,或者包含一個對象中字段的引用(例如self.field),Spark會把整個對象都發(fā)送到工作節(jié)點上,這樣會比僅僅發(fā)送你關(guān)心的信息要大很多,而且有時候會帶來一些奇怪的問題。傳送信息太多解決方法:我們可以把關(guān)心的字段抽取出來,只傳遞關(guān)心的字段。奇怪問題的避免:序列化包含函數(shù)的對象,函數(shù)和函數(shù)中引用的數(shù)據(jù)都需要序列化(實現(xiàn)Java的Serializableinterface)。如果Scala中出現(xiàn)NotSerializableException,一般情況下,都是因為沒序列化。Spark的核心概念向Spark傳遞函數(shù):RDDs介紹RDDs介紹RDDs的創(chuàng)建方法Scala的基礎(chǔ)知識RDDs介紹RDDs介紹RDDs介紹RDDsResilientdistributeddatasets(彈性分布式數(shù)據(jù)集,簡寫RDDs)。一個RDD就是一個不可改變的分布式集合對象,內(nèi)部由許多partitions(分片)組成,每個partition都包括一部分?jǐn)?shù)據(jù),這些partitions可以在集群的不同節(jié)點上計算Partitions是Spark中的并行處理的單元。Spark順序的,并行的處理partitions。RDDs是Spark的分發(fā)數(shù)據(jù)和計算的基礎(chǔ)抽象類,是Spark的核心概念。RDD可以包含Python,Java,或者Scala中的任何數(shù)據(jù)類型,包括用戶自定義的類。在Spark中,所有的計算都是通過RDDs的創(chuàng)建,轉(zhuǎn)換,操作完成的。RDD具有l(wèi)ineagegraph(血統(tǒng)關(guān)系圖)。RDDs介紹RDDsRDDs的創(chuàng)建方法Driverprogram中創(chuàng)建RDDs:把一個存在的集合傳給SparkContext’sparallelize()方法。這種方法,一般只適用于學(xué)習(xí)時。例子:vallines=sc.parallelize(List("spark","bigdatastudy"))valrdd=sc.parallelize(Array(1,2,2,4),4)...注意一下RDD的類型第一個參數(shù)是:待并行化處理的集合第二個參數(shù)是:分區(qū)個數(shù)RDDs的創(chuàng)建方法Driverprogram中創(chuàng)建RDDsRDDs的創(chuàng)建方法加載外部數(shù)據(jù)集:例子:使用textFile()加載valrddText=sc.textFile("../../testfile/helloSpark")valrddHdfs=sc.textFile("hdfs:///some/path.txt")RDDs的創(chuàng)建方法加載外部數(shù)據(jù)集:Scala的基礎(chǔ)知識Scala的變量聲明在Scala中創(chuàng)建變量的時候,必須使用val或者varVal,變量值不可修改,一旦分配不能重新指向別的值Var,分配后,可以指向類型相同的值。Scala的基礎(chǔ)知識Scala的變量聲明Scala的基礎(chǔ)知識Scala的變量聲明vallines=sc.textFile("../../testfile/helloSpark")lines=sc.textFile("../../testfile/helloSpark2")...<console>:error:reassignmenttovalvarlines2=sc.textFile("../../testfile/helloSpark")lines2=sc.textFile("../../testfile/helloSpark2")可以重新聲明變量vallines=sc.textFile("../../testfile/helloSpark2")Scala的基礎(chǔ)知識Scala的變量聲明Scala的基礎(chǔ)知識Scala的匿名函數(shù)像Python的lambda函數(shù)lines.filter(line=>line.contains("world"))...我們定義一個匿名函數(shù),接收一個參數(shù)line,并使用line這個String類型變量上的contains方法,并且返回結(jié)果。line的類型不需指定,能夠推斷出來Scala的基礎(chǔ)知識Scala的匿名函數(shù)Scala的基礎(chǔ)知識Scala程序員就是不喜歡多寫代碼。Scala允許我們用下劃線"_"來代表匿名函數(shù)中的參數(shù)。lines.filter(_.contains("world"))...Scala的基礎(chǔ)知識Scala程序員就是不喜歡多寫代碼。Scala的基礎(chǔ)知識類型推斷defhasWorld(line:String):Boolean={line.contains("world")}worldLines=lines.filter(hasWorld)Scala中定義函數(shù)用def,參數(shù)指定類型String,因為后面的contains方法就是用的String中的Contains方法。函數(shù)返回的類型,可以不必指定,因為通過類型推斷,能夠推出來。Scala的基礎(chǔ)知識類型推斷Scala的基礎(chǔ)知識類型推斷指定返回類型:返回的類型比較復(fù)雜,Scala可能推斷不出來。程序更易讀。Scala的基礎(chǔ)知識類型推斷TransformationsTransformations介紹逐元素transformations集合運算TransformationsTransformationsTransformations介紹Transformations(轉(zhuǎn)換):從之前的RDD構(gòu)建一個新的RDD,像map()和filter()。Transformations介紹TransformatioTransformations介紹Transformations的特點:Transformations返回一個嶄新的RDD,filter()操作返回一個指針,指向一個嶄新的RDD,原RDD不受影響,能夠在后面重復(fù)利用。Transformations介紹Transformatio逐元素transformations許多的transformations是逐元素的,也就是每次轉(zhuǎn)變一個元素。兩個最常用的transformations:map()andfilter()map()transformation,接收一個函數(shù),把這個函數(shù)應(yīng)用到RDD的每一個元素,并返一個函數(shù)作用后的新的RDD。filter()transformation,接收一個函數(shù),返回只包含滿足filter()函數(shù)的元素的新RDD。輸入RDD與輸出RDD可以是不同的類型,例如inputRDD[String],outputRDD[Double]逐元素transformations許多的transform逐元素transformationsmap()例子-對RDD中元素求平方valinput=sc.parallelize(List(1,2,3,4))valresult=input.map(x=>x*x)println(result.collect().mkString(","))逐元素transformationsmap()逐元素transformationsflatMap()對每個輸入元素,輸出多個輸出元素。flat壓扁的意思,將RDD中元素壓扁后返回一個新的RDD。例子-flatMap(),把一行字分割成多個元素vallines=sc.parallelize(List("helloworld","hi"))valwords=lines.flatMap(line=>line.split(""))words.first()//returns"hello"逐元素transformationsflatMap()逐元素transformationsflatMap()逐元素transformationsflatMap()集合運算RDDs支持?jǐn)?shù)學(xué)集合的計算,例如并集,交集計算。注意:進行計算的RDDs應(yīng)該是相同類型。money->monkey集合運算RDDs支持?jǐn)?shù)學(xué)集合的計算,例如并集,交集計算。注意集合運算distinct()是很耗時的操作,因為它需要通過網(wǎng)絡(luò),shuffle所有的數(shù)據(jù),以保證元素不重復(fù)。一般情況下,我們不用distinct()。union(other)會包含重復(fù)的元素。intersection(other)求交集。耗時操作,因為需要shufflesubtract(other)第一個RDD中存在,而不存在與第二個RDD的元素。需要shuffle。使用場景,機器學(xué)習(xí)中,移除訓(xùn)練集。集合運算distinct()是很耗時的操作,因為它需要通過網(wǎng)集合運算cartesian(other)非常耗時。使用場景:用戶相似性的時候集合運算cartesian(other)RDD的transformations基本的RDDtransformations:RDD包含{1,2,3,3}函數(shù)名功能例子結(jié)果map()對每個元素應(yīng)用函數(shù)rdd.map(x=>x+1){2,3,4,4}flatMap()壓扁,常用來抽取單詞rdd.flatMap(x=>x.to(3)){1,2,3,2,3,3,3}filter()過濾rdd.filter(x=>x!=1){2,3,3}distinct()去重rdd.distinct(){1,2,3}sample(withReplacement,fraction,[seed])對一個RDD取樣,是否進行替換rdd.sample(false,0.5)不確定RDD的transformations基本的RDDtranRDD的transformations兩個RDD的transformations:一個RDD包含{1,2,3},另一個RDD包含{3,4,5}函數(shù)名功能例子結(jié)果union()并集rdd.union(other){1,2,3,3,4,5}intersection()交集ersection(other){3}subtract()取存在第一個RDD,而不存在第二個RDD的元素(使用場景,機器學(xué)習(xí)中,移除訓(xùn)練集)rdd.subtract(other){1,2}cartesian()笛卡爾積rdd.cartesian(other){(1,3),(1,4),…(3,5)}RDD的transformations兩個RDD的tranActions在RDD上計算出來一個結(jié)果,把結(jié)果返回給driverprogram或者保存在外部文件系統(tǒng)上,像count()函數(shù)first()。count()返回元素的個數(shù)Actions在RDD上計算出來一個結(jié)果,把結(jié)果返回給driRDD的actions函數(shù)名功能例子結(jié)果collect()返回RDD的所有元素rdd.collect(){1,2,3,3}count()計數(shù)rdd.count()4countByValue()返回一個map,表示唯一元素出現(xiàn)的個數(shù)rdd.countByValue(){(1,1),(2,1),(3,2)}take(num)返回幾個元素rdd.take(2){1,2}top(num)返回前幾個元素rdd.top(2){3,3}takeOrdered(num)(ordering)返回基于提供的排序算法的前幾個元素rdd.takeOrdered(2)(myOrdering){3,3}takeSample(withReplacement,num,[seed])取樣例rdd.takeSample(false,1)不確定reduce(func)合并RDD中元素rdd.reduce((x,y)=>x+y)9fold(zero)(func)與reduce()相似提供zerovaluerdd.fold(0)((x,y)=>x+y)9aggregate(zeroValue)(seqOp,combOp)與fold()相似,返回不同類型rdd.aggregate((0,0))((x,y)=>(x._1+y,x._2+1),(x,y)=>(x._1+y._1,x._2+y._2))(9,4)foreach(func)對RDD的每個元素作用函數(shù),什么也不返回rdd.foreach(func)什么也沒有RDD的actions函數(shù)名功能例子結(jié)果collect()返A(chǔ)ctionsreduce()最常用的是reduce(),接收一個函數(shù),作用在RDD的兩個類型相同的元素上,返回一個類型相同的新元素。最常用的一個函數(shù)是加法。使用reduce()我們可以很簡單的實現(xiàn),RDD中元素的累加,計數(shù),和其它類型的聚集操作。例子-reduce()valsum=rdd.reduce((x,y)=>x+y)Actionsreduce()Actionsfold()與reduce()相似,類型相同但是,在每個分區(qū)的初始化調(diào)用的時候,多了個“zerovalue”“zerovalue”的特點,把它應(yīng)用在你的函數(shù)上,不管多少次,都不改變值(例如:+操作的0,*操作的1)。Actionsfold()Actionsaggregate()與fold()相似類型可以不同我們提供想要返回的“zerovalue”類型。第一個函數(shù),RDD中元素累加(每個節(jié)點只累加本地的結(jié)果)。第二個函數(shù),合并累加器(合并每個節(jié)點的結(jié)果)??梢允褂胊ggreate()計算RDD的平均值,而不使用map()和fold()結(jié)合的方法。Actionsaggregate()Actions例子-aggregate()valresult=input.aggregate((0,0))((x,y)=>(x._1+y,x._2+1),(x,y)=>(x._1+y._1,x._2+y._2))valavg=result._1/result._2.toDoubleActions例子-aggregate()Actionscollect()遍歷整個RDD,向driverprogram返回RDD的內(nèi)容一般測試時候使用,可以判斷與預(yù)測值是否一樣需要單機內(nèi)存能夠容納下(因為數(shù)據(jù)要拷貝給driver)大數(shù)據(jù)的時候,使用saveAsTextFile()action,saveAsSequenceFile()action等。Actionscollect()Actionstake(n)返回RDD的n個元素(同時嘗試訪問最少的partitions)。返回結(jié)果是無序的。一般測試時候使用Actionstake(n)Actionsforeach()計算RDD中的每個元素,但不返回到本地??梢耘浜蟨rintln()友好的打印出數(shù)據(jù)。Actionsforeach()Actions.foreach(println)風(fēng)格:把函數(shù)println當(dāng)作參數(shù)傳遞給函數(shù)foreach例子-計算bad的個數(shù)errorsRDD=inputRDD.filter(line.contains("error"))warningsRDD=inputRDD.filter(line.contains("warning"))badLinesRDD=errorsRDD.union(warningsRDD)println(badLinesRDD.count())badLinesRDD.take(1).foreach(println)//使用take()取前1個數(shù)據(jù)Actions.foreach(println)Actionstop()排序(根據(jù)RDD中數(shù)據(jù)的比較器)takeSample(withReplacement,num,seed)取樣例,是否需要替換值。countByValue()返回一個map,表示唯一元素出現(xiàn)的個數(shù)Actionstop()Spark簡介
Spark簡介Spark是什么Spark是一個快速且通用的集群計算平臺。集群計算把一臺電腦無法解決的問題,放到多臺電腦組成的集群上進行解決,這就是集群計算。Spark是什么Spark是一個快速且通用的集群計算平臺。Spark的特點Spark是快速的很多任務(wù)能夠秒級完成,對于一些特定的工作,Spark比Mapreduce快10-20倍。Spark擴充了流行的Mapreduce計算模型,使Spark更高效地支持更多類型的計算,包括交互式查詢,和流處理。速度快的另一個主要原因就是,能夠在內(nèi)存中計算。Spark的特點Spark是快速的Spark的特點Spark是通用的Spark的設(shè)計,容納了之前很多獨立的,分布式系統(tǒng)所擁有的功能。獨立的分布式系統(tǒng)包括:批處理,迭代式計算,交互查詢和流處理等。并且,由之前需要維護不同的集群,到現(xiàn)在只需要維護一個Spark集群。Spark的特點Spark是通用的Spark的特點Spark是高度開放的Spark提供了Python,Java,Scala,SQL的API和豐富的內(nèi)置庫。同時,Spark和其它的大數(shù)據(jù)工具整合的很好。尤其,Spark能夠運行在Hadoop集群上面,能夠訪問Hadoop數(shù)據(jù)。Spark的特點Spark是高度開放的Spark的組件Spark包括多個緊密集成的組件。Spark的組件Spark的組件緊密集成的優(yōu)點:如果Spark底層優(yōu)化了,那么基于Spark底層的組件,也得到了相應(yīng)的優(yōu)化。例如,Spark底層增加了一個優(yōu)化算法,那么Spark的SQL和機器學(xué)習(xí)包也會自動的優(yōu)化。緊密集成,節(jié)省了各個組件組合使用時的部署,測試等時間。當(dāng)向Spark增加新的組件時,其它的組件,可以立刻享用新組件的功能。無縫連接不同的處理模型。Spark的組件緊密集成的優(yōu)點:Spark的組件SparkCore:包含Spark的基本功能,包含任務(wù)調(diào)度,內(nèi)存管理,容錯機制等。SparkCore內(nèi)部定義了RDDs(resilientdistributeddatasets,彈性分布式數(shù)據(jù)集)。RDDs代表橫跨很多工作節(jié)點的數(shù)據(jù)集合,RDDs可以被并行的處理。SparkCore提供了很多APIs來創(chuàng)建和操作這些集合(RDDs)。Spark的組件SparkCore:Spark的組件SparkSQL:是Spark處理結(jié)構(gòu)化數(shù)據(jù)的庫。它支持通過SQL查詢數(shù)據(jù),就像HQL(HiveSQL)一樣,并且支持很多數(shù)據(jù)源,像Hive表,JSON等。SparkSQL是在Spark1.0版本中新加的。Shark是一種較老的基于Spark的SQL項目,它是基于Hive修改的,它現(xiàn)在已經(jīng)被SparkSQL替代了。Spark的組件SparkSQL:Spark的組件SparkStreaming:是實時數(shù)據(jù)流處理組件,類似Storm。SparkStreaming提供了API來操作實時流數(shù)據(jù)。Spark的組件SparkStreaming:Spark的組件MLlib:Spark有一個包含通用機器學(xué)習(xí)功能的包,就是MLlib(machinelearninglib)。MLlib包含了分類,聚類,回歸,協(xié)同過濾算法,還包括模型評估,和數(shù)據(jù)導(dǎo)入。它還提供了一些低級的機器學(xué)習(xí)原語,包括通用梯度下降優(yōu)化算法。MLlib提供的上面這些方法,都支持集群上的橫向擴展。Spark的組件MLlib:Spark的組件Graphx:是處理圖的庫(例如,社交網(wǎng)絡(luò)圖),并進行圖的并行計算。就像SparkStreaming和SparkSQL一樣,Graphx也繼承了SparkRDDAPI,同時允許創(chuàng)建有向圖。Graphx提供了各種圖的操作,例如subgraph和mapVertices,也包含了常用的圖算法,例如PangeRank等。Spark的組件Graphx:Spark的組件ClusterManagers:ClusterManagers就是集群管理。Spark能夠運行在很多clustermanagers上面,包括HadoopYARN,ApacheMesos和Spark自帶的單獨調(diào)度器。如果你把Spark安裝在了裸機上面,單獨調(diào)度器能夠提供簡單的方式,讓你開始Spark之旅。如果你已經(jīng)有了HadoopYarn或者Mesos集群,那么,Spark對這些集群管理工具的支持,使你的Spark應(yīng)用程序能夠在這些集群上面運行。Spark的組件ClusterManagers:Spark的歷史Spark誕生于2009年,那時候它是,加州大學(xué)伯克利分校RAD實驗室的一個研究項目,后來到了AMP實驗室。Spark最初是基于HadoopMapreduce的,后來發(fā)現(xiàn)Mapreduce在迭代式計算和交互式上是低效的。因此Spark進行了改進,引入了內(nèi)存存儲和高容錯機制。關(guān)于Spark的研究論文在學(xué)術(shù)會議上發(fā)表,并且在它被創(chuàng)建的2009年不久之后,對于一些特定的工作,Spark比Mapreduce快10-20倍。2010年3月份Spark開源。2011年,AMP實驗室開始在Spark上面開發(fā)高級組件,像Shark(HiveonSpark),SparkStreaming。2013年轉(zhuǎn)移到了Apache下,現(xiàn)在已經(jīng)是頂級項目了。2014年5月份Spark1.0發(fā)布。Spark的歷史Spark誕生于2009年,那時候它是,加州Spark運行環(huán)境Spark是Scala寫的,運行在JVM上。所以運行環(huán)境是Java6或者以上。如果想要使用PythonAPI,需要安裝Python解釋器2.6版本或者以上。目前Spark(1.2.0版本)與Python3不兼容。Spark運行環(huán)境Spark是Scala寫的,運行在JVSpark下載下載地址:/downloads.html,選擇Pre-builtforHadoop2.4andlater這個包,點擊直接下載,這會下載一個spark-1.2.0-bin-hadoop2.4.tgz的壓縮包搭建Spark不需要Hadoop,如果你有hadoop集群或者hdfs,你可以下載相應(yīng)的版本。解壓:tar-zxvfspark-1.2.0-bin-hadoop2.4.tgzSpark下載下載地址:http://spark.apachSpark目錄README.md 開始Spark之旅的簡單介紹。bin 包含用來和Spark交互的可執(zhí)行文件,如Sparkshell。core,streaming,python,… 包含主要組件的源代碼。examples 包含一些有用的單機Sparkjob。你可以研究和運行這些例子,來學(xué)習(xí)SparkAPI。Spark目錄README.mdSpark的ShellsSpark的shell使你能夠處理分布在集群上的數(shù)據(jù)(這些數(shù)據(jù)可以是分布在硬盤上或者內(nèi)存中)。Spark可以把數(shù)據(jù)加載到工作節(jié)點的內(nèi)存中,因此,許多分布式處理(甚至是分布式的1T數(shù)據(jù)的處理)都可以在幾秒內(nèi)完成。上面的特性,使迭代式計算,實時查詢、分析一般能夠在shells中完成。Spark提供了Pythonshells和Scalashells。Spark的ShellsSpark的shell使你能夠處理分Spark的Shells打開Spark的PythonShell:到Spark目錄,Spark的PythonShell也叫做PySparkShellbin/pyspark打開PySparkShell之后的界面Spark的Shells打開Spark的PythonSheSpark的Shells打開Spark的ScalaShell:到Spark目錄bin/pysparkbin/spark-shell打開Scala版本的shell打開之后的界面Spark的Shells打開Spark的ScalaShelSpark的Shells例子:scala>vallines=sc.textFile("../../testfile/helloSpark")//創(chuàng)建一個叫l(wèi)ines的RDDlines:org.apache.spark.rdd.RDD[String]=../../testfile/helloSparkMappedRDD[1]attextFileat<console>:12scala>lines.count()//對這個RDD中的行數(shù)進行計數(shù) res0:Long=2scala>lines.first()//文件中的第一行 res1:String=hellospark修改日志級別:conf/pertieslog4j.rootCategory=WARN,consoleSpark的Shells例子:Spark的核心概念Driverprogram:包含程序的main()方法,RDDs的定義和操作。(在上面的例子中,driverprogram就是SparkShell它本身了)它管理很多節(jié)點,我們稱作executors。count()操作解釋(每個executor計算文件的一部分,最后合并)。Spark的核心概念Driverprogram:Spark的核心概念SparkContext:Driverprograms通過一個SparkContext對象訪問Spark,SparkContext對象代表和一個集群的連接。在Shell中SparkContext自動創(chuàng)建好了,就是sc,例子:sc變量>>>sc<pyspark.context.SparkContextobjectat0x1025b8f90>Spark的核心概念SparkContext:Spark的核心概念RDDs:在Spark中,我們通過分布式集合(distributedcollections,也就是RDDs)來進行計算,這些分布式集合,并行的分布在整個集群中。RDDs是Spark分發(fā)數(shù)據(jù)和計算的基礎(chǔ)抽象類。用SparkContext創(chuàng)建RDDs上面例子中使用sc.textFile()創(chuàng)建了一個RDD,叫l(wèi)ines,它是從我們的本機文本文件中創(chuàng)建的,這個RDD代表了一個文本文件的每一行。我們可以在RDD上面進行各種并行化的操作,例如計算數(shù)據(jù)集中元素的個數(shù)或者打印出第一行。Spark的核心概念RDDs:Spark的核心概念向Spark傳遞函數(shù):向Spark傳遞函數(shù)是Spark的一個常用功能,許多SparkAPI是圍繞它展開的。例子:filteringscala>vallines=sc.textFile("../../testfile/helloSpark")lines:spark.RDD[String]=MappedRDD[...]scala>valworldLines=lines.filter(line=>line.contains("world"))pythonLines:spark.RDD[String]=FilteredRDD[...]scala>worldLines.collect()Spark的核心概念向Spark傳遞函數(shù):Spark的核心概念向Spark傳遞函數(shù):上面例子中的=>語法是Scala中定義函數(shù)的便捷方法。你也可以先定義函數(shù)再引用:例子:defhasWorld(line:String):Boolean={line.contains("world")}worldLines=lines.filter(hasWorld)像filter這樣的基于函數(shù)的操作,也是在集群上并行執(zhí)行的。Spark的核心概念向Spark傳遞函數(shù):Spark的核心概念向Spark傳遞函數(shù):需要注意的地方:如果你傳遞的函數(shù)是一個對象的成員,或者包含一個對象中字段的引用(例如self.field),Spark會把整個對象都發(fā)送到工作節(jié)點上,這樣會比僅僅發(fā)送你關(guān)心的信息要大很多,而且有時候會帶來一些奇怪的問題。傳送信息太多解決方法:我們可以把關(guān)心的字段抽取出來,只傳遞關(guān)心的字段。奇怪問題的避免:序列化包含函數(shù)的對象,函數(shù)和函數(shù)中引用的數(shù)據(jù)都需要序列化(實現(xiàn)Java的Serializableinterface)。如果Scala中出現(xiàn)NotSerializableException,一般情況下,都是因為沒序列化。Spark的核心概念向Spark傳遞函數(shù):RDDs介紹RDDs介紹RDDs的創(chuàng)建方法Scala的基礎(chǔ)知識RDDs介紹RDDs介紹RDDs介紹RDDsResilientdistributeddatasets(彈性分布式數(shù)據(jù)集,簡寫RDDs)。一個RDD就是一個不可改變的分布式集合對象,內(nèi)部由許多partitions(分片)組成,每個partition都包括一部分?jǐn)?shù)據(jù),這些partitions可以在集群的不同節(jié)點上計算Partitions是Spark中的并行處理的單元。Spark順序的,并行的處理partitions。RDDs是Spark的分發(fā)數(shù)據(jù)和計算的基礎(chǔ)抽象類,是Spark的核心概念。RDD可以包含Python,Java,或者Scala中的任何數(shù)據(jù)類型,包括用戶自定義的類。在Spark中,所有的計算都是通過RDDs的創(chuàng)建,轉(zhuǎn)換,操作完成的。RDD具有l(wèi)ineagegraph(血統(tǒng)關(guān)系圖)。RDDs介紹RDDsRDDs的創(chuàng)建方法Driverprogram中創(chuàng)建RDDs:把一個存在的集合傳給SparkContext’sparallelize()方法。這種方法,一般只適用于學(xué)習(xí)時。例子:vallines=sc.parallelize(List("spark","bigdatastudy"))valrdd=sc.parallelize(Array(1,2,2,4),4)...注意一下RDD的類型第一個參數(shù)是:待并行化處理的集合第二個參數(shù)是:分區(qū)個數(shù)RDDs的創(chuàng)建方法Driverprogram中創(chuàng)建RDDsRDDs的創(chuàng)建方法加載外部數(shù)據(jù)集:例子:使用textFile()加載valrddText=sc.textFile("../../testfile/helloSpark")valrddHdfs=sc.textFile("hdfs:///some/path.txt")RDDs的創(chuàng)建方法加載外部數(shù)據(jù)集:Scala的基礎(chǔ)知識Scala的變量聲明在Scala中創(chuàng)建變量的時候,必須使用val或者varVal,變量值不可修改,一旦分配不能重新指向別的值Var,分配后,可以指向類型相同的值。Scala的基礎(chǔ)知識Scala的變量聲明Scala的基礎(chǔ)知識Scala的變量聲明vallines=sc.textFile("../../testfile/helloSpark")lines=sc.textFile("../../testfile/helloSpark2")...<console>:error:reassignmenttovalvarlines2=sc.textFile("../../testfile/helloSpark")lines2=sc.textFile("../../testfile/helloSpark2")可以重新聲明變量vallines=sc.textFile("../../testfile/helloSpark2")Scala的基礎(chǔ)知識Scala的變量聲明Scala的基礎(chǔ)知識Scala的匿名函數(shù)像Python的lambda函數(shù)lines.filter(line=>line.contains("world"))...我們定義一個匿名函數(shù),接收一個參數(shù)line,并使用line這個String類型變量上的contains方法,并且返回結(jié)果。line的類型不需指定,能夠推斷出來Scala的基礎(chǔ)知識Scala的匿名函數(shù)Scala的基礎(chǔ)知識Scala程序員就是不喜歡多寫代碼。Scala允許我們用下劃線"_"來代表匿名函數(shù)中的參數(shù)。lines.filter(_.contains("world"))...Scala的基礎(chǔ)知識Scala程序員就是不喜歡多寫代碼。Scala的基礎(chǔ)知識類型推斷defhasWorld(line:String):Boolean={line.contains("world")}worldLines=lines.filter(hasWorld)Scala中定義函數(shù)用def,參數(shù)指定類型String,因為后面的contains方法就是用的String中的Contains方法。函數(shù)返回的類型,可以不必指定,因為通過類型推斷,能夠推出來。Scala的基礎(chǔ)知識類型推斷Scala的基礎(chǔ)知識類型推斷指定返回類型:返回的類型比較復(fù)雜,Scala可能推斷不出來。程序更易讀。Scala的基礎(chǔ)知識類型推斷TransformationsTransformations介紹逐元素transformations集合運算TransformationsTransformationsTransformations介紹Transformations(轉(zhuǎn)換):從之前的RDD構(gòu)建一個新的RDD,像map()和filter()。Transformations介紹TransformatioTransformations介紹Transformations的特點:Transformations返回一個嶄新的RDD,filter()操作返回一個指針,指向一個嶄新的RDD,原RDD不受影響,能夠在后面重復(fù)利用。Transformations介紹Transformatio逐元素transformations許多的transformations是逐元素的,也就是每次轉(zhuǎn)變一個元素。兩個最常用的transformations:map()andfilter()map()transformation,接收一個函數(shù),把這個函數(shù)應(yīng)用到RDD的每一個元素,并返一個函數(shù)作用后的新的RDD。filter()transformation,接收一個函數(shù),返回只包含滿足filter()函數(shù)的元素的新RDD。輸入RDD與輸出RDD可以是不同的類型,例如inputRDD[String],outputRDD[Double]逐元素transformations許多的transform逐元素transformationsmap()例子-對RDD中元素求平方valinput=sc.parallelize(List(1,2,3,4))valresult=input.map(x=>x*x)println(result.collect().mkString(","))逐元素transformationsmap()逐元素transformationsflatMap()對每個輸入元素,輸出多個輸出元素。flat壓扁的意思,將RDD中元素壓扁后返回一個新的RDD。例子-flatMap(),把一行字分割成多個元素vallines=sc.parallelize(List("helloworld","hi"))valwords=lines.flatMap(line=>line.split(""))words.first()//returns"hello"逐元素transformationsflatMap()逐元素transformationsflatMap()逐元素transformationsflatMap()集合運算RDDs支持?jǐn)?shù)學(xué)集合的計算,例如并集,交集計算。注意:進行計算的RDDs應(yīng)該是相同類型。money->monkey集合運算RDDs支持?jǐn)?shù)學(xué)集合的計算,例如并集,交集計算。注意集合運算distinct()是很耗時的操作,因為它需要通過網(wǎng)絡(luò),shuffle所有的數(shù)據(jù),以保證元素不重復(fù)。一般情況下,我們不用distinct()。union(other)會包含重復(fù)的元素。intersection(other)求交集。耗時操作,因為需要shufflesubtract(other)第一個RDD中存在,而不存在與第二個RDD的元素。需要shuffle。使用場景,機器學(xué)習(xí)中,移除訓(xùn)練集。集合運算distinct()是很耗時的操作,因為它需要通過網(wǎng)集合運算cartesian(other)非常耗時。使用場景:用戶相似性的時候集合運算cartesian(other)RDD的transformations基本的RDDtransformations:RDD包含{1,2,3,3}函數(shù)名功能例子結(jié)果map()對每個元素應(yīng)用函數(shù)rdd.map(x=>x+1){2,3,4,4}flatMap()壓扁,常用來抽取單詞rdd.flatMap(x=>x.to(3)){1,2,3,2,3,3,3}filter()過濾rdd.filter(x=>x!=1){2,3,3}distinct()去重rdd.distinct(){1,2,3}sample(withReplacement,fraction,[seed])對一個RDD取樣,是否進行替換rdd.sample(false,0.5)不確定RDD的transformations基本的RDDtranRDD的transformations兩個RDD的transformations:一個RDD包含{1,2,3},另一個RDD包含{3,4,5}函數(shù)名功能例子結(jié)果union()并集rdd.union(other){1,2,3,3,4,5}intersection()交集ersection(other){3}subtract()取存在第一個RDD,而不存在第二個RDD的元素(使用場景,機器學(xué)習(xí)中,移除訓(xùn)練集)rdd.subtract(other){1,2}cartesian()笛卡爾積rdd.cartesian(other){(1,3),(1,4),…(3,5)}RDD的transformations兩個RDD的tran
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 出口合同范本格式
- Unit 7 Be Wise with Money Period 3 Grammar 教學(xué)設(shè)計 2024-2025學(xué)年譯林版(2024)七年級英語上冊
- 勞務(wù)發(fā)包合同范本
- 動物投放景區(qū)合同范本
- 農(nóng)村菜田出租合同范本
- 出租養(yǎng)殖雞場合同范本
- 加工定制窗簾合同范本
- 保潔商場合同范本
- 包地收款合同范本
- 勞務(wù)中介代理招聘合同范本
- 2025年個體戶合伙投資協(xié)議(三篇)
- 14磁極與方向(教學(xué)設(shè)計)-二年級科學(xué)下冊(教科版)
- 2025年山西經(jīng)貿(mào)職業(yè)學(xué)院高職單招職業(yè)技能測試近5年??及鎱⒖碱}庫含答案解析
- 廣東省佛山市禪城區(qū)2024-2025學(xué)年八年級上學(xué)期期末考試語文試題(含答案)
- 第04課 輸入輸出與計算(說課稿)2024-2025學(xué)年六年級上冊信息技術(shù)人教版
- 部編五下語文教學(xué)多元評價方案
- 2024年09月江蘇2024年蘇州金融租賃校園招考筆試歷年參考題庫附帶答案詳解
- 《榜樣9》觀后感心得體會二
- 重慶市2024-205學(xué)年秋高二(上)期末考試歷史試卷(含答案)康德卷
- 廣西柳州市2025屆高三第二次模擬考試政治試題含答案
- 設(shè)備維修績效考核方案
評論
0/150
提交評論