Spark快速大數(shù)據(jù)分析_第1頁(yè)
Spark快速大數(shù)據(jù)分析_第2頁(yè)
Spark快速大數(shù)據(jù)分析_第3頁(yè)
Spark快速大數(shù)據(jù)分析_第4頁(yè)
Spark快速大數(shù)據(jù)分析_第5頁(yè)
已閱讀5頁(yè),還剩252頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶(hù)提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

Spark

懶嘀斤

目錄

第1章Spark數(shù)據(jù)分析導(dǎo)論

1.1Spark是什么

1.2一個(gè)大一統(tǒng)的軟件棧

1.2.1SparkCore

1.2.2SparkSQL

123SparkStreaming

1.2.4MLlib

1.2.5GraphX

126集群管理器

1.3Spark的用戶(hù)和用途

1.3.1數(shù)據(jù)科學(xué)任務(wù)

1.3.2數(shù)據(jù)處理應(yīng)用

1.4Spark簡(jiǎn)史

1.5Spark的版本和發(fā)布

1.6Spark的存儲(chǔ)層次

第2章Spark下載與入門(mén)

2.1下載Spark

2.2Spark中Python和Scala的shell

2.3Spark核心概念簡(jiǎn)介

2.4獨(dú)立應(yīng)用

2.4.1初始化SparkContext

2.4.2構(gòu)建獨(dú)立應(yīng)用

2.5總結(jié)

第3章RDD編程

3.1RDD基礎(chǔ)

3.2創(chuàng)建RDD

3.3RDD操作

3.3.1轉(zhuǎn)化操作

3.3.2行動(dòng)操作

333惰性求值

3.4向Spark傳遞函數(shù)

3.4.1Python

3.4.2Scala

3.4.3lava

3.5常見(jiàn)的轉(zhuǎn)化操作和行動(dòng)操作

3.5.1基本RDD

3.5.2在不同RDD類(lèi)型間轉(zhuǎn)換

3.6持久化f緩存)

3.7總結(jié)

第4章鍵值對(duì)操作

4.1動(dòng)機(jī)

4.2創(chuàng)建PairRDD

4.3PairRDD的轉(zhuǎn)化操作

4.3.1聚合操作

4.3.2數(shù)據(jù)分組

4.3.3連接

4.3.4數(shù)據(jù)排序

4.4PairRDD的行動(dòng)操作

4.5數(shù)據(jù)分區(qū)(進(jìn)階)

451獲取RDD的分區(qū)方式

4.5.2從分區(qū)中獲益的操作

4.5.3影響分區(qū)方式的操作

4.5.4示例:PageRank

4.5.5自定義分區(qū)方式

4.6總結(jié)

第5章數(shù)據(jù)讀取與保存

5.1動(dòng)機(jī)

5.2文件格式

5.2.1文本文件

5.2.2JSON

523逗號(hào)分隔值與制表符分隔值

524SequenceFile

5.2.5對(duì)象文件

526HadooD輸入輸出格式

5.2.7文件壓縮

5.3文件系統(tǒng)

5.3.1本地/“常規(guī)"文件系統(tǒng)

5.3.2AmazonS3

5.3.3HDFS

5.4SparkSQL中的結(jié)構(gòu)化數(shù)據(jù)

5.4.1ApacheHive

5.4.2JSON

5.5數(shù)據(jù)庫(kù)

5.5.1lava數(shù)據(jù)庫(kù)連接

5.5.2Cassandra

5.5.3HBase

5.5.4Elasticsearch

5.6總結(jié)

第6章Spark編程進(jìn)階

6.1簡(jiǎn)介

6.2累加器

621累加器與容錯(cuò)性

6.2.2自定義累加器

6.3廣播變量

廣播的優(yōu)化

6.4基于分區(qū)進(jìn)行操作

6.5與外部程序間的管道

6.6數(shù)值RDD的操作

6.7總結(jié)

第7章在集群上運(yùn)行Spark

7.1簡(jiǎn)介

7.2Spark運(yùn)行時(shí)架構(gòu)

721驅(qū)動(dòng)器節(jié)點(diǎn)

722執(zhí)行器節(jié)點(diǎn)

723集群管理器

724啟動(dòng)一個(gè)程序

725小結(jié)

7.3使用spark-submit部署應(yīng)用

7.4打包代碼與依賴(lài)

7.4.1使用Maven構(gòu)建的用Java編寫(xiě)的Spark應(yīng)用

7.4.2使用sbt構(gòu)建的用Scala編寫(xiě)的Spark應(yīng)用

7.4.3依賴(lài)沖突

7.5Spark應(yīng)用內(nèi)與應(yīng)用間調(diào)度

7.6集群管理器

7.6.1獨(dú)立集群管理器

7.6.2HadooDYARN

7.6.3ApacheMesos

7.6.4AmazonEC2

7.7選擇合適的集群管理器

7.8總結(jié)

第8章Spark調(diào)優(yōu)與調(diào)試

8.1使用SparkConf酉)置Spark

8.2Spark執(zhí)行的組成部分:作業(yè)、任務(wù)和步驟

8.3查找信息

8.3.1Spark網(wǎng)頁(yè)用戶(hù)界面

8.3.2驅(qū)動(dòng)器進(jìn)程和執(zhí)行器進(jìn)程的口志

8.4關(guān)鍵性能考量

8.4.1并行度

8.4.2序列化格式

8.4.3內(nèi)存管理

8.4.4硬件供給

8.5總結(jié)

第9章SparkSQL

9.1連接SparkSOL

9.2在應(yīng)用中使用SparkSOL

9.2.1初始化SparkSOL

9.2.2基本查詢(xún)示例

9.2.3SchemaRDD

9.2.4緩存

9.3讀取和存儲(chǔ)數(shù)據(jù)

9.3.1ApacheHive

9.3.2Parquet

9.3.3JSON

9.3.4基于RDD

9.4IDBC/0DBC月艮務(wù)器

9.4.1使用Beeline

9.4.2長(zhǎng)生命周期的表與查詢(xún)

9.5用戶(hù)自定義函數(shù)

9.5.1SparkSQLUDF

952HiveUDF

9.6SparkSOL性能

性能調(diào)優(yōu)選項(xiàng)

9.7總結(jié)

第10章SparkStreaming

10.1一個(gè)簡(jiǎn)單的例子

10.2架構(gòu)與抽象

10.3轉(zhuǎn)化操作

10.3.1無(wú)狀態(tài)轉(zhuǎn)化操作

10.3.2有狀態(tài)轉(zhuǎn)化操作

10.4輸出操作

10.5輸入源

10.5.1核心數(shù)據(jù)源

10.5.2附加數(shù)據(jù)源

1053多數(shù)據(jù)源與集群規(guī)模

10,624/7不間斷運(yùn)行

1061檢查點(diǎn)機(jī)制

10.6.2驅(qū)動(dòng)器程序容錯(cuò)

10.6.3工作節(jié)點(diǎn)容錯(cuò)

10.6.4接收器容錯(cuò)

10.6.5處理保證

10.7Streaming用戶(hù)界面

10.8性能考量

10.8.1批次和窗口大小

10.8.2并行度

10.8.3垃圾回收和內(nèi)存使用

10.9總結(jié)

第11章基于MLHb的機(jī)器學(xué)習(xí)

11.1概述

11.2系統(tǒng)要求

11.3機(jī)器學(xué)習(xí)基礎(chǔ)

示例:垃圾郵件分類(lèi)

11.4數(shù)據(jù)類(lèi)型

操作向量

11.5算法

11.5.1特征提取

11.5.2統(tǒng)計(jì)

1153分類(lèi)與回歸

11.5.4聚類(lèi)

1155協(xié)同過(guò)濾與推薦

1156降維

1157模型評(píng)估

11.6一些提示與性能考量

11.6.1準(zhǔn)備特征

1162配置算法

11.6.3緩存RDD以重復(fù)使用

11.6.4識(shí)別稀疏程度

11.6.5并行度

11.7流水線API

11.8總結(jié)

第1章Spark數(shù)據(jù)分析導(dǎo)論

本章會(huì)從宏觀角度介紹Spark到底是什么。如果你已經(jīng)對(duì)Spark和相關(guān)組件有一定了解,

你可以選擇直接從第2章開(kāi)始讀。

1.1Spark是什么

Spark是一個(gè)用來(lái)實(shí)現(xiàn)快速而通用的集群計(jì)算的平臺(tái)。

在速度方面,Spark擴(kuò)展了廣泛使用的MapReduce計(jì)算模型,而且高效地支持更多計(jì)算

模式,包括交互式查詢(xún)和流處理。在處理大規(guī)模數(shù)據(jù)集時(shí).,速度是非常重要的。速度快就

意味著我們可以進(jìn)行交互式的數(shù)據(jù)操作,否則我們每次操作就需要等待數(shù)分鐘甚至數(shù)小時(shí)。

Spark的一個(gè)主要特點(diǎn)就是能夠在內(nèi)存中進(jìn)行計(jì)算,因而更快。不過(guò)即使是必須在磁盤(pán)上

進(jìn)行的復(fù)雜計(jì)算,Spark依然比MapReduce更加高效。

總的來(lái)說(shuō),Spark適用于各種各樣原先需要多種不同的分布式平臺(tái)的場(chǎng)景,包括批處理、

迭代算法、交互式查詢(xún)、流處理。通過(guò)在一個(gè)統(tǒng)一的框架下支持這些不同的計(jì)算,Spark

使我們可以簡(jiǎn)單而低耗地把各種處理流程整合在一起。而這樣的組合,在實(shí)際的數(shù)據(jù)分

析過(guò)程中是很有意義的。不僅如此,Spark的這種特性還大大減輕了原先需要對(duì)各種平臺(tái)

分別管理的負(fù)擔(dān)。

Spark所提供的接口非常豐富。除了提供基于Python、Java、Scala和SQL的簡(jiǎn)單易用的

API以及內(nèi)建的豐富的程序庫(kù)以外,Spark還能和其他大數(shù)據(jù)工具密切配合使用。例如,

Spark可以運(yùn)行在Hadoop集群上,訪問(wèn)包括Cassandra在內(nèi)的任意Hadoop數(shù)據(jù)源。

1.2一個(gè)大一統(tǒng)的軟件棧

Spark項(xiàng)目包含多個(gè)緊密集成的組件。Spark的核心是一個(gè)對(duì)由很多計(jì)算任務(wù)組成的、運(yùn)

行在多個(gè)工作機(jī)器或者是一個(gè)計(jì)算集群上的應(yīng)用進(jìn)行調(diào)度、分發(fā)以及監(jiān)控的計(jì)算引擎。

由于Spark的核心引擎有著速度快和通用的特點(diǎn),因此Spark還支持為各種不同應(yīng)用場(chǎng)景

專(zhuān)門(mén)設(shè)計(jì)的高級(jí)組件,比如SQL和機(jī)器學(xué)習(xí)等。這些組件關(guān)系密切并且可以相互調(diào)用,

這樣你就可以像在平常軟件項(xiàng)目中使用程序庫(kù)那樣,組合使用這些的組件。

各組件間密切結(jié)合的設(shè)計(jì)原理有這樣幾個(gè)優(yōu)點(diǎn)。首先,軟件棧中所有的程序庫(kù)和高級(jí)組件

都可以從下層的改進(jìn)中獲益。比如,當(dāng)Spark的核心引擎新引入了一個(gè)優(yōu)化時(shí),SQL和機(jī)

器學(xué)習(xí)程序庫(kù)也都能自動(dòng)獲得性能提升。其次,運(yùn)行整個(gè)軟件棧的代價(jià)變小了。不需要運(yùn)

行5到10套獨(dú)立的軟件系統(tǒng)了,一個(gè)機(jī)構(gòu)只需要運(yùn)行一套軟件系統(tǒng)即可。這些代價(jià)包括

系統(tǒng)的部署、維護(hù)、測(cè)試、支持等。這也意味著Spark軟件棧中每增加一個(gè)新的組件,使

用Spark的機(jī)構(gòu)都能馬上試用新加入的組件。這就把原先嘗試一種新的數(shù)據(jù)分析系統(tǒng)所需

要的下載、部署并學(xué)習(xí)一個(gè)新的軟件項(xiàng)目的代價(jià)簡(jiǎn)化成了只需要升級(jí)Spark。

最后,密切結(jié)合的原理的一大優(yōu)點(diǎn)就是,我們能夠構(gòu)建出無(wú)縫整合不同處理模型的應(yīng)用。

例如,利用Spark,你可以在一個(gè)應(yīng)用中實(shí)現(xiàn)將數(shù)據(jù)流中的數(shù)據(jù)使用機(jī)器學(xué)習(xí)算法進(jìn)行實(shí)

時(shí)分類(lèi)。與此同時(shí)?,數(shù)據(jù)分析師也可以通過(guò)SQL實(shí)時(shí)查詢(xún)結(jié)果數(shù)據(jù),比如將數(shù)據(jù)與非結(jié)

構(gòu)化的日志文件進(jìn)行連接操作。不僅如此,有經(jīng)驗(yàn)的數(shù)據(jù)工程師和數(shù)據(jù)科學(xué)家還可以通過(guò)

Pythonshell來(lái)訪問(wèn)這些數(shù)據(jù),進(jìn)行即時(shí)分析。其他人也可以通過(guò)獨(dú)立的批處理應(yīng)用訪問(wèn)

這些數(shù)據(jù)。IT團(tuán)隊(duì)始終只需要維護(hù)一套系統(tǒng)即可。

Spark的各個(gè)組件如圖1-1所示,下面來(lái)依次簡(jiǎn)要介紹它們。

圖1-1:Spark軟件棧

1.2.1SparkCore

SparkCore實(shí)現(xiàn)了Spark的基本功能,包含任務(wù)調(diào)度、內(nèi)存管理、錯(cuò)誤恢復(fù)、與存儲(chǔ)系統(tǒng)

交互等模塊。SparkCore中還包含了對(duì)彈性分布式數(shù)據(jù)集(resilientdistributeddataset,

簡(jiǎn)稱(chēng)RDD)的API定義。RDD表示分布在多個(gè)計(jì)算節(jié)點(diǎn)上可以并行操作的元素集合,是

Spark主要的編程抽象。SparkCore提供了創(chuàng)建和操作這些集合的多個(gè)API。

1.2.2SparkSQL

SparkSQL是Spark用來(lái)操作結(jié)構(gòu)化數(shù)據(jù)的程序包。通過(guò)SparkSQL,我們可以使用SQL

或者ApacheHive版本的SQL方言(HQL)來(lái)查詢(xún)數(shù)據(jù)。SparkSQL支持多種數(shù)據(jù)源,比

如Hive表、Parquet以及JSON等。除了為Spark提供了一個(gè)SQL接口,SparkSQL還支

持開(kāi)發(fā)者將SQL和傳統(tǒng)的RDD編程的數(shù)據(jù)操作方式相結(jié)合,不論是使用Python、Java還

是Scala,開(kāi)發(fā)者都可以在單個(gè)的應(yīng)用中同時(shí)使用SQL和復(fù)雜的數(shù)據(jù)分析。通過(guò)與Spark

所提供的豐富的計(jì)算環(huán)境進(jìn)行如此緊密的結(jié)合,SparkSQL得以從其他開(kāi)源數(shù)據(jù)倉(cāng)庫(kù)工具

中脫穎而出。SparkSQL是在Spark1.0中被引入的。

在SparkSQL之前,加州大學(xué)伯克利分校曾經(jīng)嘗試修改ApacheHive以使其運(yùn)行在Spark

上,當(dāng)時(shí)的項(xiàng)目叫作Shark。現(xiàn)在,由于SparkSQL與Spark引擎和API的結(jié)合更緊密,

Shark已經(jīng)被SparkSQL所取代。

1.2.3SparkStreaming

SparkStreaming是Spark提供的對(duì)實(shí)時(shí)數(shù)據(jù)進(jìn)行流式計(jì)算的組件。比如生產(chǎn)環(huán)境中的網(wǎng)

頁(yè)服務(wù)器日志,或是網(wǎng)絡(luò)服務(wù)中用戶(hù)提交的狀態(tài)更新組成的消息隊(duì)列,都是數(shù)據(jù)流。

SparkStreaming提供了用來(lái)操作數(shù)據(jù)流的API,并且與SparkCore中的RDDAPI高度對(duì)

應(yīng)。這樣一來(lái),程序員編寫(xiě)應(yīng)用時(shí)的學(xué)習(xí)門(mén)檻就得以降低,不論是操作內(nèi)存或硬盤(pán)中的數(shù)

據(jù),還是操作實(shí)時(shí)數(shù)據(jù)流,程序員都更能應(yīng)對(duì)自如。從底層設(shè)計(jì)來(lái)看,SparkStreaming

支持與SparkCore同級(jí)別的容錯(cuò)性、吞吐量以及可伸縮性。

1.2.4MLlib

Spark中還包含一個(gè)提供常見(jiàn)的機(jī)器學(xué)習(xí)(ML)功能的程序庫(kù),叫作MLlib。MLlib提供

了很多種機(jī)器學(xué)習(xí)算法,包括分類(lèi)、回歸、聚類(lèi)、協(xié)同過(guò)濾等,還提供了模型評(píng)估、數(shù)據(jù)

導(dǎo)入等額外的支持功能。MLlib還提供了一些更底層的機(jī)器學(xué)習(xí)原語(yǔ),包括一個(gè)通用的梯

度下降優(yōu)化算法。所有這些方法都被設(shè)計(jì)為可以在集群上輕松伸縮的架構(gòu)。

1.2.5GraphX

GraphX是用來(lái)操作圖(比如社交網(wǎng)絡(luò)的朋友關(guān)系圖)的程序庫(kù),可以進(jìn)行并行的圖計(jì)算。

與SparkStreaming和SparkSQL類(lèi)似,GraphX也擴(kuò)展了Spark的RDDAPI,能用來(lái)創(chuàng)建

一個(gè)頂點(diǎn)和邊都包含任意屬性的有向圖。GraphX還支持針對(duì)圖的各種操作(比如進(jìn)行圖

分害!I的subgraph和操作所有頂點(diǎn)的mapVertices),以及一些常用圖算法(比如

PageRank和三角計(jì)數(shù))。

1.2.6集群管理器

就底層而言,Spark設(shè)計(jì)為可以高效地在一個(gè)計(jì)算節(jié)點(diǎn)到數(shù)千個(gè)計(jì)算節(jié)點(diǎn)之間伸縮計(jì)算。

為了實(shí)現(xiàn)這樣的要求,同時(shí)獲得最大靈活性,Spark支持在各種集群管理器(cluster

manager)上運(yùn)行,包括HadoopYARN、ApacheMesos,以及Spark自帶的一個(gè)簡(jiǎn)易調(diào)

度器,叫作獨(dú)立調(diào)度器。如果要在沒(méi)有預(yù)裝任何集群管理器的機(jī)器上安裝Spark,那么

Spark自帶的獨(dú)立調(diào)度器可以讓你輕松入門(mén);而如果已經(jīng)有了一個(gè)裝有HadoopYARN或

Mesos的集群,通過(guò)Spark對(duì)這些集群管理器的支持,你的應(yīng)用也同樣能運(yùn)行在這些集群

上。第7章會(huì)詳細(xì)探討這些不同的選項(xiàng)以及如何選擇合適的集群管理器。

1.3Spark的用戶(hù)和用途

Spark是一個(gè)用于集群計(jì)算的通用計(jì)算框架,因此被用于各種各樣的應(yīng)用程序。在前言中

我們提到了本書(shū)的兩大目標(biāo)讀者人群:數(shù)據(jù)科學(xué)家和工程師。仔細(xì)分析這兩個(gè)群體以及他

們使用Spark的方式,我們不難發(fā)現(xiàn)這兩個(gè)群體使用Spark的典型用例并不一致,不過(guò)我

們可以把這些用例大致分為兩類(lèi)一一數(shù)據(jù)科學(xué)應(yīng)用和數(shù)據(jù)處理應(yīng)用。

當(dāng)然,這種領(lǐng)域和使用模式的劃分是比較模糊的。很多人也兼有數(shù)據(jù)科學(xué)家和工程師的能

力,有的時(shí)候扮演數(shù)據(jù)科學(xué)家的角色進(jìn)行研究,然后搖身一變成為工程師,熟練地編寫(xiě)復(fù)

雜的數(shù)據(jù)處理程序。不管怎樣,分開(kāi)看這兩大群體和相應(yīng)的用例是很有意義的。

1.3.1數(shù)據(jù)科學(xué)任務(wù)

數(shù)據(jù)科學(xué)是過(guò)去兒年里出現(xiàn)的新學(xué)科,關(guān)注的是數(shù)據(jù)分析領(lǐng)域。盡管沒(méi)有標(biāo)準(zhǔn)的定義,但

我們認(rèn)為數(shù)據(jù)科學(xué)家(datascientist)就是主要負(fù)責(zé)分析數(shù)據(jù)并建模的人。數(shù)據(jù)科學(xué)家有

可能具備SQL、統(tǒng)計(jì)、預(yù)測(cè)建模(機(jī)器學(xué)習(xí))等方面的經(jīng)驗(yàn),以及一定的使用Python、

Matlab或R語(yǔ)言進(jìn)行編程的能力。將數(shù)據(jù)轉(zhuǎn)換為更方便分析和觀察的格式,通常被稱(chēng)為

數(shù)據(jù)轉(zhuǎn)換(datawrangling),數(shù)據(jù)科學(xué)家也對(duì)這一過(guò)程中的必要技術(shù)有所了解。

數(shù)據(jù)科學(xué)家使用他們的技能來(lái)分析數(shù)據(jù),以回答問(wèn)題或發(fā)現(xiàn)一些潛在規(guī)律。他們的工作流

經(jīng)常會(huì)用到即時(shí)分析,所以他們可以使用交互式shell替代復(fù)雜應(yīng)用的構(gòu)建,這樣可以在

最短時(shí)間內(nèi)得到查詢(xún)語(yǔ)句和一些簡(jiǎn)單代碼的運(yùn)行結(jié)果。Spark的速度以及簡(jiǎn)單的API都能

在這種場(chǎng)景里大放光彩,而Spark內(nèi)建的程序庫(kù)的支持也使得很多算法能夠即刻使用。

Spark通過(guò)一系列組件支持各種數(shù)據(jù)科學(xué)任務(wù)。Sparkshell通過(guò)提供Python和Scala的接

口,使我們方便地進(jìn)行交互式數(shù)據(jù)分析。SparkSQL也提供一個(gè)獨(dú)立的SQLshell,我們可

以在這個(gè)shell中使用SQL探索數(shù)據(jù),也可以通過(guò)標(biāo)準(zhǔn)的Spark程序或者Sparkshell來(lái)進(jìn)

行SQL查詢(xún)。機(jī)器學(xué)習(xí)和數(shù)據(jù)分析則通過(guò)MLlib程序庫(kù)提供支持。另外,Spark還能支持

調(diào)用R或者M(jìn)atlab寫(xiě)成的外部程序。數(shù)據(jù)科學(xué)家在使用R或Pandas等傳統(tǒng)數(shù)據(jù)分析工

具時(shí)所能處理的數(shù)據(jù)集受限于單機(jī),而有了Spark,就能處理更大數(shù)據(jù)規(guī)模的問(wèn)題。

在初始的探索階段之后,數(shù)據(jù)科學(xué)家的工作需要被應(yīng)用到實(shí)際中。具體問(wèn)題包括擴(kuò)展應(yīng)用

的功能、提高應(yīng)用的穩(wěn)定性,并針對(duì)生產(chǎn)環(huán)境進(jìn)行配置,使之成為業(yè)務(wù)應(yīng)用的一部分。例

如,在數(shù)據(jù)科學(xué)家完成初始的調(diào)研之后,我們可能最終會(huì)得到一個(gè)生產(chǎn)環(huán)境中的推薦系統(tǒng),

可以整合在網(wǎng)頁(yè)應(yīng)用中,為用戶(hù)提供產(chǎn)品推薦。一般來(lái)說(shuō),將數(shù)據(jù)科學(xué)家的工作轉(zhuǎn)化為實(shí)

際生產(chǎn)中的應(yīng)用的工作是由另外的工程師或者工程師團(tuán)隊(duì)完成的,而不是那些數(shù)據(jù)科學(xué)家。

1.3.2數(shù)據(jù)處理應(yīng)用

Spark的另一個(gè)主要用例是針對(duì)工程師的。在這里,我們把工程師定義為使用Spark開(kāi)發(fā)

生產(chǎn)環(huán)境中的數(shù)據(jù)處理應(yīng)用的軟件開(kāi)發(fā)者。這些開(kāi)發(fā)者一般有基本的軟件工程概念,比如

封裝、接口設(shè)計(jì)以及面向?qū)ο蟮木幊趟枷?,他們通常有?jì)算機(jī)專(zhuān)業(yè)的背景,并且能使用工

程技術(shù)來(lái)設(shè)計(jì)和搭建軟件系統(tǒng),以實(shí)現(xiàn)業(yè)務(wù)用例。

對(duì)工程師來(lái)說(shuō),Spark為開(kāi)發(fā)用于集群并行執(zhí)行的程序提供了一條捷徑。通過(guò)封裝,

Spark不需要開(kāi)發(fā)者關(guān)注如何在分布式系統(tǒng)上編程這樣的復(fù)雜問(wèn)題,也無(wú)需過(guò)多關(guān)注網(wǎng)絡(luò)

通信和程序容錯(cuò)性。Spark已經(jīng)為工程師提供了足夠的接口來(lái)快速實(shí)現(xiàn)常見(jiàn)的任務(wù),以及

對(duì)應(yīng)用進(jìn)行監(jiān)視、審查和性能調(diào)優(yōu)。其API模塊化的特性(基于傳遞分布式的對(duì)象集)使

得利用程序庫(kù)進(jìn)行開(kāi)發(fā)以及本地測(cè)試大大簡(jiǎn)化。

Spark用戶(hù)之所以選擇Spark來(lái)開(kāi)發(fā)他們的數(shù)據(jù)處理應(yīng)用,正是因?yàn)镾park提供了豐富的

功能,容易學(xué)習(xí)和使用,并且成熟穩(wěn)定。

1.4Spark簡(jiǎn)史

Spark是由一個(gè)強(qiáng)大而活躍的開(kāi)源社區(qū)開(kāi)發(fā)和維護(hù)的,社區(qū)中的開(kāi)發(fā)者們來(lái)自許許多多不

同的機(jī)構(gòu)。如果你或者你所在的機(jī)構(gòu)是第一次嘗試使用Spark,也許你會(huì)對(duì)Spark這個(gè)項(xiàng)

目的歷史感興趣。Spark是于2009年作為一個(gè)研究項(xiàng)目在加州大學(xué)伯克利分校RAD實(shí)驗(yàn)

室(AMPLab的前身)誕生。實(shí)驗(yàn)室中的一些研究人員曾經(jīng)用過(guò)HadoopMapReduce。他

們發(fā)現(xiàn)MapReduce在迭代計(jì)算和交互計(jì)算的任務(wù)上表現(xiàn)得效率低下。因此,Spark從一

開(kāi)始就是為交互式查詢(xún)和迭代算法設(shè)計(jì)的,同時(shí)還支持內(nèi)存式存儲(chǔ)和高效的容錯(cuò)機(jī)制。

2009年,關(guān)于Spark的研究論文在學(xué)術(shù)會(huì)議上發(fā)表,同年Spark項(xiàng)目正式誕生。其后不

久,相比于MapReduce,Spark在某些任務(wù)上已經(jīng)獲得了10?20倍的性能提升。

Spark最早的一部分用戶(hù)來(lái)自加州伯克利分校的其他研究小組,其中比較著名的有Mobile

Millenniumo作為機(jī)器學(xué)習(xí)領(lǐng)域的研究項(xiàng)目,他們利用Spark來(lái)監(jiān)控并預(yù)測(cè)舊金山灣區(qū)的

交通擁堵情況。僅僅過(guò)了短短的一段時(shí)間,許多外部機(jī)構(gòu)也開(kāi)始使用Spark。如今,有超

過(guò)50個(gè)機(jī)構(gòu)將自己添加到了使用Spark的機(jī)構(gòu)列表頁(yè)面

(https:〃./confluence./display/SPARK/Powered+By+Spark)。在Spark

社區(qū)如火如荼的社區(qū)活動(dòng)SparkMeetups(http:〃/spark-users/)和

Spark峰會(huì)(http:〃/)中,許多機(jī)構(gòu)也向大家積極分享他們特有的

Spark應(yīng)用場(chǎng)景。除了加州大學(xué)伯克利分校,對(duì)Spark作出貢獻(xiàn)的主要機(jī)構(gòu)還有

Databricks、雅虎以及英特爾。

2011年,AMPLab開(kāi)始基于Spark開(kāi)發(fā)更高層的組件,比如Shark(Spark上的Hive)1

和SparkStreamingo這些組件和其他一些組件一起被稱(chēng)為伯克利數(shù)據(jù)分析工具棧(BDAS,

/software/)。

iShark已經(jīng)被SparkSQL所取代。

Spark最早在2010年3月開(kāi)源,并且在2013年6月交給了Apache基金會(huì),現(xiàn)在已經(jīng)成

TApache開(kāi)源基金會(huì)的頂級(jí)項(xiàng)目。

1.5Spark的版本和發(fā)布

自其出現(xiàn)以來(lái),Spark就一直是一個(gè)非?;钴S的項(xiàng)目,Spark社區(qū)也一直保持著非常繁榮

的態(tài)勢(shì)。隨著版本號(hào)的不斷更迭,Spark的貢獻(xiàn)者也與日俱增。Spark1.0吸引了100多

個(gè)開(kāi)源程序員參與開(kāi)發(fā)。盡管項(xiàng)目活躍度在飛速地提升,Spark社區(qū)依然保持著常規(guī)的發(fā)

布新版本的節(jié)奏。2014年5月,Spark1.0正式發(fā)布,而本書(shū)則主要關(guān)注Spark1.1.0以及

后續(xù)的版本。不過(guò),大多數(shù)概念在老版本的Spark中依然適用,而大多數(shù)示例也能運(yùn)行在

老版本的Spark上。

1.6Spark的存儲(chǔ)層次

Spark不僅可以將任何Hadoop分布式文件系統(tǒng)(HDFS)上的文件讀取為分布式數(shù)據(jù)集,

也可以支持其他支持Hadoop接口的系統(tǒng),比如本地文件、亞馬遜S3、Cassandra、Hive^

HBase等。我們需要弄清楚M是,Hadoop并非Spark的必要條件,Spark支持任何實(shí)現(xiàn)

了Hadoop接口的存儲(chǔ)系統(tǒng)。Spark支持的Hadoop輸入格式包括文本文件、SequenceFile,

Avro、Parquet等。我們會(huì)在第5章討論讀取和存儲(chǔ)時(shí)詳細(xì)介紹如何與這些數(shù)據(jù)源進(jìn)行交

互。

第2章Spark下載與入門(mén)

在本章中,我們會(huì)下載Spark并在本地模式下單機(jī)運(yùn)行它。本章是寫(xiě)給Spark的所有初學(xué)

者的,對(duì)數(shù)據(jù)科學(xué)家和工程師來(lái)說(shuō)都值得一讀。

Spark可以通過(guò)Python、Java或Scala來(lái)使用1。要用好本書(shū)不需要高超的編程技巧,但

是確實(shí)需要對(duì)其中某種語(yǔ)言的語(yǔ)法有基本的了解。我們會(huì)盡可能在示例中給出全部三種語(yǔ)

言的代碼。

iSpark1.4.0起添加J'R語(yǔ)言支持。

Spark本身是用Scala寫(xiě)的,運(yùn)行在Java虛擬機(jī)(JVM)上。要在你的電腦或集群上運(yùn)行

Spark,你要做的準(zhǔn)備工作只是安裝Java6或者更新的版本。如果你希望使用Python接口,

你還需要一個(gè)Python解釋器(2.6以上版本)。Spark尚不支持Python32。

zSpark1.4.0起支持Python3。譯者注

2.1下載Spark

使用Spark的第一步是下載和解壓縮。我們先從下載預(yù)編譯版本的Spark開(kāi)始。訪問(wèn)

/downloads.html,選擇包類(lèi)型為"Pre-builtforHadoop2.4and

later"(為Hadoop2.4及更新版本預(yù)編譯的版本),然后選擇"DirectDownload"直接下載。

這樣我們就可以得到一個(gè)壓縮的TAR文件,文件名為spark-1.2.0-bin-hadoop2.4.tgz.

2Windows用戶(hù)如果把Spark安裝到帶有空格的路徑下,可能會(huì)遇到一些問(wèn)題。所

以我們需要把Spark安裝到不帶空格的路徑下,比如C:\spark這樣的目錄中。

你不需要安裝Hadoop,不過(guò)如果你已經(jīng)有了一個(gè)Hadoop集群或安裝好的HDFS,請(qǐng)下載

對(duì)應(yīng)版本的Sparko你可以在http:〃/downloads.html里選擇所需要的包

類(lèi)型,這會(huì)導(dǎo)致下載得到的文件名略有不同。也可以選擇從源代碼直接編譯。你可以從

GitHub上下載最新代碼,也可以在下載頁(yè)面上選擇包類(lèi)型為“SourceCode"(源代碼)進(jìn)

行下載。

大多數(shù)類(lèi)Unix系統(tǒng),包括OSX和Linux,都有一個(gè)叫tar的命令行工具,可以用

來(lái)解壓TAR文件。如果你的操作系統(tǒng)沒(méi)有安裝tar,可以嘗試搜索網(wǎng)絡(luò)獲取免費(fèi)的TAR

解壓縮工具。比如,如果你使用的是Windows,可以試一下7-Zip.

下載好了Spark之后,我們要進(jìn)行解壓縮,然后看一看默認(rèn)的Spark發(fā)行版中都有些什么。

打開(kāi)終端,將工作路徑轉(zhuǎn)到下載的Spark壓縮包所在的目錄,然后解開(kāi)壓縮包。這樣會(huì)創(chuàng)

建出一個(gè)和壓縮包同名但是沒(méi)了.tgz后綴的新文件夾。接下來(lái)我們就把工作路徑轉(zhuǎn)到這個(gè)

新目錄下看看里面都有些什么。上面這些步驟可以用如下命令完成:

cd?

tar-xfspark-1.2.0-bin-hadoop2.4.tgz

cdspark-1.2.0-bin-hadoop2.4

Is

在tar命令所在的那一行中,x標(biāo)記指定tar命令執(zhí)行解壓縮操作,f標(biāo)記則指定壓縮

包的文件名。1s命令列出了Spark目錄中的內(nèi)容。我們先來(lái)粗略地看一看Spark目錄中

的一些比較重要的文件及目錄的名字和作用。

?README.md

包含用來(lái)入門(mén)Spark的簡(jiǎn)單的使用說(shuō)明。

?bin

包含可以用來(lái)和Spark進(jìn)行各種方式的交互的一系列可執(zhí)行文件,比如本章稍后會(huì)講到的

Sparkshello

?core>streamingspython

?包含Spark項(xiàng)目主要組件的源代碼。

?examples

包含一些可以查看和運(yùn)行的Spark程序,對(duì)學(xué)習(xí)Spark的API非常有幫助。

不要被Spark項(xiàng)目數(shù)量龐大的文件和復(fù)雜的目錄結(jié)構(gòu)嚇倒,我們會(huì)在本書(shū)接下來(lái)的部分中

講解它們中的很大一部分。就目前來(lái)說(shuō),我們還是按部就班,先來(lái)試試Spark的Python

和Scala版本的shell。讓我們從運(yùn)行一些Spark自帶的示例代碼開(kāi)始,然后再編寫(xiě)、編譯

并運(yùn)行一個(gè)我們自己簡(jiǎn)易的Spark程序。

本章我們所做的一切,Spark都是在本地模式下運(yùn)行,也就是非分布式模式,這樣我們只

需要用到一臺(tái)機(jī)器。Spark可以運(yùn)行在許多種模式下,除了本地模式,還支持運(yùn)行在

Mesos或YARN上,也可以運(yùn)行在Spark發(fā)行版自帶的獨(dú)立調(diào)度器上。我們會(huì)在第7章詳

細(xì)講述各種部署模式。

2.2Spark中Python和Scala的shell

Spark帶有交互式的shell,可以作即時(shí)數(shù)據(jù)分析。如果你使用過(guò)類(lèi)似R、Python、Scala

而提供的shell,或操作系統(tǒng)的shell(例如Bash或者Windows中的命令提示符),你也

會(huì)對(duì)Sparkshell感到很熟悉。然而和其他shell工具不一樣的是,在其他shell工具中你

只能使用單機(jī)的硬盤(pán)和內(nèi)存來(lái)操作數(shù)據(jù),而Sparkshell可用來(lái)與分布式存儲(chǔ)在許多機(jī)器

的內(nèi)存或者硬盤(pán)上的數(shù)據(jù)進(jìn)行交互,并且處理過(guò)程的分發(fā)由Spark自動(dòng)控制完成。

由于Spark能夠在工作節(jié)點(diǎn)上把數(shù)據(jù)讀取到內(nèi)存中,所以許多分布式計(jì)算都可以在幾秒鐘

之內(nèi)完成,哪怕是那種在十幾個(gè)節(jié)點(diǎn)上處理TB級(jí)別的數(shù)據(jù)的計(jì)算。這就使得一般需要在

shell中完成的那些交互式的即時(shí)探索性分析變得非常適合Spark。Spark提供Python以

及Scala的增強(qiáng)版shell,支持與集群的連接。

&

本書(shū)中大多數(shù)示例代碼都包含Spark支持的所有語(yǔ)言版本,但是交互式shell部分

只提供了Python和Scala版本的示例。shell對(duì)于學(xué)習(xí)API是非常有幫助的,因此我們建

議讀者在Python和Scala版本的例子中選擇一種進(jìn)行嘗試,即便你是Java開(kāi)發(fā)者也是如

此,畢竟各種語(yǔ)言的API是相似的。

展示Sparkshell的強(qiáng)大之處最簡(jiǎn)單的方法就是使用某個(gè)語(yǔ)言的shell作一些簡(jiǎn)單的數(shù)據(jù)分

析。我們一起按照Spark官方文檔中的快速入門(mén)指南

(http:〃/docs/latest/quick-start.html)中的示例來(lái)做一遍。

第一步是打開(kāi)Sparkshell。要打開(kāi)Python版本的Sparkshell,也就是我們所說(shuō)的

PySparkShell,進(jìn)入你的Spark目錄然后輸入:

bin/pyspark

(在Windows中則運(yùn)行bin'pyspark。)如果要打開(kāi)Scala版本的shell,輸入:

bin/spark-shell

稍等數(shù)秒,shell提示符就會(huì)出現(xiàn)。Shell啟動(dòng)時(shí):你會(huì)看到許多日志信息輸出。有的時(shí)候,

由于提示符之后又輸出了日志,我們需要按一下回車(chē)鍵,來(lái)得到一個(gè)清楚的shell提示符。

圖2-1是PySparkshell啟動(dòng)時(shí)的樣子。

hol<Jen9hnt>p2:-/Oowntoads/spark'1.1.0-bln-hadoopl$./bln/pyspark

Python2.7.6(default.Mar222014,22:59:56)

(GCC4.8.21onUnux2

Type"htlp".?copyright",or*UcenM*forMoreinforMtlon.

SparkastMblyhasb??nbuiltwithHive,includingDatanucleusjarsonclasspath

UsingSpark'sdefaultlog4jprofile:org/8pacbe/$p?rk八。g4j?dperties

14/11/1914:33:49WARMUtils:Yourhostname,hnbp2rvMlvestoaloopbackacWrcss:127.0.1.1;usinqinstead(oninterfacedockcrO)

14/11/1914:33:49WARNSetSPARKLOCALIPifyouneedtobindtoanotheraddress

14/11/1914:33:49INFOS?curityM?nag?r:Changingviewactsto:hold?n,

14/11/1914:33:49IMFOSecurItyManager:Ch4mgingnodifyaclsto:holden.

14/11/1914:33:49INFOSecurityMtnagvr:S?<urltyMtnag?r:?uthtntic?tlondisabled:ulactsdisabled;userswithviewp?rmlssions:S?t(hold?n,|

;userswithModifyperaissions:Set(holtfefi.)

14/11/1914:33:49IWFOSlf4jLo40?r:SIf4jLoggerfUrttd

14/11/1914:33:49INFORenotin^:Startingremotln9

14/11/1914:33:49INFOReooting:R??otlngstarted;Uitcnlngonaddresses:(akM.tep://sp?rkDrivcr^l72.17.42.1:35821j

14/11/1914:)3:49INFORewoting:Resot1ngnowlistensonaddresses:(akka.tcp://spark0rlver9172.17.42.1:3S621)

14/11/1914:33:49INFOUtlU:Successfullystartedservice?parkDriv?r,onport35021.

14/11/1914:33:49IMFOSparkEnv:RegisteringMapOutputTracker

14/11/1914:33:49INFOSparkEnv:RegisteringBlockMan&gerMaster

14/11/1914:33:49INFODlskBlockManager:Createdlocaldirectoryat/tiap/$p?rk-local-28141119143349-5776

14/11/1914:33:49INFOUtils:Successfullystartedservice'ConnectionnantK)?rforblockmanager'onport5721B.

14/11/1914:33:49INFOConnectlonM?n?q?r:Boundsockettoport57218withid?ConnectionM?fl49?rid(.57218)

14/11/1914:33:49INFOMeaoryStore:MeaoryStorestartedwithcapacity26s.4MB

14/11/1914:33:49INFOBlockM4n?9*rMast?r:TryingtoremitterBlockM?nA9?r

14/11/1914:33:49INFOBlockMana^erMasterActor:Registeringblockmanager:57218with265.4HBRAM

14/11/1914:33:49INFOBlockftana^erRaser:RegisteredBlockMarumer

14/11/1914:33:49INFOHttpFlleServer:HTTPFileserverdirectoryIs/tBp/sp?rk-399cS3M-ei>e8-4043-9a7d-9345e97eS7M

14/11/1914:33:49IMFORttpServer:StartingHTTPServer

14/11/1914:33:49INFOUtils:SuccessfullystartedserviceHTTPfileserver'onport4988.

14/11/1914:33:49INFOUtils:SuccessfullyttarttOs?rvlc?Sp?rkUI'onport4046

14/11/1914:33:49INFOSparkUI:St?rtedSparkUIathttp://172.17.42.1:4648

14/11/1914:33:49INFOAkkiUtllt:ConnectingtoMeartbeatRecciver:akka.tep://sparkOriveryi72.17.42.1:35e21/us?r/Heartt>eatR*c?lv?r

Welcometo

____________fl_

\\/\//_J'/

/_/..JI/_A_\version1.1.0

lusinqPythonversion2.7.6(default.Har22261422:59:56)

ISparkContextavailableassc.

圖2-1:默認(rèn)日志選項(xiàng)下的PySparkshell

如果覺(jué)得shell中輸出的日志信息過(guò)多而使人分心,可以調(diào)整日志的級(jí)別來(lái)控制輸出的信

息量。你需要在conf目錄下創(chuàng)建一個(gè)名為】perties的文件來(lái)管理日志設(shè)置。

Spark開(kāi)發(fā)者們已經(jīng)在Spark中加入了一個(gè)日志設(shè)置文件的模版,叫作

perties.templateo要讓日志看起來(lái)不那么啰嗦,可以先把這個(gè)日志設(shè)置模版文件

復(fù)制一份到conf/perties來(lái)作為日志設(shè)置文件,接下來(lái)找到下面這一行:

log4j.rootCategory=INFOzconsole

然后通過(guò)下面的設(shè)定降低日志級(jí)別,只顯示警告及更嚴(yán)重的信息:

log4j.rootCategory=WARN,console

這時(shí)再打開(kāi)shell,你就會(huì)看到輸出大大減少(圖2-2)。

-/DownkMrfVH**>>■h?ldfn?>Mnbp2:-/Daw?laM>iA(Mrt1-fwdooNMbeiMn<>h>nbp2.-;<?p<H/!?JOa>OOOOSn

holden@hnbp2:-/Downloads/spark-1.1.6-bin-hadooplS./bln/pyspark

Python2.7.6(default,Mar222814.22:59:56)

[GCC4.8.2]onllnuxZ

Type"help",*copyright","credits-or'license"formoreInfonnatlon.

SparkassemblyhasbeenbuiltwithHive,includingDatanucleusjarsonclasspath

14/11/1914:38:63WARNUtils:Yourhostname.hmbp2resolvestoaloopbackaddress:;usingInstead(onInterfacedockerO)

14/11/1914:38:03WARNUtils:SetSPARKLOCALIPifyouneedtobindtoanotheraddress

Welcometo

/T7_______n_

\\/_\/_,/_/?/

//./\.//IJ\\version1.1.6

IJ

UsingPythonversion2.7.6(default.Mar22201422:59:56)

SparkContextavailableassc.

9

圖2-2:降低日志級(jí)別后的PySparkshell

使用IPython

IPython是一個(gè)受許多Python使用者喜愛(ài)的增強(qiáng)版Pythonshell,能夠提供自動(dòng)補(bǔ)全等好

用的功能。你可以在http:〃上找到安裝說(shuō)明。只要把環(huán)境變量IPYTHON的值

設(shè)為1,你就可以使用IPython了:

IPYTHON=1./bin/pyspark

要使用IPythonNotebook,也就是Web版的IPython,可以運(yùn)行:

IPYTHON_OPTS=Hnotebook"./bin/pyspark

在Windows上,像下面這樣設(shè)置環(huán)境變量并運(yùn)行命令行:

setIPYTHON=1

bin\pyspark

在Spark中,我們通過(guò)對(duì)分布式數(shù)據(jù)集的操作來(lái)表達(dá)我們的計(jì)算意圖,這些計(jì)算會(huì)自動(dòng)地

在集群上并行進(jìn)行。這樣的數(shù)據(jù)集被稱(chēng)為彈性分布式數(shù)據(jù)集(resilientdistributed

dataset),簡(jiǎn)稱(chēng)RDD。RDD是Spark對(duì)分布式數(shù)據(jù)和計(jì)算的基本抽象。

在我們更詳細(xì)地討論RDD之前,先來(lái)使用shell從本地文本文件創(chuàng)建一個(gè)RDD來(lái)作一些

簡(jiǎn)單的即時(shí)統(tǒng)計(jì)。例2-1是Python版的例子,例2-2是Scala版的。

例2-1:Python行數(shù)統(tǒng)計(jì)

?>lines=sc.textFile(**README.mdH)#創(chuàng)建一個(gè)名為工ines的RDD

?>lines.count()#統(tǒng)計(jì)RDD中的元素個(gè)數(shù)

127

?>lines,first()#這個(gè)RDD中的第一個(gè)元素,也就是README.md的第一行

u,#ApacheSpark,

例2-2:Scala行數(shù)統(tǒng)計(jì)

scala>vallines=sc.textFile(**README.md")//創(chuàng)建一個(gè)名為lines的RDD

lines:spark.RDD[String]=MappedRDD[...]

scala>lines.count()//統(tǒng)計(jì)RDD中的元素個(gè)數(shù)

resO:Long=127

scala>lines.first()//這個(gè)RDD中的第一個(gè)元素,也就是README.md的第一行

resl:String=#ApacheSpark

要退出任一shell,按Ctrl-Do

fl八一

你可能在日志的輸出中注意到了這樣一行信息:INFOSparkUI:Started

SparkUIathttp://[ipaddress]:4040o你可以由這個(gè)地址訪問(wèn)Spark用戶(hù)界面,

查看關(guān)于任務(wù)和集群的各種信息。我們會(huì)在第7章中詳細(xì)討論。

在例2-1和例2-2中,變量lines是一個(gè)RDD,是從你電腦上的一個(gè)本地的文本文件創(chuàng)

建出來(lái)的。我們可以在這個(gè)RDD上運(yùn)行各種并行操作,比如統(tǒng)計(jì)這個(gè)數(shù)據(jù)集中的元素個(gè)

數(shù)在這里就是文本的行數(shù)),或者是輸出第一個(gè)元素。我們會(huì)在后續(xù)章節(jié)中深入探討

RDDo在此之前,讓我們先花些時(shí)間來(lái)了解Spark的基本概念。

2.3Spark核心概念簡(jiǎn)介

現(xiàn)在你已經(jīng)用shell運(yùn)行了你的第一段Spark程序,是時(shí)候?qū)park編程作更細(xì)致的了解

了。

從上層來(lái)看,每個(gè)Spark應(yīng)用都由一個(gè)驅(qū)動(dòng)器程序(driverprogram)來(lái)發(fā)起集群上的各

種并行操作。驅(qū)動(dòng)器程序包含應(yīng)用的main函數(shù),并且定義了集群上的分布式數(shù)據(jù)集,還

對(duì)這些分布式數(shù)據(jù)集應(yīng)用了相關(guān)操作。在前面的例子里,實(shí)際的驅(qū)動(dòng)器程序就是Spark

shell本身,你只需要輸入想要運(yùn)行的操作就可以了。

驅(qū)動(dòng)器程序通過(guò)一個(gè)SparkContext對(duì)象來(lái)訪問(wèn)Sparko這個(gè)對(duì)象代表對(duì)計(jì)算集群的一

個(gè)連接。shell啟動(dòng)時(shí)已經(jīng)自動(dòng)創(chuàng)建了一個(gè)SparkContext對(duì)象,是一個(gè)叫作sc的變量。

我們可以通過(guò)例2-3中的方法嘗試輸出sc來(lái)查看它的類(lèi)型。

例2-3:查看變量sc

?>SC

<pyspark.context.SparkContextobjectat0xl025b8f90>

一旦有了SparkContext,你就可以用它來(lái)創(chuàng)建RDD。在例2-1和例2-2中,我們調(diào)用了

sc.textFileO來(lái)創(chuàng)建一個(gè)代表文件中各行文本的RDD。我們可以在這些行上進(jìn)行各

種操作,比如count()。

要執(zhí)行這些操作,驅(qū)動(dòng)器程序一般要管理多個(gè)執(zhí)行器(executor)節(jié)點(diǎn)。比如,如果我們

在集群上運(yùn)行count。操作,那么不同的節(jié)點(diǎn)會(huì)統(tǒng)計(jì)文件的不同部分的行數(shù)。由于我們

剛才是在本地模式下運(yùn)行Sparkshell,因此所有的工作會(huì)在單個(gè)節(jié)點(diǎn)上執(zhí)行,但你可以將

這個(gè)shell連接到集群上來(lái)進(jìn)行并行的數(shù)據(jù)分析。圖2-3展示了Spark如何在一個(gè)集群上

運(yùn)行。

圖2-3:Spark分布式執(zhí)行涉及的組件

最后,我們有很多用來(lái)傳遞函數(shù)的API,可以將對(duì)應(yīng)操作運(yùn)行在集群上。比如,可以擴(kuò)展

我們的README示例,篩選出文件中包含某個(gè)特定單詞的行。以"Python”這個(gè)單詞為例,

具體代碼如例2-4(Python版本)和例2-5(Scala版本)所示。

例2-4:Python版本篩選的例子

?>lines=sc.textFile("README.md*,)

?>pythonLines=lines.filter(lambdaline:"Python”inline)

?>pythonLines.first()

u*##InteractivePythonShell,

例2-5:Scala版本篩選的例子

scala>vallines=sc.textFile("README.mdH)//創(chuàng)建一個(gè)叫l(wèi)ines的RDD

lines:spark.RDD[String]=MappedRDD[...]

scala>valpythonLines=lines.filter(line=>line.contains("Python"))

pythonLines:spark.RDD[String]=FilteredRDD[...]

scala>pythonLines.first()

resO:String=##InteractivePythonShell

向Spark傳遞函數(shù)

如果你對(duì)例2-4和例2-5中的lambda或者=>語(yǔ)法不熟悉,可以把它們理解為Python和

Scala中定義內(nèi)聯(lián)函數(shù)的簡(jiǎn)寫(xiě)方法。當(dāng)你在這些語(yǔ)言中使用Spark時(shí),你也可以單獨(dú)定義

一個(gè)函數(shù),然后把函數(shù)名傳給Spark。比如,在Python中可以這樣做:

defhasPython(line):

return"Python"inline

pythonLines=lines.filter(hasPython)

在Java中向Spark傳遞函數(shù)也是可行的,但是在這種情況下,我們必須把函數(shù)定義為實(shí)

現(xiàn)了Function接口的類(lèi)。例如:

JavaRDD<String>pythonLines=lines.filter(

newFunction<String,Boolean>(){

Booleancall(Stringline){returnline.contains("Python");}

)

Java8提供了類(lèi)似Python和Scala的lambda簡(jiǎn)寫(xiě)語(yǔ)法。下面就是一個(gè)使用這種語(yǔ)法的代

碼的例子:

JavaRDD<String>pythonLines=lines.filter(line->line.contains("Python"));

我們會(huì)在3.4節(jié)更深入地討論如何向Spark傳遞函數(shù)。

盡管后面會(huì)更詳細(xì)地講述SparkAPI,我們還是不得不感嘆,其實(shí)SparkAPI最神奇的地

方就在于像filter這樣基于函數(shù)的操作也會(huì)在集群上并行執(zhí)行。也就是說(shuō),Spark會(huì)自

動(dòng)將函數(shù)(比如line.contains("Python"))發(fā)到各個(gè)執(zhí)行器節(jié)點(diǎn)上。這樣,你就

可以在單一的驅(qū)動(dòng)器程序中編程,并且讓代碼自動(dòng)運(yùn)行在多個(gè)節(jié)點(diǎn)上。第3章會(huì)詳細(xì)講述

RDDAPI?

2.4獨(dú)立應(yīng)用

我們的Spark概覽中的最后一部分就是如何在獨(dú)立程序中使用Sparko除了交互式運(yùn)行之

外,Spark也可以在Java、Scala或Python的獨(dú)立程序中被連接使用。這與在shell中使

用的主要區(qū)別在于你需要自行初始化SparkContext。接下來(lái),使用的A

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶(hù)所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶(hù)上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶(hù)上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶(hù)因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論