大數(shù)據(jù)挖掘?qū)д撆c案例課件-第8章 大數(shù)據(jù)挖掘關(guān)鍵技術(shù)_第1頁(yè)
大數(shù)據(jù)挖掘?qū)д撆c案例課件-第8章 大數(shù)據(jù)挖掘關(guān)鍵技術(shù)_第2頁(yè)
大數(shù)據(jù)挖掘?qū)д撆c案例課件-第8章 大數(shù)據(jù)挖掘關(guān)鍵技術(shù)_第3頁(yè)
大數(shù)據(jù)挖掘?qū)д撆c案例課件-第8章 大數(shù)據(jù)挖掘關(guān)鍵技術(shù)_第4頁(yè)
大數(shù)據(jù)挖掘?qū)д撆c案例課件-第8章 大數(shù)據(jù)挖掘關(guān)鍵技術(shù)_第5頁(yè)
已閱讀5頁(yè),還剩83頁(yè)未讀 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶(hù)提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

第1章緒論第2章數(shù)據(jù)分析與可視化技術(shù)第3章認(rèn)識(shí)數(shù)據(jù)第4章數(shù)據(jù)預(yù)處理第5章分類(lèi)概念與方法第6章關(guān)聯(lián)分析概念與方法第7章聚類(lèi)分析概念與方法第8章大數(shù)據(jù)挖掘關(guān)鍵技術(shù)第9章案例分析第8章大數(shù)據(jù)挖掘關(guān)鍵技術(shù)大數(shù)據(jù)挖掘?qū)д撆c案例學(xué)習(xí)目標(biāo)/Target掌握Spark的安裝、了解運(yùn)行原理、掌握RDD編程,了解SparkSQL、Streaming和ML掌握Hadoop分布計(jì)算架構(gòu)的安裝、部署,了解HDFS原理、理解MapReduce框架和計(jì)算模型的原理和優(yōu)缺點(diǎn)目錄/Contents01大規(guī)模并行處理02Spark內(nèi)存計(jì)算大規(guī)模并行處理8.18.1.1Hadoop安裝Hadoop安裝方式(1)單機(jī)模式

在一臺(tái)運(yùn)行Linux操作系統(tǒng)的物理機(jī),或者在Windows操作系統(tǒng)中架設(shè)虛擬化平臺(tái),虛擬出運(yùn)行Linux操作系統(tǒng)的虛擬機(jī),在虛擬機(jī)上安裝Hadoop系統(tǒng)。該模式常用于大數(shù)據(jù)應(yīng)用程序的前期開(kāi)發(fā)和測(cè)試。(2)單機(jī)偽分布模式

在一臺(tái)運(yùn)行Linux操作系統(tǒng)的物理機(jī)或虛擬機(jī)上,用不同的進(jìn)程模擬Hadoop系統(tǒng)中分分布式運(yùn)行中的NameNode、DataNode、JobTracker、TaskTracker等節(jié)點(diǎn),模擬Hadoop集群的運(yùn)行模式,該模式常用于大數(shù)據(jù)應(yīng)用程序的測(cè)試。(3)分布式集群模式

在集群環(huán)境中安裝運(yùn)行Hadoop系統(tǒng),集群中的每個(gè)計(jì)算機(jī)運(yùn)行Linux操作系統(tǒng),該模式常用于大數(shù)據(jù)應(yīng)用程序的實(shí)際運(yùn)行,完成大數(shù)據(jù)分析和計(jì)算任務(wù)。8.1.1Hadoop安裝Hadoop安裝環(huán)境

在Windows操作系統(tǒng)中,使用VirtualBox6.1.18虛擬化平臺(tái),在虛擬機(jī)中安裝Ubuntu20.04.3版本的Linux操作系統(tǒng),安裝構(gòu)建單機(jī)偽分布式模式Hadoop系統(tǒng)的基本步驟如下:(1)創(chuàng)建用戶(hù)

在Ubuntu操作系統(tǒng)以root用的身份,創(chuàng)建hadoop用戶(hù),緊接著創(chuàng)建一個(gè)專(zhuān)門(mén)的用戶(hù)組,命名為hadoop,并將hadoop用戶(hù)加入hadoop用戶(hù)組中,基本的命令如下:[root@ubuntu~]#sudouseradd-mhadoop–d/home/hadoop。其中hadoop是用戶(hù)名,-d指明hadoop用戶(hù)的home目錄為/home/hadoop,該目錄為hadoop用戶(hù)在Ubuntu系統(tǒng)中的根目錄。[root@ubuntu~]#passwdhadoop[密碼],設(shè)置hadoop用戶(hù)的密碼。[root@ubuntu~]#sudogroupaddhadoop。創(chuàng)建hadoop用戶(hù)組。[root@ubuntu~]#sudousermod-a-Ghadoophadoop。將hadoop用戶(hù)加入hadoop用戶(hù)組。

使用vim/etc/sudoers命令打開(kāi)文件,在文件末尾加入hadoopALL=(ALL:ALL)ALL語(yǔ)句,使hadoop用戶(hù)與root用戶(hù)具有系統(tǒng)管理權(quán)限。8.1.1Hadoop安裝Hadoop安裝環(huán)境(2)配置SSH

在偽分布模式和分布式集群模式中,為了實(shí)現(xiàn)Hadoop集群中,所有節(jié)點(diǎn)可以免密碼登錄,需要配置SSH。在root用戶(hù)中,使用如下命令安裝Openssh。[root@ubuntu~]#sudoapt-getinstallopenssh-server-y[root@ubuntu~]$ssh-keygen-trsa#

使用該命令后,系統(tǒng)會(huì)提示多次確定,完成后將在/home/hadoop/.ssh目錄中生成id_rsa認(rèn)證文件,將該文件復(fù)制成名為authorized_keys的文件,并執(zhí)行sshlocalhost命令測(cè)試。如果出現(xiàn)如上圖所示的提示,即不需要數(shù)據(jù)用戶(hù)密碼,則配置正確,如果仍需要輸入密碼或提示錯(cuò)誤,則刪除.ssh/文件夾重新進(jìn)行認(rèn)證配置。[hadoop@ubuntu~]$catid_rsa.pub>>authorized_keys[hadoop@ubuntu~]$sshlocalhost8.1.1Hadoop安裝配置Hadoop

切換至root用戶(hù),下載JDK,使用命令tar-zxvfjdk-8u161-linux-x64.tar將JDK解壓未/usr/local/目錄中,將JDBK文件夾重命名文件夾為jdk1.8.0。

在Hadoop網(wǎng)站中下載hadoop-3.3.1.tar.gz安裝包文件,將其解壓在/usr/local/目錄中,將解壓后的Haoop文件夾重命名文件夾為hadoop-3.3.1。使用chown-Rhadoop:hadoop/usr/local/hadoop-3.3.1命令,將hadoop-3.3.1文件夾的所屬用戶(hù)修改為hadoop。8.1.1Hadoop安裝配置HadoopHadoop系統(tǒng)中的配置文件,集中存放在hadoop-3.3.1文件夾的etc/hadoop目錄中,主要涉及以下幾個(gè)配置文件。(1)etc/hadoop/hadoop-env.sh,在該文件中配置JDK安裝路徑,方法為在該文件的最后加上以下語(yǔ)句:exportJAVA_HOME=/usr/local/jdk1.8.0(2)~/.bashrc,在hadoop用戶(hù)的根目錄,即root用戶(hù)的/home/hadoop目錄中,找到hadoop用戶(hù)所屬的系統(tǒng)環(huán)境變量配置文件~/.bashrc,將JDK和hadoop環(huán)境變量加入該配置文件,保存后使用source~/.bashrc命令,使其立即生效。加入內(nèi)容如下:PATH=$PATH:$HOME/binexportJAVA_HOME=/usr/local/jdk1.8.0exportHADOOP_HOME=/usr/local//hadoop-3.3.1exportPATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbinexportCLASSPATH=$JAVA_HOME/lib:.=8.1.1Hadoop安裝配置Hadoop(3)etc/hadoop/core-site.xml,該配置文件主要完成HDFS中管理節(jié)點(diǎn)(NameNode)的IP和端口配置,該文件的文件結(jié)構(gòu)為XML,在<configuration></configuration>節(jié)點(diǎn)內(nèi)部加入HDFS中NameNode的IP地址、端口,緩沖區(qū)的大小以及存放臨時(shí)文件的目錄等,core-site.xml的基本配置如下:<property><name>fs.defaultFS</name><value>hdfs://localhost:9000</value></property><property><name>hadoop.tmp.dir</name><value>/home/hadoop/tmp</value></property>8.1.1Hadoop安裝配置Hadoop(4)etc/hadoop/hdfs-site.xml,該配置文件主要完成HDFS分布式文件系統(tǒng)的數(shù)據(jù)備份數(shù)量、文件服務(wù)器地址和端口、Namenode和Datanode的數(shù)據(jù)存放路徑、文件存儲(chǔ)塊的大小等配置,hdfs-site.xml的基本配置如下:<property><name>dfs.replication</name><value>2</value></property><property><name>dfs.permissions</name><value>false</value></property><property> <name>dfs.http.address</name> <value>:50070</value></property>8.1.1Hadoop安裝配置Hadoop(5)etc/hadoop/mapred-site.xml,該文件主要完成Hadoop的MapReduce框架設(shè)定、配置MapReduce任務(wù)運(yùn)行的內(nèi)存大小、最大可用CPU核數(shù)等,使用yarn作為偽分布式Hadoop集群框架時(shí)的配置如下:<property><name></name><value>yarn</value></property>8.1.1Hadoop安裝配置Hadoop(6)etc/hadoop/yarn-site.xml,該文件主要配置資源管理器ResourceManager的地址,和其它分布式節(jié)點(diǎn)NodeManager資源配置、日志級(jí)別、任務(wù)調(diào)度器等。如資源管理器使用yarn管理偽分布式集群的yarn-site.xml配置如下:<property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property><property><name>yarn.resourcemanager.hostname</name><value>localhost</value></property>8.1.1Hadoop安裝配置Hadoop

如果在運(yùn)行Hadoop系統(tǒng)的MapReduce框架時(shí),提示“找不到或無(wú)法加載主類(lèi)org.apache.hadoop.mapreduce.v2.app.MRAppMaster”錯(cuò)誤,則需要在命令行中執(zhí)行hadoopclasspath命令,將該命令的執(zhí)行結(jié)果作為值,加入yarn-site.xml配置文件yarn.application.classpath屬性中,配置示例如下。<property><name>yarn.application.classpath</name><value>[執(zhí)行hadoopclasspath命令的返回結(jié)果]</value></property>8.1.1Hadoop安裝格式化HDFS文件系統(tǒng)

完成以上Hadoop的配置后,執(zhí)行如下format命令,格式化HDFS分布式文件系統(tǒng),分布式文件系統(tǒng)的格式化是測(cè)試Hadoop配置的第一步,根據(jù)格式化過(guò)程中的提示,可檢查配置是否成功。[hadoop@ubuntu~]$hadoopnamenode–format

如果格式化成功,該命令將返回NameNode的信息,其中會(huì)有“…h(huán)asbeensuccessfullyformatted.”和“Exittingwithstatus0”的提示。如果提出錯(cuò)誤,則需要根據(jù)提示修改配置文件,并刪除配置中設(shè)定的hadoop.tmp.dir目錄中的數(shù)據(jù)后,重新執(zhí)行格式化。8.1.1Hadoop安裝啟動(dòng)Hadoop

啟動(dòng)Hadoop使用start-all.sh命令。啟動(dòng)后,使用jps命令可查看如圖8.2所示的5個(gè)Hadoop相關(guān)進(jìn)程是否運(yùn)行正常。如果進(jìn)程均存在,則Hadoop配置完成。停止運(yùn)行Hadoop可使用stop-all.sh圖8.2jps命令查看Hadoop進(jìn)程8.1.1Hadoop安裝運(yùn)行程序測(cè)試使用vim命令在Hadoop系統(tǒng)管理節(jié)點(diǎn)中創(chuàng)建兩個(gè)文件,文件內(nèi)容如下:file1:hellohadoophelloworldfile2:hellosparkhellostreaming將創(chuàng)建的兩個(gè)文件上傳到HDFS對(duì)應(yīng)的目錄中,HDFS的使用命令如下:[hadoop@ubuntu~]$hdfsdfs-mkdir/input,創(chuàng)建目錄存放輸入數(shù)據(jù),其中“/”不能省。[hadoop@ubuntu~]$hdfsdfs-putfile*/input,將文件上傳到HDFS文件系統(tǒng)中的/input目錄中。切換到hadoop安裝路徑的/share/hadoop/mapreduce目錄,找到Hadoop自帶的hadoop-mapreduce-examples-3.3.1.jar文件,其中包含wordcount簡(jiǎn)單的詞頻統(tǒng)計(jì)程序。執(zhí)行[hadoop@ubuntu~]$hadoopjarhadoop-mapreduce-examples-3.3.1.jarwordcount/input/output命令,該命令執(zhí)行wordcount程序,讀取/input目錄中的輸入文件,自動(dòng)在HDFS中創(chuàng)建輸出目錄/output,將運(yùn)行結(jié)果保存在/output目錄中,查看運(yùn)行結(jié)果的方法如圖8.3所示。圖8.3Hadoop測(cè)試實(shí)例運(yùn)行結(jié)果8.1.1Hadoop安裝查看集群狀態(tài)通過(guò)瀏覽器可查看Hadoop集群的運(yùn)行情況,HDFS管理端的Web地址為http://localhost:50070,其中l(wèi)ocalhost是NameNode節(jié)點(diǎn)的地址,如圖8.4(a)所示;Yarn的管理界面的Web地址為http://localhost:8088,可以看到節(jié)點(diǎn)數(shù)目、提交任務(wù)的執(zhí)行情況,輸出錯(cuò)誤提示等,如圖8.6(b)所示。

(a)HDFS圖形界面

(b)Yarn圖形界面圖8.4Hadoop集群圖形管理界面8.1.2HDFS分布式文件系統(tǒng)HadoopDistributedFileSystem是一個(gè)分布式文件系統(tǒng),簡(jiǎn)稱(chēng)HDFS。HDFS有高容錯(cuò)性,設(shè)計(jì)用來(lái)部署在低廉的硬件上,以流式數(shù)據(jù)訪(fǎng)問(wèn)模式來(lái)存儲(chǔ)超大文件,提供高吞吐量來(lái)訪(fǎng)問(wèn)應(yīng)用程序的數(shù)據(jù),適合使用于具有超大數(shù)據(jù)集的應(yīng)用程序中。

HDFS上的文件被劃分為塊,作為獨(dú)立的存儲(chǔ)單元,稱(chēng)為數(shù)據(jù)塊(block),典型的大小是64MB。按照塊劃分?jǐn)?shù)據(jù)存儲(chǔ)有諸多好處,一個(gè)文件的大小可以大于網(wǎng)絡(luò)中任意一個(gè)磁盤(pán)的容量,文件的所有塊不需要存儲(chǔ)在同一個(gè)磁盤(pán)上,利用集群上的任意一個(gè)磁盤(pán)進(jìn)行存儲(chǔ),簡(jiǎn)化了存儲(chǔ)管理,同時(shí)元數(shù)據(jù)不需要和塊一起存儲(chǔ),用一個(gè)獨(dú)立的功能模塊管理塊的元數(shù)據(jù)。數(shù)據(jù)塊更加適合數(shù)據(jù)備份,進(jìn)而提供數(shù)據(jù)容錯(cuò)能力和提高可用性。8.1.2HDFS分布式文件系統(tǒng)HDFS集群主要由一個(gè)NameNode和多個(gè)DataNode組成

NameNode提供元數(shù)據(jù)、命名空間、數(shù)據(jù)備份、數(shù)據(jù)塊管理的服務(wù),

DataNode存儲(chǔ)實(shí)際的數(shù)據(jù),客戶(hù)端訪(fǎng)問(wèn)NameNode以獲取文件的元數(shù)據(jù)和屬性,而文件內(nèi)容的輸入輸出操作直接和DataNode進(jìn)行交互。HDFS采用Master/Slave的架構(gòu)來(lái)存儲(chǔ)數(shù)據(jù)HDFS體系結(jié)構(gòu)主要由四個(gè)部分組成,分別為HDFSClient、NameNode、DataNode和SecondaryNameNode,NameNode是一個(gè)中心服務(wù)器,負(fù)責(zé)管理文件系統(tǒng)的命名空間及客戶(hù)端對(duì)文件的訪(fǎng)問(wèn),集群中的DataNode是運(yùn)行一個(gè)進(jìn)程與NameNode交互,且進(jìn)行數(shù)據(jù)塊讀寫(xiě)的節(jié)點(diǎn),負(fù)責(zé)管理它所在節(jié)點(diǎn)上的數(shù)據(jù)塊,SecondaryNameNode輔助NameNode完成數(shù)據(jù)備份和編輯日志文件等8.1.2HDFS分布式文件系統(tǒng)圖8.5HDFS體系結(jié)構(gòu)8.1.2HDFS分布式文件系統(tǒng)HDFS文件系統(tǒng)的相關(guān)操作使用文件執(zhí)行命令來(lái)實(shí)現(xiàn),命令的書(shū)寫(xiě)方式有hadoopfs、hadoopdfs或hdfsdfs,三種寫(xiě)法的作用相同,以常用的hdfsdfs為例,最常用的形式如下:(1)hdfsdfs-ls,顯示當(dāng)前目錄結(jié)構(gòu),-ls-R遞歸顯示目錄結(jié)構(gòu);(2)hdfsdfs-mkdir,創(chuàng)建目錄;(3)hdfsdfs-rm,刪除文件,-rm-R遞歸刪除目錄和文件;(4)hdfsdfs-put[localsrc][dst],從本地加載文件到HDFS;(5)hdfsdfs-get[dst][localsrc],從HDFS導(dǎo)出文件到本地;(6)hdfsdfs-copyFromLocal[localsrc][dst],從本地加載文件到HDFS,與put一致;(7)hdfsdfs-copyToLocal[dst][localsrc],從HDFS導(dǎo)出文件到本地,與get一致;(8)hdfsdfs-cat,查看文件內(nèi)容;(9)hdfsdfs-du,統(tǒng)計(jì)指定目錄下各文件的大小,單位是字節(jié)。-du-s匯總文件大小,-du-h指定顯示單位;(10)hdfsdfs-tail,顯示文件末尾;(11)hdfsdfs-cp[src][dst],從源目錄復(fù)制文件到目標(biāo)目錄;(12)hdfsdfs-mv[src][dst],從源目錄移動(dòng)文件到目標(biāo)目錄。圖8.6MapReduce工作流程MapReduce工作流程8.1.3MapReduce計(jì)算模型MapReduce基于HDFS文件系統(tǒng),進(jìn)行大規(guī)模分布式文件處理的核心技術(shù),是Hadoop生態(tài)系統(tǒng)中的核心技術(shù)。MapReduce工作原理8.1.3MapReduce計(jì)算模型圖8.7MapReduce的工作原理MapReduce工作原理8.1.3MapReduce計(jì)算模型從邏輯角度來(lái)看MapReduce的作業(yè)運(yùn)行過(guò)程,主要分為5個(gè)階段,分別是數(shù)據(jù)分片(inputsplit)、Map、Shuffle、Reduce和輸出五個(gè)階段。(1)輸入數(shù)據(jù)(inputsplit)

對(duì)數(shù)據(jù)進(jìn)行分片,即將輸入數(shù)據(jù)切分為大小相等的數(shù)據(jù)塊。輸入分片和HDFS的block關(guān)系密切,每片數(shù)據(jù)作為單個(gè)MapTask的輸入,分片完成后多個(gè)MapTask便可以同時(shí)工作。(2)Map

每個(gè)MapTask在讀入各自的數(shù)據(jù)后進(jìn)行計(jì)算處理,并將中間結(jié)果保存在HDFS中,數(shù)據(jù)(如文本文件中的行,或數(shù)據(jù)表中的行)將以鍵值對(duì)<key,value>的形式傳入map方法,本階段計(jì)算完成后,同樣以鍵值對(duì)形式保存中間結(jié)果。鍵值對(duì)中的key決定了中間結(jié)果發(fā)送到哪個(gè)ReduceTask進(jìn)行合并,且key和ReduceTask是多對(duì)一的關(guān)系,具有相同key的數(shù)據(jù)會(huì)被發(fā)送給同一個(gè)ReduceTask,單個(gè)ReduceTask可能會(huì)接收到多個(gè)key值的數(shù)據(jù)。MapReduce工作原理8.1.3MapReduce計(jì)算模型(3)Shuffle

在進(jìn)入Reduce階段之前,MapReduce框架會(huì)對(duì)數(shù)據(jù)按照key值排序,使具有相同key的數(shù)據(jù)彼此相鄰。如果指定了合并操作(Combiner),框架會(huì)調(diào)用Combiner將具有相同key的數(shù)據(jù)進(jìn)行合并,Combiner的輸入、輸出的參數(shù)必須與Reduce保持一致,通常也叫做洗牌(Shuffle)。分區(qū)的意義則在于盡量減少每次寫(xiě)入磁盤(pán)的數(shù)據(jù)量和復(fù)制到下一階段節(jié)點(diǎn)之間傳輸?shù)臄?shù)據(jù)量。(4)ReduceReduce階段,相同key的鍵值對(duì)被發(fā)送至同一個(gè)ReduceTask,同一個(gè)ReduceTask會(huì)接收來(lái)自多個(gè)MapTask的鍵值對(duì),接收數(shù)據(jù)后數(shù)據(jù)的格式為鍵值對(duì)<key,[valuelist]>,其中[valuelist]為具有相同鍵的值列表,每個(gè)ReduceTask對(duì)key相同的[valuelist]進(jìn)行Reduce變成一個(gè)值輸出。(5)輸出

輸出階段定義輸出文件格式,以格式化方式將分析和計(jì)算的結(jié)果輸出到HDFS中。常用的方法是將數(shù)據(jù)轉(zhuǎn)換為字符串格式輸出。MapReduce工作原理8.1.3MapReduce計(jì)算模型MapReduce實(shí)例#!/usr/bin/envpython

importsys

forlineinsys.stdin:

line=line.strip()

words=line.split()

forwordinwords:

print("%s\t%s"%(word,1))

用Python實(shí)現(xiàn)第一個(gè)MapReduce應(yīng)用程序單詞計(jì)數(shù)WordCount,該程序也被稱(chēng)為MapReduce計(jì)算模型的“HelloWorld”程序,分為mapper.py文件和reducer.py文件兩個(gè)子程序的設(shè)計(jì),將兩個(gè)文件存儲(chǔ)在本地磁盤(pán)的/home/hadoop/code目錄中。mapper.py通過(guò)sys.stdin讀取標(biāo)準(zhǔn)文件流,將文件中的行用strip()方法去除兩邊空格,split()方法將文本行中的字符串按照空格分割為單詞,并對(duì)每個(gè)單詞組裝成(word,1)形式的<key,value>,print()函數(shù)輸出到標(biāo)準(zhǔn)輸出流。MapReduce工作原理8.1.3MapReduce計(jì)算模型MapReduce實(shí)例#!/usr/bin/envpython

importsys

current_word=None

current_count=0

word=None

forlineinsys.stdin:

line=line.strip()

word,count=line.split("\t",1)

try:

count=int(count)

exceptValueError:

continue

ifcurrent_word==word:

current_count+=count

else:

ifcurrent_word:

print("%s\t%s"%(current_word,current_count))

current_count=count

current_word=word

ifword==current_word:

print("%s\t%s"%(current_word,current_count))reducer.py通過(guò)sys.stdin讀取標(biāo)準(zhǔn)文件流,從文件流中讀取行,用strip()方法去除兩邊空格,split("\t",1)方法將文本行中的字符串按照制表符分割為單詞,并將每個(gè)單詞組裝成(word,1)形式的word和count,為了統(tǒng)計(jì)計(jì)算,將count中的字符強(qiáng)制轉(zhuǎn)化為整數(shù)類(lèi)型,如果當(dāng)前的單詞等于已經(jīng)讀入的單詞,則累加count,否則將讀入的單詞作為鍵值key,生成新的鍵值對(duì),最后通過(guò)print()函數(shù)輸出到標(biāo)準(zhǔn)輸出流。MapReduce工作原理8.1.3MapReduce計(jì)算模型圖8.8Map和Reduce程序測(cè)試

保存mapper.py和reducer.py文件后,需要在vim命令編輯窗口,使用setff命令檢查兩個(gè)文件的格式。在Ubuntu系統(tǒng)中需要設(shè)置文件格式為unix,可以先采用如圖8.8所示的方法,在本地驗(yàn)證mapper.py和reducer.py代碼的正確性。MapReduce工作原理8.1.3MapReduce計(jì)算模型

在Hadoop平臺(tái)上執(zhí)行Python版本的MapReduce程序的方法有兩種:

一種與Java編寫(xiě)的程序運(yùn)行方法一樣,用Jython把Python程序打包為jar文件,該jar包可以像Hadoop執(zhí)行Java編寫(xiě)的程序包一樣執(zhí)行;

另一種方式是使用Hadoop中自帶的流接口HadoopAPI運(yùn)行,使用HadoopAPI執(zhí)行示例為,在Hadoop安裝目錄的share/hadoop/tools/lib目錄中包含了hadoop-streaming-3.3.1.jar流處理文件,即使用如下命令執(zhí)行代碼,設(shè)定HadoopAPI、map、reduce、數(shù)據(jù)和輸出路徑,運(yùn)行成功后結(jié)果存儲(chǔ)在HDFS的/output目錄中。hadoopjarshare/hadoop/tools/lib/hadoop-streaming-3.3.1.jar\-file/home/hadoop/code/mapper.py\-mapper/home/hadoop/code/mapper.py\-file/home/hadoop/code/reducer.py\-reducer/home/hadoop/code/reducer.py\-input/input-output/outputSpark內(nèi)存計(jì)算8.28.2.1Spark安裝

安裝Spark之前需要安裝Linux系統(tǒng)、Java環(huán)境和Hadoop環(huán)境。Ubuntu中自帶了Python3.8,在Spark網(wǎng)站中下載spark-3.1.2-bin-hadoop3.2.tgz安裝包,使用解壓命令tar-zxvfspark-3.1.2-bin-hadoop3.2.tgz解壓安裝包至路徑/usr/local目錄中,將文件夾重命名為spark-3.1.2,并使用chown-Rhadoop:hadoop/usr/local/spark-3.1.2命令修改安裝目錄的所屬用戶(hù)。由于所有例子均使用python語(yǔ)言實(shí)現(xiàn),故安裝Spark的python版本pyspark。Pyspark安裝8.2.1Spark安裝

在Spark安裝目錄下找到conf文件夾,使用命令cpspark-env.sh.templatespark-env.sh復(fù)制一份配置文件,編輯spark-env.sh配置文件,在文件最后面加如下一行內(nèi)容,Spark才能從HDFS中讀寫(xiě)數(shù)據(jù),如果不加這項(xiàng)配置,則Spark只能訪(fǎng)問(wèn)本地?cái)?shù)據(jù)。exportSPARK_DIST_CLASSPATH=$(/usr/local/Hadoop-3.3.1/bin/hadoopclasspath)

在Hadoop用戶(hù)的環(huán)境變量~/.bashrc文件中,配置以下Spark環(huán)境變量。exportSPARK_HOME=/usr/local/spark-3.1.2exportPATH=$SPARK_HOME/bin:$PATHexportPYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip:$PYTHONPATHexportPYSPARK_PYTHON=python圖8.9pyspark安裝測(cè)試Pyspark應(yīng)用的運(yùn)行8.2.1Spark安裝pyspark編輯和運(yùn)行應(yīng)用程序的方式有兩種,分別是命令行和文件。命令行編寫(xiě)pyspark應(yīng)用的方式如圖8.10所示圖8.10pyspark命令行LineCount.py#!/usr/bin/envpython3frompysparkimportSparkContext,SparkConf

conf=SparkConf().setMaster("local").setAppName("WordCount")

sc=SparkContext(conf=conf)

lines=sc.textFile("file:///home/hadoop/file/file1")

num=lines.count()

print(num)Pyspark應(yīng)用的運(yùn)行8.2.1Spark安裝使用spark-submit提交pyspark應(yīng)用程序的運(yùn)行過(guò)程如圖8.11所示,其中WARN為警告信息??稍赟park安裝的conf目錄的perties文件中修改log4j.rootCategory=ERROR.console,可定義日志級(jí)別為只顯示錯(cuò)誤信息,在集群中提交pyspark應(yīng)用程序時(shí),命令中的選項(xiàng)--master,也用于設(shè)定集群管理器地址。圖8.11spark-submit提交pyspark應(yīng)用Spark運(yùn)行框架8.2.2Spark運(yùn)行原理SparkCore實(shí)現(xiàn)Spark最基礎(chǔ)和核心的功能

其中包括內(nèi)存計(jì)算、任務(wù)調(diào)度、部署模式、故障恢復(fù)、存儲(chǔ)管理。Spark運(yùn)行架構(gòu)集群資源管理器(ClusterManager)可以是spark自帶的管理器也可以是YARN或Mesos等運(yùn)行作業(yè)任務(wù)的工作節(jié)點(diǎn)(WorkerNode)每個(gè)應(yīng)用的任務(wù)控制節(jié)點(diǎn)(Driver)每個(gè)工作節(jié)點(diǎn)上負(fù)責(zé)具體任務(wù)的執(zhí)行進(jìn)程(Executor)Spark運(yùn)行框架8.2.2Spark運(yùn)行原理結(jié)合圖8.12的SparkCore運(yùn)行框架,Spark應(yīng)用的實(shí)際執(zhí)行過(guò)程可歸納為以下10項(xiàng)圖8.12Spark運(yùn)行框架(1)應(yīng)用Application(2)驅(qū)動(dòng)程序Driver(3)資源管理器ClusterManager(4)執(zhí)行器Executor(5)作業(yè)Job(6)階段Stage(7)彈性分布式數(shù)據(jù)集(ResilientDistributedDataset,RDD)(8)有向無(wú)環(huán)圖(DirectedAcyclicGraph,DAG)(9)DAGScheduler(10)TaskSchedulerSpark運(yùn)行流程8.2.2Spark運(yùn)行原理Spark運(yùn)行流程如圖8.13所示

(1)構(gòu)建Application的運(yùn)行環(huán)境。啟動(dòng)SparkContext,SparkContext向ClusterManager注冊(cè)并申請(qǐng)運(yùn)行Executor資源。(2)ClusterManager為Executor分配資源,并啟動(dòng)Executor進(jìn)程,Executor的運(yùn)行情況,將隨著“心跳”發(fā)送到ClusterManager上。(3)SparkContext構(gòu)建DAG圖,將DAG圖分解成多個(gè)階段,并把每個(gè)階段的任務(wù)集發(fā)送給任務(wù)調(diào)度器。Executor向SparkContext申請(qǐng)任務(wù),任務(wù)調(diào)度器將任務(wù)發(fā)送給Executor,同時(shí)SparkContext將應(yīng)用程序代碼發(fā)送給Executor。(4)任務(wù)在Executor上運(yùn)行。把執(zhí)行結(jié)果反饋給任務(wù)調(diào)度器,然后再反饋給DAGScheduler。運(yùn)行結(jié)束后寫(xiě)入數(shù)據(jù),SparkContext向ClusterManager注銷(xiāo)并釋放所有資源。圖8.13Spark運(yùn)行流程RDD設(shè)計(jì)原理8.2.2Spark運(yùn)行原理彈性分布式數(shù)據(jù)集RDD是分布式內(nèi)存中的一個(gè)抽象概念,提供了一種高度受限的共享內(nèi)存模型。本質(zhì)上,RDD是一個(gè)只讀的內(nèi)存分區(qū)記錄集,每個(gè)RDD可以分成多個(gè)分區(qū),每個(gè)分區(qū)就是一個(gè)數(shù)據(jù)集片段,并且一個(gè)RDD的不同分區(qū)可以被保存到集群中不同的節(jié)點(diǎn)上,從而可以在集群中的不同節(jié)點(diǎn)上進(jìn)行并行計(jì)算。RDD提供了一組豐富的操作以支持常見(jiàn)的數(shù)據(jù)運(yùn)算,分為“行動(dòng)”(Action)和“轉(zhuǎn)換”(Transformation)兩種類(lèi)型,“行動(dòng)”用于執(zhí)行計(jì)算并指定輸出的形式,“轉(zhuǎn)換”指定RDD之間的相互依賴(lài)關(guān)系。兩類(lèi)操作的主要區(qū)別是轉(zhuǎn)換操作接收RDD并返回RDD,而行動(dòng)操作(比如count、collect等)接收RDD但返回非RDD(即輸出一個(gè)值或結(jié)果)。RDD提供的轉(zhuǎn)換接口非常簡(jiǎn)單,都是類(lèi)似于map、filter、groupBy、join等粗粒度的數(shù)據(jù)轉(zhuǎn)換操作,而不是針對(duì)某個(gè)數(shù)據(jù)項(xiàng)的細(xì)粒度修改。8.2.3RDD編程RDD創(chuàng)建Spark創(chuàng)建RDD的方式有兩種:(1)使用textFile()方法,通過(guò)讀取文件創(chuàng)建;(2)使用parallelize()方法,從已存在的集合創(chuàng)建,集合如python中的列表、元組等。圖8.14讀取HDFS文件創(chuàng)建RDD圖8.15通過(guò)并行列表創(chuàng)建RDD8.2.3RDD編程RDD操作圖8.22RDD轉(zhuǎn)換過(guò)程示意圖RDD轉(zhuǎn)換操作會(huì)產(chǎn)生不同的RDD,以便于下一次轉(zhuǎn)換使用。RDD的轉(zhuǎn)換過(guò)程中采用了惰性機(jī)制,整個(gè)轉(zhuǎn)換過(guò)程只是記錄轉(zhuǎn)換的邏輯,并不會(huì)發(fā)生真正的計(jì)算(當(dāng)遇到行動(dòng)操作時(shí),才會(huì)觸發(fā)真正的計(jì)算)。RDD的轉(zhuǎn)換操作主要有5個(gè)API,其中每個(gè)API都是一個(gè)高階方法,需要傳入另一個(gè)函數(shù)(function)作為參數(shù),該函數(shù)function定義了具體的轉(zhuǎn)換規(guī)則,function常用lambda表達(dá)式進(jìn)行設(shè)計(jì)。8.2.3RDD編程RDD操作(1)filter(function)filter()方法篩選滿(mǎn)足函數(shù)function條件的元素,并返回一個(gè)新的RDD。仍使用圖8.15中的RDD,篩選出包含spark關(guān)鍵詞的行的示例如圖8.16所示。后續(xù)示例如沒(méi)有特殊說(shuō)明,代碼均在pyspark命令行中運(yùn)行。圖8.16filter()方法的使用8.2.3RDD編程RDD操作(2)map(function)map()方法將RDD通過(guò)function函數(shù)映射為一個(gè)新的RDD,且該方法將對(duì)RDD中的每個(gè)元素按照f(shuō)unction函數(shù)的計(jì)算規(guī)則進(jìn)行轉(zhuǎn)換。將每個(gè)數(shù)據(jù)乘以2的數(shù)值轉(zhuǎn)換示例如圖8.17所示,對(duì)文本分割的示例如圖8.18所示。圖8.17map()方法轉(zhuǎn)換數(shù)值圖8.18map()方法轉(zhuǎn)換文本8.2.3RDD編程RDD操作(3)flatMap(function)flatMap()方法和map()方法在RDD中轉(zhuǎn)換的作用基本相同,不同之處在于其增加了“拍扁”(flat)功能,“拍扁”后RDD中的每行中僅有一個(gè)元素。flatMap()方法的示例如圖8.19所示。圖8.19flatMap()方法的使用8.2.3RDD編程RDD操作(4)groupByKey()groupByKey()方法應(yīng)用于鍵值對(duì)(K,V)的數(shù)據(jù)集時(shí),返回一個(gè)新的(K,Iterable)形式的數(shù)據(jù)集。groupByKey()方法的示例如圖8.20所示。在示例中使用flatMap()方法將文本轉(zhuǎn)換為每行只有一個(gè)單詞,再把每個(gè)單詞轉(zhuǎn)換為鍵值對(duì)(word,1)的形式。groupByKey()方法會(huì)對(duì)具相同鍵的數(shù)據(jù)進(jìn)行合并,如示例中“Hello”出現(xiàn)了3次,所以進(jìn)行合并,且合并后的集合在一個(gè)Iterable對(duì)象中,形如(’hello’,(1,1,1)),使用map()方法可對(duì)Iterable中數(shù)據(jù)進(jìn)行聚合。圖8.20groupByKey()方法的使用8.2.3RDD編程RDD操作(5)reduceByKey(function)reduceByKey()方法作用于(K,V)鍵值對(duì)的RDD時(shí),返回一個(gè)新的(K,V)形式的RDD,鍵值對(duì)將按照鍵值K值傳遞到function函數(shù)中,并對(duì)V進(jìn)行聚合計(jì)算。reduceByKey()方法的示例如圖8.21所示。示例中使用flatMap()方法將文本轉(zhuǎn)換為每行只有一個(gè)單詞,再將每個(gè)單詞轉(zhuǎn)換為鍵值對(duì)(word,1),reduceByKey()方法中傳入的a與b是具有相同鍵的值。圖8.21reduceByKey()方法的使用8.2.3RDD編程RDD操作

從文件中加載數(shù)據(jù),完成一次又一次的轉(zhuǎn)換操作后,當(dāng)pyspark程序執(zhí)行到行動(dòng)操作時(shí),才會(huì)執(zhí)行真正的計(jì)算,完成行動(dòng)操作并得到結(jié)果。常用的RDD行動(dòng)操作API有以下幾種,它們的使用方法如下:(1)count():count()方法返回RDD中元素的個(gè)數(shù)圖8.23count()方法(2)collect():collect()方法以列表的形式返回RDD中元素圖8.24collect()方法8.2.3RDD編程RDD操作(3)first()和take(n):first()方法用于獲取RDD中的第一個(gè)元素,take(n)方法用于獲取RDD中的前n個(gè)元素得到新的列表,它們常用于數(shù)據(jù)的觀(guān)察(4)reduce(function):reduce()也用于聚合,reduce()與reduceByKey()的功能不同,reduce()是行動(dòng)操作,常用于聚合RDD中的元素,傳入的數(shù)據(jù)是數(shù)據(jù)集合中的所有元素,而reduceByKey()是轉(zhuǎn)換操作,傳入的數(shù)據(jù)是具有相同鍵的值圖8.25first()和take(n)方法圖8.26reduce()方法8.2.3RDD編程鍵值對(duì)轉(zhuǎn)換鍵值形式的RDD,是指RDD中每個(gè)元素都是(K,V)格式的數(shù)據(jù),是最常見(jiàn)的RDD數(shù)據(jù)類(lèi)型。鍵值對(duì)RDD仍然通過(guò)文件加載或集合創(chuàng)建。(1)keys()和values():keys()方法返回所有鍵值對(duì)中的K,values()方法返回所有鍵值對(duì)中的V,分別形成新的RDD圖8.27keys()和values()方法8.2.3RDD編程鍵值對(duì)轉(zhuǎn)換(2)sortByKey()和sortBy():sortByKey()方法返回根據(jù)鍵值排序的RDD,sortBy()方法可按照鍵值對(duì)中的指定值排序,兩個(gè)方法都有升降序排序開(kāi)關(guān),默認(rèn)True為升序,F(xiàn)alse為降序圖8.28sortByKey()和sortBy()方法8.2.3RDD編程鍵值對(duì)轉(zhuǎn)換(3)mapValues(function):mapValues()方法是針對(duì)鍵值對(duì)中V的轉(zhuǎn)換,它會(huì)把所有的V都按照f(shuō)unction定義的轉(zhuǎn)換邏輯進(jìn)行相同的轉(zhuǎn)換操作,不會(huì)影響鍵K圖8.29mapValues()方法8.2.3RDD編程鍵值對(duì)轉(zhuǎn)換(4)join():join()方法是對(duì)兩個(gè)RDD中相同鍵值的數(shù)據(jù)執(zhí)行內(nèi)連接,將(K,V1)和(K,V2)連接成(K,(V1,V2))的鍵值對(duì)形式圖8.30join()方法8.2.3RDD編程鍵值對(duì)轉(zhuǎn)換(5))combineByKey():combineByKey()方法是Spark中核心的高階方法,其他一些高階方法的底層都是用該方法實(shí)現(xiàn),如groupByKey(),reduceByKey()等。combineByKey()方法的原型為:combineByKey(createCombiner:V=>C,mergeValue:(C,V)=>C,mergeCombiners:(C,C)=>C,partitioner:Partitioner,mapSideCombine:Boolean=true,serializer:Serializer=null圖8.31combineByKey()方法8.2.3RDD編程鍵值對(duì)轉(zhuǎn)換(6)saveAsTextFile():saveAsTextFile()方法將RDD以文本文件的格式存儲(chǔ)到文件系統(tǒng)中,該方法將按照分區(qū)數(shù)量生成文件的個(gè)數(shù)圖8.32saveAsTextFile()方法保存結(jié)果圖8.33瀏覽saveAsTextFile()保存結(jié)果8.2.4SparkSQLDataFrame的創(chuàng)建和保存

執(zhí)行SparkSQL的方法也有命令行和獨(dú)立應(yīng)用兩種方式,其中啟動(dòng)pyspark時(shí)會(huì)自動(dòng)建立名稱(chēng)為spark的SparkSession實(shí)例,采用獨(dú)立應(yīng)用的方式時(shí),需要通過(guò)以下三行代碼自定義SparkSession實(shí)例。自定義SparkSession實(shí)例#!/usr/bin/envpython3

frompysparkimportSparkContext,SparkConffrompyspark.sqlimportSparkSessionspark=SparkSession.builder.config(conf=SparkConf()).getOrCreate()8.2.4SparkSQLDataFrame的創(chuàng)建和保存圖8.37創(chuàng)建DataFrame示例8.2.4SparkSQLDataFrame的常用操作(1)show(numRows:Int,truncate:Boolean)方法(2)printSchema(),用于打印DataFrame的結(jié)構(gòu)(3)select()方法,和SQL語(yǔ)句中的用法一樣,可投影DataFrame中的列或在原有列上進(jìn)行運(yùn)算(4)filter()或where()方法,相當(dāng)于SQL中的where關(guān)鍵字(5)sort()或orderBy()方法(6)groupby()方法,類(lèi)似于SQL中的GroupBy關(guān)鍵字,統(tǒng)計(jì)每個(gè)分組中的數(shù)據(jù)(7)distinct()方法,用于去除重復(fù)(8)collect()方法(9)withColumn()方法(10)和SQL中對(duì)應(yīng),DataFrame也有并、交、差運(yùn)算,分別對(duì)應(yīng)于union()、intersect()、subtract()方法。(11)join()方法,用于連接,生成新的DataFrame8.2.4SparkSQLDataFrame的常用操作圖8.38printSchema()圖8.39select()、filter()和sort()方法8.2.4SparkSQLRDD轉(zhuǎn)換為DataFrame從RDD轉(zhuǎn)換為DataFrame有兩種方式:(1)利用反射機(jī)制推斷RDD模式(2)使用編程方式定義模式,構(gòu)造一個(gè)DataFrame結(jié)構(gòu),并編程定義的DataFrame結(jié)構(gòu)與已有的RDD合并形成DataFrame。圖8.40反射機(jī)制推斷RDD結(jié)構(gòu)創(chuàng)建DataFrame8.2.4SparkSQLRDD轉(zhuǎn)換為DataFrameDataFrame結(jié)構(gòu)設(shè)計(jì)需要字段名稱(chēng)、字段類(lèi)型和是否為空等信息,SparkSQL提供了StructType來(lái)表示結(jié)構(gòu)信息,在其中加入StructField(name,dataType,nullable=True,metedate=None)作為StructType()方法的參數(shù)來(lái)定義多個(gè)字段,DataFrame數(shù)據(jù)被封裝在Row中,最后使用spark.createDataFrame()將結(jié)構(gòu)和數(shù)據(jù)組裝形成DataFrame圖8.41編程方式定義RDD結(jié)構(gòu)創(chuàng)建DataFrame8.2.4SparkSQLSparkSQL讀寫(xiě)數(shù)據(jù)庫(kù)圖8.42MySQL中創(chuàng)建數(shù)據(jù)庫(kù)和表圖8.43SparkSQL讀取MySQL數(shù)據(jù)8.2.4SparkSQLSparkSQL讀寫(xiě)數(shù)據(jù)庫(kù)圖8.44SparkSQL向MySQL數(shù)據(jù)表中插入數(shù)據(jù)8.2.5Spark流式計(jì)算SparkStreaming

SparkStreaming是構(gòu)建在Spark上的實(shí)時(shí)計(jì)算框架,它擴(kuò)展了Spark處理大規(guī)模流式數(shù)據(jù)的能力。SparkStreaming接收實(shí)時(shí)流數(shù)據(jù)后,根據(jù)一定的時(shí)間間隔(通常是0.5~2秒)將數(shù)據(jù)流拆分成一批批的數(shù)據(jù),Spark處理批數(shù)據(jù),最終得到處理一批批數(shù)據(jù)的結(jié)果數(shù)據(jù)。SparkStreaming最主要的抽象數(shù)據(jù)類(lèi)型是DStream(離散化數(shù)據(jù)流,DistcretizedStream),因此對(duì)應(yīng)流數(shù)據(jù)的DStream可以看成一組RDDs,即RDD的一個(gè)序列,對(duì)DStream的操作最終轉(zhuǎn)變?yōu)橄鄳?yīng)RDD的操作。SparkStreaming可整合多種輸入數(shù)據(jù)源,如Kafka、Flume、HDFS或TCP套接字,經(jīng)處理后的數(shù)據(jù)可存儲(chǔ)至文件系統(tǒng)、數(shù)據(jù)庫(kù)、或打印輸出到控制臺(tái)。8.2.5Spark流式計(jì)算SparkStreaming編寫(xiě)SparkStreaming程序的基本步驟如下:1)創(chuàng)建輸入DStream來(lái)定義數(shù)據(jù)源;2)對(duì)DStream定義流計(jì)算規(guī)則;3)調(diào)用StreamingContext對(duì)象的Start()方法開(kāi)始接收數(shù)據(jù)和處理流程;4)調(diào)用StreamingContext對(duì)象的awaitTermination()方法,等待流計(jì)算結(jié)束。圖8.45命令行創(chuàng)建StreamingContext對(duì)象#!/usr/bin/envpython3

frompysparkimportSparkContext,SparkConffrompyspark.streamingimportStreamingContext

conf=SparkConf().setMaster("local").setAppName("DStreamApp")

sc=SparkContext(conf=conf)ssc=StreamingContext(sc,1)創(chuàng)建StreamingContext對(duì)象的方式8.2.5Spark流式計(jì)算文件流圖8.46SparkStreaming流式計(jì)算端圖8.47SparkStreaming流式計(jì)算端計(jì)算過(guò)程8.2.5Spark流式計(jì)算TCP套接字圖8.48Socket服務(wù)器圖8.49SparkStreaming處理Socket套接字流8.2.5Spark流式計(jì)算TCP套接字轉(zhuǎn)換方法功能說(shuō)明window(windowLength,slideInterval)基于窗口生成新的DStreamwindowLength為窗口大小,slideInterval為滑動(dòng)時(shí)間間隔CountByWindow(windowLength,slideInterval)窗口中元素計(jì)數(shù)windowLength為窗口大小,slideInterval為滑動(dòng)時(shí)間間隔reduceByWindow(function,windowLength,slideInterval)利用function對(duì)窗口內(nèi)單一元素進(jìn)行聚合function為自定義函數(shù)reduceByKeyAndWindow(function,windowLength,slideInterval,[numTasks])利用function對(duì)窗口內(nèi)相同鍵值的數(shù)據(jù)進(jìn)行聚合[numTasks]為聚合任務(wù)的數(shù)量reduceByKeyAndWindow(function,invFunc,windowLength,slideInterval,[numTasks])更加高效的利用function對(duì)窗口內(nèi)相同鍵值的數(shù)據(jù)進(jìn)行增量聚合function函數(shù)指定當(dāng)前窗口中的數(shù)據(jù)聚合方式,invFunc是從窗口中移除數(shù)據(jù)的去除函數(shù)表8.1常用的滑動(dòng)轉(zhuǎn)換操作8.2.5Spark流式計(jì)算StructuredStreaming

StructuredStreaming是基于DataFrame的結(jié)構(gòu)化流數(shù)據(jù)處理技術(shù),該技術(shù)將SparkSQL和SparkStreaming結(jié)合起來(lái),將實(shí)時(shí)數(shù)據(jù)流看成一張不斷進(jìn)行添加的數(shù)據(jù)表,對(duì)表中增量部分的數(shù)據(jù),按照固定時(shí)間間隔獲取并進(jìn)行處理。StructuredStreaming數(shù)據(jù)流處理模式分為兩種,分別是微批處理和持續(xù)批處理,默認(rèn)使用微批處理模式,使用writeStream().trigger(Trigger.Continuous("1second"))可從微處理模式切換到持續(xù)批處理模式。

兩種處理模式的最大區(qū)別在于,持續(xù)批處理模式比微批處理模式更具實(shí)時(shí)性。8.2.6SparkML將SparkML機(jī)器學(xué)習(xí)庫(kù)的內(nèi)容概括如下。(1)算法,包含了常用的機(jī)器學(xué)習(xí)算法,如回歸、分類(lèi)、聚類(lèi)和協(xié)同過(guò)濾等;(2)特征工程,包含了特征提取、轉(zhuǎn)化、降維和選擇工具等;(3)流水線(xiàn)(Pipeline),用于構(gòu)建、評(píng)估和調(diào)整機(jī)器學(xué)習(xí)工作流的工具;(4)持久性,保存和加載算法、模型和管道;(5)實(shí)用工具,如線(xiàn)性代數(shù)、統(tǒng)計(jì)、數(shù)據(jù)處理等工具。Spark機(jī)器學(xué)習(xí)庫(kù)從1.2版本以后被分為兩個(gè)包,分別是SparkMLlib和SparkML。SparkMLlib包含基于RDD的原始API,SparkML則提供了基于DataFrame的高層次API,可以用來(lái)構(gòu)建機(jī)器學(xué)習(xí)流水線(xiàn)(MLPipeLine),機(jī)器學(xué)習(xí)流水線(xiàn)彌補(bǔ)了SparkMLlib庫(kù)的不足。8.2.6SparkML向量、標(biāo)注點(diǎn)和矩陣等基礎(chǔ)數(shù)據(jù)類(lèi)型支持著SparkML中機(jī)器學(xué)習(xí)算法。向量分為稠密向量(DenseVector)和稀疏向量(SparseVector),用浮點(diǎn)數(shù)存儲(chǔ)數(shù)據(jù),兩種向量分別繼承自pyspark.ml.linalg.Vectors類(lèi)。標(biāo)注點(diǎn)(LabeledPoint)是帶標(biāo)簽的本地向量,標(biāo)注點(diǎn)的實(shí)現(xiàn)在pyspark.ml.regression.LabeledPoint類(lèi)中,一個(gè)標(biāo)注點(diǎn)由一個(gè)浮點(diǎn)類(lèi)型的標(biāo)簽和一個(gè)向量組成。矩陣是由向量組成的集合,在pyspark.ml.linalg.Matrix類(lèi)中有DenseMatrix和SparseMatrix兩種矩陣。基本數(shù)據(jù)類(lèi)型8.2.6SparkML數(shù)據(jù)預(yù)處理特征提取SparkML中提供了詞頻-逆向文件詞頻(TermFrequency-InverseDocumentFrequercy,TF-IDF)、Word2Vec、CountVectorizer等幾種常用的特征提取操作。

(8.1)TF-IDF的度量值的計(jì)算如式(8.2)所示。(8.2)8.2.6SparkML數(shù)據(jù)預(yù)處理特征提取圖8.50HashingTF提取特征向量圖8.51TF-IDF特征提取8.2.6SparkML數(shù)據(jù)預(yù)處理特征轉(zhuǎn)換SparkML的包中實(shí)現(xiàn)了幾個(gè)相關(guān)的轉(zhuǎn)換器,如位于pyspark.ml.feature包中的StringIndexer()、IndexToString()、OneHotEncoder()、VectorIndexer()方法等。IndexToString()方法常和StringIndexer()方法配合使用,先用StringIndexer()方法將類(lèi)別值轉(zhuǎn)化成數(shù)值,進(jìn)行模型訓(xùn)練,在訓(xùn)練結(jié)束后,再將數(shù)值轉(zhuǎn)化成原有的類(lèi)別值圖8.52StringIndexer()示例圖8.53IndexToString()實(shí)例8.2.6SparkML數(shù)據(jù)預(yù)處理特征選擇SparkML中的特征選擇在pyspark.ml.feature包中,有VectorSlicer()、RFormula()和

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶(hù)所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶(hù)上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶(hù)上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶(hù)因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論