5Spark各個知識點總結(jié)_第1頁
5Spark各個知識點總結(jié)_第2頁
5Spark各個知識點總結(jié)_第3頁
5Spark各個知識點總結(jié)_第4頁
5Spark各個知識點總結(jié)_第5頁
已閱讀5頁,還剩55頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領

文檔簡介

1、Spark簡介,Spark是什么,Spark是一個快速且通用的集群計算平臺。 集群計算 把一臺電腦無法解決的問題,放到多臺電腦組成的集群上進行解決,這就是集群計算。,Spark的特點,Spark是快速的 很多任務能夠秒級完成,對于一些特定的工作,Spark比Mapreduce快10-20倍。 Spark擴充了流行的Mapreduce計算模型,使Spark更高效地支持更多類型的計算,包括交互式查詢,和流處理。 速度快的另一個主要原因就是,能夠在內(nèi)存中計算。,Spark的特點,Spark是通用的 Spark的設計,容納了之前很多獨立的,分布式系統(tǒng)所擁有的功能。獨立的分布式系統(tǒng)包括:批處理,迭代式計

2、算,交互查詢和流處理等。 并且,由之前需要維護不同的集群,到現(xiàn)在只需要維護一個Spark集群。,Spark的特點,Spark是高度開放的 Spark提供了Python,Java,Scala,SQL的API和豐富的內(nèi)置庫。 同時,Spark和其它的大數(shù)據(jù)工具整合的很好。尤其,Spark能夠運行在Hadoop集群上面,能夠訪問Hadoop數(shù)據(jù)。,Spark的組件,Spark包括多個緊密集成的組件。,Spark的組件,緊密集成的優(yōu)點: 如果Spark底層優(yōu)化了,那么基于Spark底層的組件,也得到了相應的優(yōu)化。例如,Spark底層增加了一個優(yōu)化算法,那么Spark的SQL和機器學習包也會自動的優(yōu)化。

3、 緊密集成,節(jié)省了各個組件組合使用時的部署,測試等時間。 當向Spark增加新的組件時,其它的組件,可以立刻享用新組件的功能。 無縫連接不同的處理模型。,Spark的組件,Spark Core: 包含Spark的基本功能,包含任務調(diào)度,內(nèi)存管理,容錯機制等。 Spark Core內(nèi)部定義了RDDs(resilient distributed datasets,彈性分布式數(shù)據(jù)集)。RDDs代表橫跨很多工作節(jié)點的數(shù)據(jù)集合,RDDs可以被并行的處理。 Spark Core提供了很多APIs來創(chuàng)建和操作這些集合(RDDs)。,Spark的組件,Spark SQL: 是Spark處理結(jié)構(gòu)化數(shù)據(jù)的庫。它支

4、持通過SQL查詢數(shù)據(jù),就像HQL(Hive SQL)一樣,并且支持很多數(shù)據(jù)源,像Hive表,JSON等。Spark SQL是在Spark 1.0版本中新加的。 Shark是一種較老的基于Spark的SQL項目,它是基于Hive修改的,它現(xiàn)在已經(jīng)被Spark SQL替代了。,Spark的組件,Spark Streaming: 是實時數(shù)據(jù)流處理組件,類似Storm。 Spark Streaming提供了API來操作實時流數(shù)據(jù)。,Spark的組件,MLlib: Spark有一個包含通用機器學習功能的包,就是MLlib(machine learning lib)。 MLlib包含了分類,聚類,回歸,協(xié)

5、同過濾算法,還包括模型評估,和數(shù)據(jù)導入。 它還提供了一些低級的機器學習原語,包括通用梯度下降優(yōu)化算法。 MLlib提供的上面這些方法,都支持集群上的橫向擴展。,Spark的組件,Graphx: 是處理圖的庫(例如,社交網(wǎng)絡圖),并進行圖的并行計算。就像Spark Streaming和Spark SQL一樣,Graphx也繼承了Spark RDD API,同時允許創(chuàng)建有向圖。 Graphx提供了各種圖的操作,例如subgraph和mapVertices,也包含了常用的圖算法,例如PangeRank等。,Spark的組件,Cluster Managers: Cluster Managers就是集群

6、管理。Spark能夠運行在很多cluster managers上面,包括Hadoop YARN,Apache Mesos和Spark自帶的單獨調(diào)度器。 如果你把Spark安裝在了裸機上面,單獨調(diào)度器能夠提供簡單的方式,讓你開始Spark之旅。 如果你已經(jīng)有了Hadoop Yarn或者Mesos集群,那么,Spark對這些集群管理工具的支持,使你的Spark應用程序能夠在這些集群上面運行。,Spark的歷史,Spark誕生于2009年,那時候它是,加州大學伯克利分校RAD實驗室的一個研究項目,后來到了AMP實驗室。 Spark最初是基于Hadoop Mapreduce的,后來發(fā)現(xiàn)Mapreduc

7、e在迭代式計算和交互式上是低效的。因此Spark進行了改進,引入了內(nèi)存存儲和高容錯機制。 關于Spark的研究論文在學術會議上發(fā)表,并且在它被創(chuàng)建的2009年不久之后,對于一些特定的工作,Spark比Mapreduce快10-20倍。 2010年3月份Spark開源。 2011年,AMP實驗室開始在Spark上面開發(fā)高級組件,像Shark(Hive on Spark),Spark Streaming。 2013年轉(zhuǎn)移到了Apache下,現(xiàn)在已經(jīng)是頂級項目了。 2014年5月份Spark1.0發(fā)布。,Spark運行環(huán)境,Spark 是Scala寫的, 運行在JVM上。所以運行環(huán)境是Java6或者

8、以上。 如果想要使用 Python API,需要安裝Python 解釋器2.6版本或者以上。 目前Spark(1.2.0版本) 與Python 3不兼容。,Spark下載,下載地址:/downloads.html,選擇Pre-built for Hadoop 2.4 and later 這個包,點擊直接下載,這會下載一個spark-1.2.0-bin-hadoop2.4.tgz的壓縮包 搭建Spark不需要Hadoop,如果你有hadoop集群或者hdfs,你可以下載相應的版本。 解壓:tar -zxvf spark-1.2.0-bin-hadoop

9、2.4.tgz,Spark目錄,README.md 開始Spark之旅的簡單介紹。 bin 包含用來和Spark交互的可執(zhí)行文件,如Spark shell。 core, streaming, python, 包含主要組件的源代碼。 examples 包含一些有用的單機Spark job。 你可以研究和運行這些例子,來學習Spark API。,Spark的Shells,Spark的shell使你能夠處理分布在集群上的數(shù)據(jù)(這些數(shù)據(jù)可以是分布在硬盤上或者內(nèi)存中)。 Spark可以把數(shù)據(jù)加載到工作節(jié)點的內(nèi)存中,因此,許多分布式處理(甚至是分布式的1T數(shù)據(jù)的處理)都可以在幾秒內(nèi)完成。 上面的特性,使迭

10、代式計算,實時查詢、分析一般能夠在shells中完成。Spark提供了Python shells和 Scala shells。,Spark的Shells,打開Spark的Python Shell: 到Spark目錄,Spark的Python Shell也叫做PySpark Shell bin/pyspark 打開PySpark Shell之后的界面,Spark的Shells,打開Spark的Scala Shell: 到Spark目錄 bin/pysparkbin/spark-shell打開Scala版本的shell 打開之后的界面,Spark的Shells,例子: scala val line

11、s = sc.textFile(././testfile/helloSpark) / 創(chuàng)建一個叫l(wèi)ines的RDD lines: org.apache.spark.rdd.RDDString = ././testfile/helloSpark MappedRDD1 at textFile at :12 scala lines.count() / 對這個RDD中的行數(shù)進行計數(shù) res0: Long = 2 scala lines.first() / 文件中的第一行 res1: String = hello spark 修改日志級別:conf/perties log4j.root

12、Category=WARN, console,Spark的核心概念,Driver program: 包含程序的main()方法,RDDs的定義和操作。(在上面的例子中,driver program就是Spark Shell它本身了) 它管理很多節(jié)點,我們稱作executors。 count()操作解釋(每個executor計算文件的一部分,最后合并)。,Spark的核心概念,SparkContext: Driver programs 通過一個 SparkContext 對象訪問 Spark,SparkContext 對象代表和一個集群的連接。 在Shell中SparkContext 自動創(chuàng)建好

13、了,就是sc, 例子: sc 變量 sc ,Spark的核心概念,RDDs: 在Spark中,我們通過分布式集合(distributed collections,也就是RDDs)來進行計算,這些分布式集合,并行的分布在整個集群中。 RDDs 是 Spark分發(fā)數(shù)據(jù)和計算的基礎抽象類。 用SparkContext創(chuàng)建RDDs 上面例子中使用sc.textFile()創(chuàng)建了一個RDD,叫l(wèi)ines,它是從我們的本機文本文件中創(chuàng)建的,這個RDD代表了一個文本文件的每一行。我們可以在RDD上面進行各種并行化的操作,例如計算數(shù)據(jù)集中元素的個數(shù)或者打印出第一行。,Spark的核心概念,向Spark傳遞函數(shù)

14、: 向Spark傳遞函數(shù)是Spark的一個常用功能,許多Spark API是圍繞它展開的。 例子:filtering scala val lines = sc.textFile(././testfile/helloSpark) lines: spark.RDDString = MappedRDD. scala val worldLines = lines.filter(line = line.contains(world) pythonLines: spark.RDDString = FilteredRDD. scala worldLines .collect(),Spark的核心概念,向Sp

15、ark傳遞函數(shù): 上面例子中的=語法是 Scala中定義函數(shù)的便捷方法。你也可以先定義函數(shù)再引用: 例子: def hasWorld(line:String) : Boolean= line.contains(world) worldLines = lines.filter(hasWorld) 像filter 這樣的基于函數(shù)的操作,也是在集群上并行執(zhí)行的。,Spark的核心概念,向Spark傳遞函數(shù): 需要注意的地方: 如果你傳遞的函數(shù)是一個對象的成員,或者包含一個對象中字段的引用(例如self.field),Spark會把整個對象都發(fā)送到工作節(jié)點上,這樣會比僅僅發(fā)送你關心的信息要大很多,而且

16、有時候會帶來一些奇怪的問題。 傳送信息太多解決方法:我們可以把關心的字段抽取出來,只傳遞關心的字段。 奇怪問題的避免:序列化包含函數(shù)的對象,函數(shù)和函數(shù)中引用的數(shù)據(jù)都需要序列化(實現(xiàn)Java的Serializable interface)。 如果Scala中出現(xiàn)NotSerializableException,一般情況下,都是因為沒序列化。,RDDs介紹,RDDs介紹 RDDs的創(chuàng)建方法 Scala的基礎知識,RDDs介紹,RDDs Resilient distributed datasets(彈性分布式數(shù)據(jù)集,簡寫RDDs)。 一個RDD就是一個不可改變的分布式集合對象,內(nèi)部由許多partit

17、ions(分片)組成,每個partition都包括一部分數(shù)據(jù),這些partitions可以在集群的不同節(jié)點上計算 Partitions是Spark中的并行處理的單元。Spark順序的,并行的處理partitions。 RDDs 是 Spark的分發(fā)數(shù)據(jù)和計算的基礎抽象類,是Spark的核心概念。 RDD可以包含 Python, Java, 或者 Scala中的任何數(shù)據(jù)類型,包括用戶自定義的類。 在Spark中,所有的計算都是通過RDDs的創(chuàng)建,轉(zhuǎn)換,操作完成的。 RDD具有l(wèi)ineage graph(血統(tǒng)關系圖)。,RDDs的創(chuàng)建方法,Driver program中創(chuàng)建RDDs: 把一個存在的

18、集合傳給SparkContexts parallelize()方法。這種方法,一般只適用于學習時。 例子: val lines = sc.parallelize(List(spark, bigdatastudy) val rdd = sc.parallelize(Array(1, 2, 2, 4), 4) . 注意一下RDD的類型 第一個參數(shù)是:待并行化處理的集合 第二個參數(shù)是:分區(qū)個數(shù),RDDs的創(chuàng)建方法,加載外部數(shù)據(jù)集: 例子:使用textFile() 加載 val rddText= sc.textFile(././testfile/helloSpark) val rddHdfs = sc

19、.textFile(hdfs:/some/path.txt),Scala的基礎知識,Scala的變量聲明 在Scala中創(chuàng)建變量的時候,必須使用val或者var Val,變量值不可修改,一旦分配不能重新指向別的值 Var,分配后,可以指向類型相同的值。,Scala的基礎知識,Scala的變量聲明 val lines= sc.textFile(././testfile/helloSpark) lines= sc.textFile(././testfile/helloSpark2) . : error: reassignment to val var lines2= sc.textFile(./.

20、/testfile/helloSpark) lines2= sc.textFile(././testfile/helloSpark2) 可以重新聲明變量 val lines= sc.textFile(././testfile/helloSpark2),Scala的基礎知識,Scala的匿名函數(shù) 像Python的lambda 函數(shù) lines.filter(line = line.contains(world) . 我們定義一個匿名函數(shù),接收一個參數(shù)line,并使用line這個String類型變量上的contains方法,并且返回結(jié)果。 line 的類型不需指定,能夠推斷出來,Scala的基礎知

21、識,Scala程序員就是不喜歡多寫代碼。 Scala允許我們用下劃線_來代表匿名函數(shù)中的參數(shù)。 lines.filter(_.contains(world) .,Scala的基礎知識,類型推斷 def hasWorld(line:String) : Boolean=line.contains(world) worldLines = lines.filter(hasWorld) Scala中定義函數(shù)用def,參數(shù)指定類型String,因為后面的contains方法就是用的String中的Contains方法。 函數(shù)返回的類型,可以不必指定,因為通過類型推斷,能夠推出來。,Scala的基礎知識,類

22、型推斷 指定返回類型: 返回的類型比較復雜,Scala可能推斷不出來。 程序更易讀。,Transformations,Transformations介紹 逐元素transformations 集合運算,Transformations介紹,Transformations(轉(zhuǎn)換): 從之前的RDD構(gòu)建一個新的RDD,像map() 和 filter()。,Transformations介紹,Transformations的特點: Transformations返回一個嶄新的RDD, filter() 操作返回一個指針,指向一個嶄新的RDD,原RDD不受影響,能夠在后面重復利用。,逐元素transfo

23、rmations,許多的transformations是逐元素的,也就是每次轉(zhuǎn)變一個元素。 兩個最常用的transformations:map() and filter() map() transformation,接收一個函數(shù),把這個函數(shù)應用到RDD的每一個元素,并返一個函數(shù)作用后的新的RDD。 filter() transformation,接收一個函數(shù),返回只包含滿足filter()函數(shù)的元素的新RDD。 輸入RDD與輸出RDD可以是不同的類型,例如input RDDString ,output RDDDouble,逐元素transformations,map() 例子- 對RDD中元素

24、求平方 val input = sc.parallelize(List(1, 2, 3, 4) val result = input.map(x = x * x) println(result.collect().mkString(,),逐元素transformations,flatMap() 對每個輸入元素,輸出多個輸出元素。 flat壓扁的意思,將RDD中元素壓扁后返回一個新的RDD。 例子- flatMap() ,把一行字分割成多個元素 val lines = sc.parallelize(List(hello world, hi) val words = lines.flatMap(l

25、ine = line.split( ) words.first() / returns hello,逐元素transformations,flatMap(),集合運算,RDDs支持數(shù)學集合的計算,例如并集,交集計算。注意:進行計算的RDDs應該是相同類型。 money-monkey,集合運算,distinct()是很耗時的操作,因為它需要通過網(wǎng)絡,shuffle所有的數(shù)據(jù),以保證元素不重復。 一般情況下,我們不用distinct()。 union(other) 會包含重復的元素。 intersection(other)求交集。耗時操作,因為需要shuffle subtract(other)第一

26、個RDD中存在,而不存在與第二個RDD的元素。需要shuffle。使用場景,機器學習中,移除訓練集。,集合運算,cartesian(other) 非常耗時。 使用場景:用戶相似性的時候,RDD的transformations,基本的RDD transformations: RDD 包含 1, 2, 3, 3,RDD的transformations,兩個RDD 的transformations: 一個RDD包含 1, 2, 3,另一個RDD包含 3, 4, 5,Actions,在RDD上計算出來一個結(jié)果,把結(jié)果返回給driver program或者保存在外部文件系統(tǒng)上,像count() 函數(shù) f

27、irst()。 count() 返回元素的個數(shù),RDD的actions,Actions,reduce() 最常用的是reduce(),接收一個函數(shù),作用在RDD的兩個類型相同的元素上,返回一個類型相同的新元素。 最常用的一個函數(shù)是加法。 使用reduce()我們可以很簡單的實現(xiàn),RDD中元素的累加,計數(shù),和其它類型的聚集操作。 例子- reduce() val sum = rdd.reduce(x, y) = x + y),Actions,fold() 與reduce()相似, 類型相同 但是,在每個分區(qū)的初始化調(diào)用的時候,多了個“zero value” “zero value”的特點,把它應用在你的函數(shù)上,不管多少次,都不改變值(例如:+操作的0,*操作的1)。,Actions,aggregate() 與fold()相似 類型可以不同 我們提供想要返回的“zero value”類型。 第一個函數(shù),RDD中元素累加(每個節(jié)點只累加本地的結(jié)果)。 第二個函數(shù),合并累加器(合并每個節(jié)點的結(jié)果)。 可以使用ag

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論