版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡介
BigDataAnalyticswithSpark項(xiàng)目四SparkSQL處理結(jié)構(gòu)化學(xué)生數(shù)據(jù)項(xiàng)目概述SparkSQL是Spark生態(tài)中用于處理結(jié)構(gòu)化數(shù)據(jù)的一個(gè)模塊,開發(fā)人員可以輕松的借助API、SQL語句完成數(shù)據(jù)分析工作。SparkSQL支持多種數(shù)據(jù)源,實(shí)際開發(fā)中經(jīng)常要讀寫MySQL數(shù)據(jù)庫及Hive數(shù)據(jù)倉庫,本項(xiàng)目從DataFrame的創(chuàng)建入手,介紹不同的DataFrame的創(chuàng)建方法及DataFrame的各種操作。針對某校的學(xué)生信息文件(含學(xué)院、姓名、性別、年齡等),使用SparkSQL進(jìn)行分析,獲取分析結(jié)果;項(xiàng)目效果使用SparkSQL可以完成學(xué)生信息的分析,提取有價(jià)值的信息。例如分學(xué)院、性別統(tǒng)計(jì)年齡最大、最小值,如下所示:除此之外,還可以實(shí)現(xiàn)與SparkSQL與MySQL、Hive的連接。利用SparkSQL技術(shù)完成數(shù)據(jù)分析后,結(jié)果可再次寫入到MySQL、Hive中,如下所示:目錄任務(wù)1初識(shí)結(jié)構(gòu)化數(shù)據(jù)處理工具SparkSQL認(rèn)識(shí)DataFrame、DataSet數(shù)據(jù)類型由學(xué)生信息創(chuàng)建DataFrameSparkSQL分析學(xué)生信息(1)任務(wù)2任務(wù)3任務(wù)4SparkSQL分析學(xué)生信息(2)任務(wù)5任務(wù)6SparkSQL分析學(xué)生信息(3)目錄任務(wù)7SQL語法風(fēng)格處理學(xué)生信息通過JDBC連接MySQL數(shù)據(jù)庫任務(wù)9RDD、DataFrame與Dataset的相互轉(zhuǎn)換其他類型數(shù)據(jù)創(chuàng)建DataFrame任務(wù)11SparkSQL讀寫Hive數(shù)據(jù)任務(wù)8任務(wù)10思維導(dǎo)圖初識(shí)結(jié)構(gòu)化數(shù)據(jù)處理工具SparkSQL任務(wù)1SparkSQL是Spark體系中處理結(jié)構(gòu)化數(shù)據(jù)的有力工具。初步認(rèn)識(shí)SparkSQL,了解其演化歷程、特點(diǎn),并初步體驗(yàn)其使用過程。早期Hadoop生態(tài)體系中,數(shù)據(jù)處理主要使用MapReduce組件。缺陷:MapReduce學(xué)習(xí)成本較高、需要較多的Java編程等知識(shí)。后續(xù)產(chǎn)生了Hive分布式數(shù)據(jù)倉庫,它允許用戶使用類似于SQL的語法(HQL)處理結(jié)構(gòu)化數(shù)據(jù),極大降低了使用門檻;Hive與Hadoop高度集成,將HQL語言自動(dòng)換成MapReduce操作,可使用Yarn完成資源調(diào)度,最終完成結(jié)構(gòu)化數(shù)據(jù)的處理任務(wù);因其便捷性,Hive逐漸流行起來,成為搭建分布式數(shù)據(jù)倉庫的主流方案之一。缺陷:其底層基于MapReduce(HQL最終轉(zhuǎn)換為MapReduce操作),而MapReduce的shuffle需要大量的磁盤I/O,因此導(dǎo)致Hive的性能低下,復(fù)雜的操作可能運(yùn)行數(shù)個(gè)小時(shí),甚至數(shù)十個(gè)小時(shí)。為此,伯克利實(shí)驗(yàn)室開發(fā)了基于Hive的結(jié)構(gòu)化數(shù)據(jù)處理組件Shark(SparkSQL的前身)。Shark是Spark上的數(shù)據(jù)倉庫,最初設(shè)計(jì)成與Hive兼容;Shark在HiveQL方面重用了Hive中的HiveQL解析、邏輯執(zhí)行計(jì)劃翻譯、執(zhí)行計(jì)劃優(yōu)化等邏輯,但在執(zhí)行層面將MapReduce作業(yè)替換成了Spark作業(yè)(把HiveQL翻譯成Spark上的RDD操作)。因此與Hive相比,因其使用Spark的基于內(nèi)存的計(jì)算模型,性能得到了極大提升。缺陷:一是執(zhí)行計(jì)劃優(yōu)化完全依賴于Hive,對于其性能進(jìn)一步提升造成了約束;二是Spark是線程級(jí)并行,而MapReduce是進(jìn)程級(jí)并行,Spark在兼容Hive的實(shí)現(xiàn)上存在線程安全問題。此外,Shark繼承了大量的Hive代碼,因此后續(xù)優(yōu)化、維護(hù)較為麻煩,特別是基于MapReduce設(shè)計(jì)的部分,成為整個(gè)項(xiàng)目的瓶頸。因此,2014年Shark項(xiàng)目中止,并轉(zhuǎn)向SparkSQL的開發(fā)。1.1SparkSQL的產(chǎn)生早期,SparkSQL引入了SchemaRDD(即帶有Schema模式信息的RDD),使用戶可以在SparkSQL中執(zhí)行SQL語句,數(shù)據(jù)既可以來自RDD,也可來自Hive、HDFS、Cassandra等外部數(shù)據(jù)源,還可以是JSON、Parquest、CSV等格式的數(shù)據(jù)。開發(fā)語言方面,SparkSQL支持Scala、Java、Python等語言,支持SQL-92規(guī)范。從Spark1.2升級(jí)到Spark1.3以后,SparkSQL中的SchemaRDD改為DataFrame,DataFrame相對于SchemaRDD有了較大改變,同時(shí)提供了更多便捷的API。SparkSQL可以使用JDBC、ODBC等標(biāo)準(zhǔn)數(shù)據(jù)庫連接器,友好的支持各種SQL查詢;這樣其他第三方工具,如部分商業(yè)智能工具(PowerBI、Tableau等)可以接入Spark,借助Spark的強(qiáng)大計(jì)算能力完成大規(guī)模數(shù)據(jù)的處理。1.2
SparkSQL的特點(diǎn)在早期的Spark1.X版本中,Spark結(jié)構(gòu)化數(shù)據(jù)處理的入口為SQLContext和HiveContext;其中,SQLContext僅支持SQL語法解析器,而HiveContext繼承了SQLContext,HiveContext即支持SQL語法解析器又支持HiveQL語法解析器。Spark2.0版本之前,使用Spark必須先創(chuàng)建SparkContext,SparkContext是程序入口和程序執(zhí)行的“調(diào)度者”;下面代碼演示Spark1.X版本下,依次創(chuàng)建SparkConf、SparkContext、SQLContext實(shí)例://創(chuàng)建SparkConf實(shí)例valsparkConf=newSparkConf().setAppName("SparkSessionZipsExample").setMaster("local")//創(chuàng)建SparkContext實(shí)例valsc=newSparkContext(sparkConf)//創(chuàng)建SQLContext實(shí)例valsqlContext=neworg.apache.spark.sql.SQLContext(sc)1.3體驗(yàn)SparkSQL不同版本的操作差異在Spark2.0以后版本中,只要?jiǎng)?chuàng)建一個(gè)SparkSession實(shí)例就夠了,SparkConf、SparkContext和SQLContext都已經(jīng)被封裝在SparkSession當(dāng)中(SparkSession是程序入口、SparkSQL的上下文環(huán)境);HiveContext類已經(jīng)被移除,其功能遷移到SparkSession類中。下面代碼演示Spark2.X版下,通過SparkSession實(shí)例訪問SparkContext實(shí)例和SQLContext實(shí)例://創(chuàng)建SparkSession實(shí)例valspark=SparkSession.builder().appName("SparkSQLExample").getOrCreate()//由SparkSession創(chuàng)建SparkContext、SQLContext實(shí)例valsc=spark.sparkContextvalsqlContext=spark.sqlcontext注意,在SparkShell環(huán)境下,已經(jīng)建好了一個(gè)SparkSession對象spark,可以直接使用;但在獨(dú)立應(yīng)用程序中,則需要手工建立。1.3體驗(yàn)SparkSQL不同版本的操作差異認(rèn)識(shí)DataFrame、DataSet數(shù)據(jù)類型任務(wù)2Spark可將RDD封裝為DataFrame、DataSet,支持SQL類的操作。介紹DataFrame、Dataset數(shù)據(jù)類型,認(rèn)識(shí)RDD、DataFrame、Dataset三者的區(qū)別。SparkSQL中DataFrame是其核心數(shù)據(jù)抽象,其前身是SchemaRDD(Spark1.3中首次引入DataFrame的概念)。DataFrame借助Schema(模式信息)將數(shù)據(jù)組織到一個(gè)二維表格中(類似于關(guān)系型數(shù)據(jù)庫的表),每一列數(shù)據(jù)都存在列名。基于RDD進(jìn)行數(shù)據(jù)分析時(shí),因?yàn)镽DD的不可修改性,為了得到最終結(jié)果,需要進(jìn)行若干次轉(zhuǎn)換、生成若干RDD;用DataFrame進(jìn)行分析時(shí),一條SQL語句也可能包含多次轉(zhuǎn)換,但轉(zhuǎn)換操作在其內(nèi)部發(fā)生,并不會(huì)頻繁產(chǎn)生新的RDD,從而獲得更高的計(jì)算性能。2.1認(rèn)識(shí)DataFrameDataFrame與RDD的主要區(qū)別在于,前者帶有schema信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱和數(shù)據(jù)類型。這使得SparkSQL得以洞察更多的結(jié)構(gòu)信息,從而對藏于DataFrame背后的數(shù)據(jù)源以及作用于DataFrame之上的變換進(jìn)行了針對性的優(yōu)化,最終達(dá)到大幅提升運(yùn)行效率的目的。反觀RDD,由于無法得知其元素的具體內(nèi)部結(jié)構(gòu),SparkCore只能在stage層面進(jìn)行簡單、通用的流水線優(yōu)化。如圖4-6所示,假設(shè)有若干人員Person數(shù)據(jù),包含name、age兩項(xiàng)信息。若用RDD進(jìn)行處理,則需要定義個(gè)Person類,將用戶數(shù)據(jù)封裝到Person類型對象中,RDD的每一個(gè)元素都是Person類型,而Spark并不清楚其內(nèi)部結(jié)構(gòu)。如果要把數(shù)據(jù)存放到DataFrame中,則每一個(gè)元素都會(huì)被封裝為Row類型,DataFrame提供了詳細(xì)的結(jié)構(gòu)信息,SparkSQL可以清楚地知道該數(shù)據(jù)集中包含多少列、每列的名稱和數(shù)據(jù)類型,如圖4-6所示。2.1認(rèn)識(shí)DataFrameDataset是DataframeAPI的一個(gè)擴(kuò)展,是Spark1.6版本加入的新的數(shù)據(jù)抽象,也是Spark2.0之后管理結(jié)構(gòu)化數(shù)據(jù)的主要數(shù)據(jù)抽象。Dataframe是Dataset的特列,DataFrame=Dataset[Row],Row是一個(gè)類型,跟Car、Person這些的用戶定義類型一樣,所有的表結(jié)構(gòu)信息都用Row來表示。DataSet是強(qiáng)類型的,可以有Dataset[Car]、Dataset[Person]等,而DataFrame的每一行數(shù)據(jù)則只能為Row類型。除此之外,DataFrame只是知道字段,但不知道字段的類型,所以在執(zhí)行這些操作的時(shí)候是沒辦法在編譯的時(shí)候檢查是否類型失敗的;比如對一個(gè)String進(jìn)行減法操作,編譯時(shí)不會(huì)報(bào)錯(cuò),在執(zhí)行的時(shí)候才報(bào)錯(cuò);而DataSet不僅僅知道字段,而且知道字段類型,所以有更嚴(yán)格的錯(cuò)誤檢查。2.2認(rèn)識(shí)Dataset如下所示,對于人員Person數(shù)據(jù),可以自行創(chuàng)建一個(gè)Person樣例類(caseclassPerson(name:String,age:Int)),而后將每個(gè)人的數(shù)據(jù)封裝為一個(gè)Person對象,最后放入Dataset中;即Dataset的每一行都是一個(gè)Person類實(shí)例。這種形式更加符合面向?qū)ο缶幊痰乃悸?,更加貼合業(yè)務(wù)場景,便于處理業(yè)務(wù)中的數(shù)據(jù)關(guān)系。2.2認(rèn)識(shí)DatasetRDD中,明確知道每個(gè)元素的具體類型,但不知道元素的具體屬性,需要加以判別。DataFrame中,每一個(gè)元素(每一行)均為Row類型,可以知道每個(gè)元素有多少列,每列的名稱是什么,但不知道每列的數(shù)據(jù)類型。DataSet集成了RDD和DataFrame的優(yōu)點(diǎn),可以明確知道每一個(gè)元素(每一行)的具體類型(預(yù)先定義的類),進(jìn)而知道每列數(shù)據(jù)的名稱,也知道其數(shù)據(jù)類型;在Spark2.X版,DataSet、DataFrame的API已做了統(tǒng)一。2.3RDD、DataFrame、Dataset三者的區(qū)別由學(xué)生信息創(chuàng)建DataFrame任務(wù)3要使用SparkSQL進(jìn)行數(shù)據(jù)分析,創(chuàng)建DataFrame;由JSON數(shù)據(jù)文件構(gòu)建DataFrame以供后續(xù)處理DataFrame的相關(guān)數(shù)據(jù)輸出、打印等方法。結(jié)構(gòu)化文件(JSON,CSV,Parquest等)、數(shù)據(jù)庫(mysql、Oracle等)、分布式數(shù)據(jù)庫等均可以生成DataFrame,從而進(jìn)行數(shù)據(jù)處理。下面演示由JSON文件創(chuàng)建DataFrame,其他生成方式,后續(xù)陸續(xù)講解。首先在Linux終端中,使用以下命令將people.json文件上傳到hadoop文件系統(tǒng):然后使用spark.read.json(“文件路徑”)或spark.read.format("json").load(“文件路徑”)形式創(chuàng)建DataFrame,如下所示(代碼中spark為SparkShell自動(dòng)生成的SparkSession對象):3.1由學(xué)生信息JSON文件創(chuàng)建DataFrame./hdfsdfs-put/home/hadoop/people.json/user/hadoopSparkDataFrame類派生于RDD類,因此與RDD類似,DataFrame的操作也分為轉(zhuǎn)換操作和行動(dòng)操作,同時(shí)DataFrame也具有惰性操作特點(diǎn)(只有提交行動(dòng)操作時(shí)才真正執(zhí)行計(jì)算)。SparkRDD經(jīng)常用take、collect等行動(dòng)操作查看數(shù)據(jù),DataFrame同樣提供了若干類似方法,常用的方法如下所示。3.2printSchema打印DataFrame的數(shù)據(jù)模式使用printSchema打印DataFrame的數(shù)據(jù)模式,可以看到df的列名稱、數(shù)據(jù)類型以及是否可以為空。3.2printSchema打印DataFrame的數(shù)據(jù)模式show相關(guān)方法有多個(gè),常用的如下所示3.3show方法顯示DataFrame、中的數(shù)據(jù)(1)show方法默認(rèn)顯示20行3.3show方法顯示DataFrame中的數(shù)據(jù)(2)show(numRows:Int)顯示前numRows行3.3show方法顯示DataFrame中的數(shù)據(jù)(3)show(5,false),顯示5行、顯示全部字符3.3show方法顯示DataFrame中的數(shù)據(jù)(1)first獲取DataFrame第一行數(shù)據(jù),返回值類型為org.apache.spark.sql.Row(2)head獲取DataFrame首元素,返回值類型為org.apache.spark.sql.Row3.4獲取DataFrame若干行記錄(3)take(numRows:Int),獲取numRows行數(shù)據(jù),返回值類型為Array[org.apache.spark.sql.Row]。(4)takeAsList(numRows:Int),獲取numRows行數(shù)據(jù),返回值類型為java.util.List[org.apache.spark.sql.Row]。3.4獲取DataFrame若干行記錄(1)collect獲取DataFrame的所有記錄,返回值類型為Array[org.apache.spark.sql.Row]3.5獲取DataFrame所有記錄(2)collectAsList獲取DataFrame的所有記錄,返回值類型為java.util.List[org.apache.spark.sql.Row]。3.5獲取DataFrame所有記錄SparkSQL
分析學(xué)生信息(1)
任務(wù)4where、filter、select等操作而進(jìn)一步分析學(xué)生信息。where方法主要用于篩選出符合條件的行,它有兩種參數(shù)形式。(1)參數(shù)為條件字符串(conditionExpr:String),方法定義如下所示,用于篩選出符合conditionExpr條件的數(shù)據(jù),其返回值類型為org.apache.spark.sql.Dataset。例如,要篩選出年齡大于21歲的學(xué)生信息,實(shí)現(xiàn)代碼如下所示。4.1where方法在條件語句conditionExpr中,可以使用and、or等連接詞;例如要找出人文學(xué)院或機(jī)械學(xué)院的男生信息,代碼如下所示。(2)參數(shù)為condition:Column條件,方法定義如圖4-23所示,其返回值類型為org.apache.spark.sql.Dataset。4.1where方法下圖為找出年齡大于20的學(xué)生信息;其中,$”age”表示age這一列。4.1where方法filter方法與where方法類似,下圖篩選出信息學(xué)院年齡大于20的學(xué)生信息。4.2
filter篩選相關(guān)數(shù)據(jù)4.3
select方法select方法用于選擇特定列生成新的Dataset,有多重參數(shù)形式,可以用String參數(shù),也可以用Column列參數(shù)。如下所示,其中,df(“institute”)、df(“age”)為org.apache.spark.sql.Column對象。selectExpr方法定義如下所示,可以對選定的列進(jìn)行特殊處理(例如改列名、取絕對值、四舍五入等),最終返回一個(gè)新的DataFrame?!癷nstituteasschool”使用as將列“institute”重命名為“school”;“round(age+0.52,2)”將age列值加0.52后,四舍五入保留2位小數(shù)。4.4
selectExpr方法對于selectExpr,也可以采用以下寫法:4.4
selectExpr方法可以用col()、apply()方法均可獲取指定的列,返回值類型為org.apache.spark.sql.Column;也可以使用df("name")形式得到Column。4.5取指定的Columndrop方法可去掉指定列,返回新的DataFrame。4.6去掉指定的列SparkSQL
分析學(xué)生信息(2)任務(wù)5orderBy排序、groupBy分組、distinct去重、join連接操作等進(jìn)一步分析學(xué)生數(shù)據(jù)。limit獲取DataFrame的前N行;與take方法不同,limit方法返回值為類型為Dataset。5.1limit獲取前N行orderBy、sort均可用于排序,二者等效、用法基本一致;下面以orderBy為例講解,orderBy方法有多重用法:(1)orderBy("age"),參數(shù)為String,按照age升序排列5.2orderBy、sort排序
(2)參數(shù)為Column,結(jié)合asc、desc進(jìn)行升序或降序排列。5.2orderBy、sort排序
(3)orderBy中可以設(shè)置多個(gè)參數(shù),進(jìn)行多字段組合排序。5.2orderBy、sort排序
groupBy為分組操作,其返回值類型為RelationalGroupedDataset,后續(xù)經(jīng)常與count、mean、max、min、sum等操作合用。如下所示,df按照“institute”分組,得到一個(gè)RelationalGroupedDataset實(shí)例。5.3
groupBy分組操作(1)使用count,按學(xué)院統(tǒng)計(jì)學(xué)生人數(shù),如下所示。5.3
groupBy分組操作(2)使用mean,按學(xué)院統(tǒng)計(jì)平均年齡,如下所示。(3)按學(xué)院、性別統(tǒng)計(jì)平均年齡,如下所示。(4)按學(xué)院、性別統(tǒng)計(jì)學(xué)生年齡最大值,如下所示。5.3
groupBy分組操作(1)distinct方法用于刪除DataFrame中的重復(fù)行。下圖中,創(chuàng)建含有重復(fù)元素的df2(createDataFrame方法將在后續(xù)任務(wù)中講解);然后,使用distinct方法去重,如下所示。5.4distinct去重操作(2)dropDuplicates可以根據(jù)指定的字段進(jìn)行去重。例如根據(jù)institute、sex兩列進(jìn)行去重,最終每個(gè)學(xué)院、男女生各保留一名學(xué)生。5.4distinct去重操作agg聚合操作常用于部分?jǐn)?shù)據(jù)列的統(tǒng)計(jì),也可以與groupBy組合使用,從而實(shí)現(xiàn)分組統(tǒng)計(jì)的功能;下面通過幾個(gè)例子演示agg的使用方法。(1)統(tǒng)計(jì)所有學(xué)生年齡的最大、最小、平均值。(2)統(tǒng)計(jì)所有學(xué)生年齡的最大、最小、平均值(第二種寫法)5.5agg聚合操作(3)統(tǒng)計(jì)所有學(xué)生年齡的加和、手機(jī)號(hào)碼最大值(4)與groupBy組合使用,分學(xué)院統(tǒng)計(jì)年齡最大值、最小值5.5agg聚合操作join操作用于連接連個(gè)DataFrame組成一個(gè)新的DataFrame;下面創(chuàng)建兩個(gè)DataFrame:df1、df2,演示內(nèi)連接join操作的效果。join操作也可以寫成如下形式,達(dá)到的效果基本一致,但join后產(chǎn)生的DataFrame的列數(shù)量不同、顯示效果有所差別(df3有3列,而df4有4列)。5.6join連接操作SparkSQL
分析學(xué)生信息(3)任務(wù)6stat用于制定字段統(tǒng)計(jì)、intersection獲取兩個(gè)DataFrame的共有記錄、na處置DataFrame中的空值等。進(jìn)一步分析學(xué)生信息。直接執(zhí)行SQL語句的方法。
stat方法可以用于計(jì)算指定字段或字段間的統(tǒng)計(jì),stat方法返回DataFrameStatFunctions類型對象;DataFrameStatFunctions又可以調(diào)用接口freqItems(找出頻繁出現(xiàn)的元素)、corr(兩列的相關(guān)性)、cov(兩列的協(xié)方差)等。(1)freqItems找出“age”、“institute”列中頻繁出現(xiàn)的元素。(2)corr求“age”、“phone”兩列的相關(guān)性。6.1stat方法獲取指定字段統(tǒng)計(jì)信息(3)cov求兩列的協(xié)方差6.1stat方法獲取指定字段統(tǒng)計(jì)信息intersect方法用于獲取兩個(gè)DataFrame的共有記錄(交集);用createDataFrame方法創(chuàng)建兩個(gè)DataFramedf1、df2,而后使用intersect方法獲取df1、df2的的共有記錄。6.2intersect方法獲取兩個(gè)DataFrame共有記錄(1)withColumnRenamed重命名指定的列字段名稱;下圖中,使用withColumnRenamed方法將列名改為“school”。6.3操作字段名(2)wihtColumn方法可在當(dāng)前DataFrame中增加一列(該列可來源于自身,但不能為其他DataFrame)下圖中,df使用wihtColumn方法增加一列,它是在原先age列df(“age”)基礎(chǔ)上,+1得來。6.3操作字段名SparkSQL中,經(jīng)常用na方法處置空值,其返回值類型為DataFrameNaFunctions;而DataFrameNaFunctions有drop、fill等方法具體處理空值。為演示na方法處理空值,由JSON文件people_null_values.json創(chuàng)建DataFrame;注意第二行sex、第三行數(shù)據(jù)institute數(shù)據(jù)缺失。6.4處置空值(1)drop方法用于刪除含有空值的行,只要行數(shù)據(jù)中含有空值(Null),就刪除該行。下圖中,首先由people_null_values.json生成DataFrame(df2),而后使用na、drop方法刪除含有空值的行。6.4處置空值(2)對于給定的列,如果有數(shù)值為空,刪除空數(shù)值所在的行;下圖中,使用drop方法刪除sex或institute為空值的行。6.4處置空值(3)fill方法用指定的值來代替空值;下圖中,對于sex為空的值填充為“男”,對于“institute”為空的值填充為“藝術(shù)學(xué)院”6.4處置空值SQL語法風(fēng)格處理
學(xué)生信息任務(wù)7直接使用SQL語句完成數(shù)據(jù)分析任務(wù)。采用SQL語法風(fēng)格處理學(xué)生信息。SparkSQL操作包括DSL和SQL兩種語法風(fēng)格;其中DSL(DomainSpecificLanguage)領(lǐng)域?qū)S谜Z言,其目的是幫助開發(fā)者調(diào)用特定的方法實(shí)現(xiàn)與SQL語句相同的功能,前面關(guān)于DataFrame的操作均為DSL操作。DSL語法類似于RDD中的操作,允許開發(fā)者調(diào)用相關(guān)方法完成對DataFrame數(shù)據(jù)分析;DSL風(fēng)格更符合面向?qū)ο缶幊痰乃枷?,可以避免不熟悉SQL語法帶來的麻煩。除了DSL,SparkSession提供了直接執(zhí)行SQL語句的sql(sqlText:String)方法;該方法以SQL語句為參數(shù),返回一個(gè)DataFrame對象。熟悉SQL語法的開發(fā)者,可以直接使用SQL語句進(jìn)行數(shù)據(jù)分析,可進(jìn)一步降低學(xué)習(xí)門檻。7.1SparkSQL中的DSL風(fēng)格與SQL風(fēng)格7.2創(chuàng)建臨時(shí)視圖要想使用SQL語句,需要將DataFrame對象注冊為一個(gè)臨時(shí)視圖;臨時(shí)視圖分為會(huì)話臨時(shí)視圖(TempView)和全局臨時(shí)視圖(GlobalTempView)兩種,創(chuàng)建方法如下圖所示。其中,會(huì)話臨時(shí)視圖作用域僅限于當(dāng)前會(huì)話,每個(gè)會(huì)話內(nèi)的臨時(shí)視圖不能被其他會(huì)話所訪問;而全局臨時(shí)視圖與當(dāng)前Spark應(yīng)用程序綁定,當(dāng)前應(yīng)用程序內(nèi)的所有SparkSession實(shí)例中均可訪問,即全局臨時(shí)視圖可以實(shí)現(xiàn)不同會(huì)話直接的共享。createTempView、createOrReplaceTempView均可以創(chuàng)建會(huì)話臨時(shí)視圖,但二者亦有區(qū)別:前者創(chuàng)建視圖時(shí),如果該視圖已經(jīng)存在(已經(jīng)創(chuàng)建過),則會(huì)拋出異常;后者創(chuàng)建視圖時(shí),如果視圖已經(jīng)存在,則用自己新創(chuàng)建的視圖代替原臨時(shí)視圖。創(chuàng)建了臨時(shí)視圖后,可以調(diào)用SparkSession的sql(sqlText:String)方法,執(zhí)行各種SQL操作。如下圖所示,找出年齡大于20歲的學(xué)生信息,輸出其name、age、school(institute別名)。7.3按條件查找學(xué)生信息(1)分組統(tǒng)計(jì)各學(xué)院學(xué)生數(shù)量,可以在SQL語句中使用count、groupby。(2)分學(xué)院、性別統(tǒng)計(jì)學(xué)生年齡最大、最小值,在SQL語句中可以使用max、min、groupby。7.4分組統(tǒng)計(jì)學(xué)生信息雖然SparkSQL提供了很多內(nèi)置函數(shù),但在實(shí)際生產(chǎn)環(huán)境中,仍然有不少場景是內(nèi)置函數(shù)無法支持的(或?qū)崿F(xiàn)較為復(fù)雜),這時(shí)就需要開發(fā)人員自定義函數(shù)來滿足要求。例如對于學(xué)生信息中的年齡,需要輸出學(xué)生是否成年(年齡大于20為成年人adult,小于20為未成年minor)。7.5用戶自定義函數(shù)判斷學(xué)生是否成年其他類型數(shù)據(jù)創(chuàng)建DataFrame任務(wù)8小規(guī)模數(shù)據(jù)測試時(shí)可以將內(nèi)存數(shù)據(jù)創(chuàng)建DataFrame。JSON、CSV、Parquet等文件創(chuàng)建DataFrame。與RDD類似,用戶可以根據(jù)內(nèi)存中的數(shù)據(jù)生成DataFrame。Spark允許使用createDataFrame方法,將有序集合創(chuàng)建DataFrame;在此過程中,可以使用Spark的類型推斷機(jī)制來確定DataFrame每列的數(shù)據(jù)類型。8.1內(nèi)存數(shù)據(jù)創(chuàng)建DataFrame首先創(chuàng)建一個(gè)List,而后通過spark.createDataFrame(peopleList)生成DataFrame(peopleDF);在此過程中,Spark的類型推斷機(jī)制確定了各列的數(shù)據(jù)類型;但通過打印輸出可以發(fā)現(xiàn),生成的peopleDF列名稱默認(rèn)為_1、_2、_3;可以使用toDF方法為各列指定名稱。8.1內(nèi)存數(shù)據(jù)創(chuàng)建DataFrameJSON文件創(chuàng)建DataFrame可直接使用SparkSession的read方法完成,具體有兩種寫法。8.2JSON數(shù)據(jù)創(chuàng)建DataFrame對于CSV文件,可以采用類似處理JSON文件方式讀??;但CSV文件一般需要設(shè)置option選項(xiàng);現(xiàn)有CSV文件authorWithTile.csv,由該文件生成DataFrame,代碼如下所示;其中,option("sep",",")表示分隔符為“,”,option("inferSchema","true")表示Spark自動(dòng)推斷各列的數(shù)據(jù)類型,option("header","true")表示CSV文件第一行作為列的名稱。8.3CSV數(shù)據(jù)創(chuàng)建DataFrameParquet是一種流行的列式存儲(chǔ)格式,可以高效地存儲(chǔ)具有嵌套字段的記錄。Parquet是語言無關(guān)的,而且不與任何一種數(shù)據(jù)處理框架綁定在一起,適配多種語言和組件,無論是Hive、Impala、Pig等查詢引擎,還是MapReduce、Spark等計(jì)算框架,均可以跟Parquet密切配合完成數(shù)據(jù)處理任務(wù)。Spark加載數(shù)據(jù)和輸出數(shù)據(jù)支持的默認(rèn)格式為Parquet;Spark已經(jīng)為用戶提供了parquet樣例數(shù)據(jù),在Spark安裝目錄下有“/examples/src/main/resources/users.parquet”文件;注意parquet文件是不可以人工直接閱讀的,如用gedit打開或者cat查看文件內(nèi)容,都會(huì)顯示亂碼。只有被加載到程序中以后,Spark會(huì)對這種格式進(jìn)行解析,然后我們才能看到其中的數(shù)據(jù);由parquet文件數(shù)據(jù)生成DataFrame如下所示。8.4Parquet數(shù)據(jù)創(chuàng)建DataFrameDataFrame的數(shù)據(jù)根據(jù)需要可以輸出到Parquet、JSON、CSV等文件中(Spark推薦Parquet文件);下圖展示了將DataFrame保存到不同文件的方法。注意在保存為CSV文件時(shí),也可以附帶一定option選項(xiàng),如option("header",true)表示保存為CSV文件時(shí),第一行為列的標(biāo)題。8.5DataFrame數(shù)據(jù)保存到文件中RDD、DataFrame與Dataset的相互轉(zhuǎn)換任務(wù)9RDD轉(zhuǎn)換為DataFrame,使用DataFrame的豐富的API,或者執(zhí)行更加簡便的SQL查詢。RDD、DataFrame、Dataset的相互轉(zhuǎn)換。SSparkSQL允許將元素為樣例類對象的RDD轉(zhuǎn)換為DataFrame;樣例類的聲明中預(yù)先定義了表的結(jié)構(gòu)信息(列名、數(shù)據(jù)類型),代碼執(zhí)行過程中通過反射機(jī)制讀取case類的參數(shù)名為列名?,F(xiàn)有bigdata_result.txt文件,記錄了學(xué)生的信息及大數(shù)據(jù)考試成績,其結(jié)構(gòu)如下所示。9.1使用反射機(jī)制由RDD生成DataFrame將bigdata_result.txt置于/home/hadoop目錄下,可以通過如下代碼完成從RDD到DataFrame的轉(zhuǎn)換。上述代碼中,首先定義了樣例類caseclassResult,然后讀取bigdata_result.txt文件創(chuàng)建了RDD,并使用map操作將其元素轉(zhuǎn)換為Result對象;最后使用toDF方法,將RDD轉(zhuǎn)為DataFrame。9.1使用反射機(jī)制由RDD生成DataFrameSSparkSQL允許將元素為樣例類對象的RDD轉(zhuǎn)換為DataFrame;樣例類的聲明中預(yù)先定義了表的結(jié)構(gòu)信息(列名、數(shù)據(jù)類型),代碼執(zhí)行過程中通過反射機(jī)制讀取case類的參數(shù)名為列名。9.1使用反射機(jī)制由RDD生成DataFrame當(dāng)無法提前定義caseclass時(shí),可以采用編程指定Schema的方式將RDD轉(zhuǎn)換成DataFrame?,F(xiàn)有people.txt文件,記錄了people的姓名、年齡、性別信息,如下所示;由該文件生成DataFrame過程包括3個(gè)步驟。9.2以編程方式由RDD生成DataFrame(1)生成Shema(即“表頭”結(jié)構(gòu)信息),包含字段名稱、字段類型和是否為空信息等信息。SparkSQL提供了StructType(fields:Seq[StructField])類來代表模式信息Schema。其中fields是一個(gè)集合,該集合中每一個(gè)元素均為StructField對象;StructField(name,dataType,nullable)用來表示表的字段信息,其中name表示字段名稱,dataType表示字段類型,nullable表示是否可以為空,下圖為制作表頭Schema的過程。9.2以編程方式由RDD生成DataFrame(2)生成表中的記錄(行),要求表中每一行(即RDD的每一個(gè)元素)均為Row類型。9.2以編程方式由RDD生成DataFrame(3)有了schema表頭及表中記錄后,使用createDataFrame方法將其組合在一起,構(gòu)建一個(gè)DataFrame。9.2以編程方式由RDD生成DataFrameDataFrame轉(zhuǎn)換為RDD比較簡單,只需調(diào)用rdd方法即可。9.3DataFrame轉(zhuǎn)為RDDSpark中,如果Dataset[T]中的T為Row類型,則Dataset[T]等價(jià)于DataFrame;下面介紹T泛型不為Row類型時(shí),Dataset與DataFrame的轉(zhuǎn)換。(1)將DataFrame轉(zhuǎn)換為Dataset9.4DataFrame與Dataset之間的轉(zhuǎn)換(2)toDF方法將Dataset轉(zhuǎn)換為DataFrame9.4DataFrame與Dataset之間的轉(zhuǎn)換通過JDBC連接MySQL數(shù)據(jù)庫任務(wù)10通過JDBC,SparkSQL可以實(shí)現(xiàn)與MySQL等數(shù)據(jù)庫的互聯(lián)互通現(xiàn)SparkSQL讀寫MySQL數(shù)據(jù)庫表中數(shù)據(jù),經(jīng)過處理后的數(shù)據(jù)再寫入MySQL。SparkSQL連接MySQL數(shù)據(jù)庫,需要啟動(dòng)MySQL數(shù)據(jù)庫、創(chuàng)建數(shù)據(jù)庫及表等操作。(1)安裝MySQL數(shù)據(jù)庫并啟動(dòng)在Ubuntu環(huán)境下,使用命令“sudoapt-getinstallmysql-server”即可完成MySQL數(shù)據(jù)庫的安裝,安裝過程可能需要輸入root用戶密碼等;具體安裝過程可查找相關(guān)書籍或網(wǎng)絡(luò)文檔。使用命令servicemysqlstart啟動(dòng)MySQL數(shù)據(jù)庫,而后使用命令mysql-uroot-p輸入root用戶密碼后連接到MySQL。10.1準(zhǔn)備工作接下來創(chuàng)建數(shù)據(jù)庫sparkTest、數(shù)據(jù)庫表people,并向表中插入3行數(shù)據(jù),代碼如下10.1準(zhǔn)備工作createdatabasesparkTest;usesparkTest;createtablepeople(idchar(10),namechar(20),sexchar(10),ageint(4),addresschar(50));insertintopeoplevalues('101','Tom','male',20,'Zhuhai,Guangdong');insertintopeoplevalues('102','Merry','female',21,'Shenzhen,Guangdong');insertintopeoplevalues('103','Ken','male',19,'Shenzhen,Guangdong');select*frompeople;要想使用JDBC連接MySQL,需要下載JDBC驅(qū)動(dòng)包(本教程將JDBC為mysql-connector-java-8.0.13.jar),將其放置于目錄/usr/local/spark/jars/下;然后Linux終端輸入以下命令后進(jìn)入spark-shell環(huán)境:10.1準(zhǔn)備工作cd/usr/local/spark/bin./spark-shell\--jars/usr/local/spark/jars/mysql-connector-java-8.0.13.jar\--driver-class-path/urs/local/spark/jars/mysql-connector-java-8.0.13.jarspark.read.format("jdbc")用于實(shí)現(xiàn)對MySQL數(shù)據(jù)庫的讀取操作,執(zhí)行如下代碼,可以讀取sparkTest數(shù)據(jù)庫中的people表,并生成DataFrame。10.2
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 教育行業(yè)企業(yè)培訓(xùn)合同糾紛仲裁服務(wù)合同
- 智能產(chǎn)品代理銷售及售后服務(wù)協(xié)議
- 2024版全新讓與擔(dān)保協(xié)議范本下載
- 人工智能輔助區(qū)塊鏈技術(shù)應(yīng)用研究合同
- 卒中患者急救流程
- 培訓(xùn)總結(jié)分析報(bào)告
- 2024年阿里巴巴云計(jì)算中心建設(shè)與運(yùn)營合同
- 物流快遞行業(yè)快遞網(wǎng)絡(luò)優(yōu)化與信息追溯系統(tǒng)
- 鑄件打磨設(shè)備采購合同模板
- 綜合活動(dòng)3 制作土壤水量檢測儀(說課稿)2023-2024學(xué)年六年級(jí)下冊信息技術(shù)閩教版
- 2024消防安全警示教育(含近期事故案例)
- Starter Section 1 Meeting English 說課稿 -2024-2025學(xué)年北師大版(2024)初中英語七年級(jí)上冊
- 2024年法律職業(yè)資格考試(試卷一)客觀題試卷及解答參考
- 2024-2025學(xué)年北師大版七年級(jí)上冊數(shù)學(xué)期末專項(xiàng)復(fù)習(xí):期末壓軸題分類(原卷版)
- 2024年全國《汽車加氣站操作工》安全基礎(chǔ)知識(shí)考試題庫與答案
- 2024-2025學(xué)年北師大版小學(xué)六年級(jí)上學(xué)期期末英語試卷及解答參考
- 食堂項(xiàng)目經(jīng)理培訓(xùn)
- 2024年人教版八年級(jí)道德與法治下冊期末考試卷(附答案)
- 公司事故隱患內(nèi)部報(bào)告獎(jiǎng)勵(lì)機(jī)制
- (高清版)DB34∕T 1337-2020 棉田全程安全除草技術(shù)規(guī)程
- 部編版小學(xué)語文二年級(jí)上冊單元測試卷含答案(全冊)
評(píng)論
0/150
提交評(píng)論