Chapter 7  Big Data Analysis Algorithms

Chapter goals. This chapter covers the classic clustering and classification algorithms in Spark MLlib. After studying it, you should be able to:

- Master the basic principles, detailed steps, application examples, and Spark implementations of several classic big-data clustering and classification algorithms.
- Through example analyses of the Spark MLlib implementations of these algorithms, master the design approach and implementation methods of big-data clustering and classification programs.

Chapter contents:

7.1 Clustering algorithms
  7.1.1 Classic clustering algorithms: basic principles of clustering; the K-Means algorithm; the bisecting K-Means algorithm; Gaussian mixture models; power iteration clustering
  7.1.2 Applications of big-data clustering algorithms
7.2 Classification algorithms
  7.2.1 Classic classification algorithms: basic principles of classification; decision trees; logistic regression; support vector machines; random forests; naive Bayes
  7.2.2 Applications of big-data classification algorithms

7.1 Clustering algorithms

7.1.1 Classic clustering algorithms

Basic principles of clustering. Goal: partition data that carries no labels, and whose number of classes is not known in advance, into clusters such that the points within each cluster are as close to one another as possible while points in different clusters are as far apart as possible. Basic idea: use the distances between the feature values of the data to decide whether two points should be placed in the same class.

Clustering algorithms fall into four families: partition-based, hierarchy-based, density-based, and grid-based.

The K-Means algorithm. K-Means is an iteratively solved, partition-based clustering algorithm. Basic idea: first initialize K partitions, then move samples from one partition to another to improve the clustering quality, iterating this process until the maximum number of iterations is reached.

Advantages of K-Means:
- It is easy to understand and has low algorithmic complexity.
- Although the result is only a local optimum, it is good enough for most problems.
- It scales well to large datasets: clustering accuracy stays consistent as the number of data objects grows from hundreds to millions.

Disadvantages of K-Means:
- The value of k must be set manually, and different values of k produce different results.
- The initial partition centers also affect the clustering result; different ways of choosing them lead to different outcomes.
- K-Means is sensitive to outliers and noise: anomalous values pull the means far off and can trap the algorithm in a poor local minimum.
- Highly dispersed classes, non-convex classes, and strongly imbalanced classes are not well suited to K-Means.

Basic steps of K-Means:
1. Start: given k, choose k initial partition centers.
2. Compute the distance from every sample point in the dataset to each of the k partition centers, and assign each point to the class of its nearest center.
3. For each class, compute the mean of its sample points to obtain a new partition center.
4. If the maximum number of iterations is reached, or the centers move less than a set threshold, stop; otherwise go back to step 2.

Application example of K-Means

Step 1: import the necessary classes. When using pyspark you must import the SparkContext class — the entry point of Spark functionality — and, after the imports, define sc = SparkContext(appName="KMeans_pyspark", master='local'), otherwise the program may fail at run time. The K-Means algorithm also needs the KMeans and KMeansModel classes from pyspark.mllib.clustering.

```python
from numpy import array
from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans, KMeansModel

# appName: the job name shown on the cluster web UI
# master: the cluster URL to connect to (e.g. mesos://host:port, spark://host:port, or local[4])
sc = SparkContext(appName="KMeans_pyspark", master='local')
```

Step 2: create the dataset. Here we create a dataset of 4 data points, each with 2 attributes.

```python
nodes = array([0.0, 0.0, 1.0, 1.0, 9.0, 8.0, 8.0, 9.0]).reshape(4, 2)
print(nodes)
```

The result is:

```
[[0. 0.]
 [1. 1.]
 [9. 8.]
 [8. 9.]]
```

Step 3: train a K-Means clustering model on the dataset created above. sc.parallelize() converts the dataset into the RDD format MLlib requires; it takes two parameters, the dataset to convert and the number of slices to cut it into (default: None). The model itself is created with KMeans.train(). In the example below, the runs, seed, initializationSteps, epsilon, and initialModel parameters are not specified and keep their default values.

```python
model = KMeans.train(sc.parallelize(nodes), 2, maxIterations=10, initializationMode="random")
```

Step 4: use the attributes and methods of KMeansModel to obtain the cluster centers and the cluster each sample point belongs to.

```python
for clusterCenter in model.clusterCenters:
    print(clusterCenter)
for node in nodes:
    print(str(node) + " belongs to cluster " + str(model.clusterCenters[model.predict(node)]))
```

The result is:

```
[8.5 8.5]
[0.5 0.5]
[0. 0.] belongs to cluster [0.5 0.5]
[1. 1.] belongs to cluster [0.5 0.5]
[9. 8.] belongs to cluster [8.5 8.5]
[8. 9.] belongs to cluster [8.5 8.5]
```

Spark implementation of K-Means

The K-Means algorithm is implemented by the method runAlgorithmWithWeight(). The overall training approach of this method is: partition the data and broadcast the cluster-center information to every partition; within each partition accumulate, per center, the distance (cost), the coordinate sums, and the counts; then reduce with the cluster index ID as the key to compute, over the whole dataset, the accumulated cost, coordinate sums, and counts for every center.

```scala
private def runAlgorithmWithWeight(
    data: RDD[VectorWithNorm],
    instr: Option[Instrumentation]): KMeansModel = {

  val sc = data.sparkContext
  val initStartTime = System.nanoTime()
  val distanceMeasureInstance = DistanceMeasure.decodeFromString(this.distanceMeasure)

  // Initialize the centers, supporting "random" (random selection)
  // or K-Means|| (selection of optimal sample points)
  val centers = initialModel match {
    case Some(kMeansCenters) => kMeansCenters.clusterCenters.map(new VectorWithNorm(_))
    case None =>
      if (initializationMode == KMeans.RANDOM) {
        initRandom(data)
      } else {
        initKMeansParallel(data, distanceMeasureInstance)
      }
  }
  val numFeatures = centers.head.vector.size
  val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
  logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")

  var converged = false
  var cost = 0.0
  var iteration = 0
  val iterationStartTime = System.nanoTime()
  instr.foreach(_.logNumFeatures(numFeatures))
  val shouldDistributed = centers.length * centers.length * numFeatures.toLong > 1000000L

  // Execute iterations of Lloyd's algorithm until convergence
  while (iteration < maxIterations && !converged) {
    val bcCenters = sc.broadcast(centers)
    val stats = if (shouldDistributed) {
      distanceMeasureInstance.computeStatisticsDistributedly(sc, bcCenters)
    } else {
      distanceMeasureInstance.computeStatistics(centers)
    }
    val bcStats = sc.broadcast(stats)
    val costAccum = sc.doubleAccumulator

    // Find the new centers
    val collected = data.mapPartitions { points =>
      val centers = bcCenters.value
      val stats = bcStats.value
      val dims = centers.head.vector.size
      val sums = Array.fill(centers.length)(Vectors.zeros(dims))

      // clusterWeightSum is used to compute the cluster centers:
      // cluster center = sample1 * weight1 / clusterWeightSum + sample2 * weight2 / clusterWeightSum + ...
      // create the clusterWeightSum array, with one entry per center
      val clusterWeightSum = Array.ofDim[Double](centers.length)

      points.foreach { point =>  // for every sample point:
        // find which center the point belongs to, and its cost under that center
        val (bestCenter, cost) = distanceMeasureInstance.findClosest(centers, stats, point)
        costAccum.add(cost * point.weight)
        distanceMeasureInstance.updateClusterSum(point, sums(bestCenter))
        clusterWeightSum(bestCenter) += point.weight
      }

      Iterator.tabulate(centers.length)(j => (j, (sums(j), clusterWeightSum(j))))
        .filter(_._2._2 > 0)
    }.reduceByKey { (sumweight1, sumweight2) =>
      axpy(1.0, sumweight2._1, sumweight1._1)
      (sumweight1._1, sumweight1._2 + sumweight2._2)
    }.collectAsMap()

    if (iteration == 0) {
      instr.foreach(_.logNumExamples(costAccum.count))
      instr.foreach(_.logSumOfWeights(collected.values.map(_._2).sum))
    }

    bcCenters.destroy()
    bcStats.destroy()

    // Update the cluster centers and costs
    converged = true
    collected.foreach { case (j, (sum, weightSum)) =>
      val newCenter = distanceMeasureInstance.centroid(sum, weightSum)
      if (converged && !distanceMeasureInstance.isCenterConverged(centers(j), newCenter, epsilon)) {
        converged = false
      }
      centers(j) = newCenter
    }
    cost = costAccum.value
    instr.foreach(_.logNamedValue(s"Cost@iter=$iteration", s"$cost"))
    iteration += 1
  }

  val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9
  logInfo(f"Iterations took $iterationTimeInSeconds%.3f seconds.")
  if (iteration == maxIterations) {
    logInfo(s"KMeans reached the max number of iterations: $maxIterations.")
  } else {
    logInfo(s"KMeans converged in $iteration iterations.")
  }
  logInfo(s"The cost is $cost.")
  new KMeansModel(centers.map(_.vector), distanceMeasure, cost, iteration)
}
```
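Before moving on, the four basic steps above can be sketched as a self-contained pure-Python program, with no Spark required. This is a minimal illustration of Lloyd's iteration under our own function name and defaults (kmeans, epsilon, seed), not MLlib's runAlgorithmWithWeight(); on the 4-point dataset from the application example it recovers the same two centers.

```python
import random

def kmeans(points, k, max_iterations=10, epsilon=1e-4, seed=0):
    """Minimal K-Means: pick k initial centers, assign, recompute means, repeat."""
    rng = random.Random(seed)
    centers = rng.sample(points, k)                 # step 1: choose k initial centers
    for _ in range(max_iterations):                 # stop at the maximum number of iterations
        clusters = [[] for _ in range(k)]
        for p in points:                            # step 2: assign each point to its nearest center
            j = min(range(k),
                    key=lambda c: sum((a - b) ** 2 for a, b in zip(p, centers[c])))
            clusters[j].append(p)
        moved = 0.0
        for j, cluster in enumerate(clusters):      # step 3: new center = mean of the cluster
            if cluster:
                new = tuple(sum(dim) / len(cluster) for dim in zip(*cluster))
                moved = max(moved, max(abs(a - b) for a, b in zip(new, centers[j])))
                centers[j] = new
        if moved < epsilon:                         # step 4: centers stable, so converged
            break
    return centers

nodes = [(0.0, 0.0), (1.0, 1.0), (9.0, 8.0), (8.0, 9.0)]
print(sorted(kmeans(nodes, 2)))  # -> [(0.5, 0.5), (8.5, 8.5)]
```

Whatever distinct pair of points is drawn as the initial centers, this toy dataset converges to the same two centers within a couple of iterations, matching the MLlib example output.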
The bisecting K-Means algorithm

Basic steps of bisecting K-Means:
1. Start: place all sample points in a single cluster, then split that cluster in two.
2. Select the cluster whose split most reduces the SSE as the next divisible cluster.
3. Use K-Means to split the divisible cluster into two clusters.
4. If the maximum number of iterations is reached, or the centers move less than a set threshold, stop; otherwise go back to step 2.

Application example of bisecting K-Means

Step 1: import the necessary classes. When using pyspark you must import the SparkContext class — the entry point of Spark functionality — and, after the imports, define sc = SparkContext(appName="BisectingKMeans_pyspark", master='local'), otherwise the program may fail at run time. Bisecting K-Means also needs the BisectingKMeans and BisectingKMeansModel classes from pyspark.mllib.clustering.

```python
from numpy import array
from pyspark import SparkContext
from pyspark.mllib.clustering import BisectingKMeans, BisectingKMeansModel

# appName: the job name shown on the cluster web UI
# master: the cluster URL to connect to (e.g. mesos://host:port, spark://host:port, or local[4])
sc = SparkContext(appName="BisectingKMeans_pyspark", master='local')
```

Step 2: create the dataset. Here we create a dataset of 8 data points, each with 2 attributes.

```python
nodes = array([0.0, 0.0, 1.0, 1.0, 9.0, 8.0, 8.0, 9.0,
               5.0, 4.0, 4.0, 5.0, 3.0, 4.0, 4.0, 2.0]).reshape(8, 2)
print(nodes)
```

The result is:

```
[[0. 0.]
 [1. 1.]
 [9. 8.]
 [8. 9.]
 [5. 4.]
 [4. 5.]
 [3. 4.]
 [4. 2.]]
```

Step 3: create a bisecting K-Means clustering model from the dataset above with the BisectingKMeans.train() method. In the example below, the maxIterations, minDivisibleClusterSize, and seed parameters are not specified and keep their default values.

```python
bskm = BisectingKMeans()
model = bskm.train(sc.parallelize(nodes, 2), k=4)
```

Step 4: use the attributes and methods of BisectingKMeansModel to obtain the cluster centers and the cluster each sample point belongs to.

```python
for clusterCenter in model.clusterCenters:
    print(clusterCenter)
```

The result is:

```
[0.5 0.5]
[3.5 3. ]
[4.5 4.5]
[8.5 8.5]
```

```python
for node in nodes:
    print(str(node) + " belongs to cluster " + str(model.clusterCenters[model.predict(node)]))
```

The result is:

```
[0. 0.] belongs to cluster [0.5 0.5]
[1. 1.] belongs to cluster [0.5 0.5]
[9. 8.] belongs to cluster [8.5 8.5]
[8. 9.] belongs to cluster [8.5 8.5]
[5. 4.] belongs to cluster [4.5 4.5]
[4. 5.] belongs to cluster [4.5 4.5]
[3. 4.] belongs to cluster [3.5 3. ]
[4. 2.] belongs to cluster [3.5 3. ]
```

Spark implementation of bisecting K-Means

In MLlib, bisecting K-Means is likewise implemented by calling, through an API, the Scala BisectingKMeans class. The algorithm starts from a single cluster containing all points. It iteratively finds divisible clusters at the bottom level and bisects each of them with K-Means, until there are k leaf clusters in total or no more clusters are divisible. The bisecting steps of clusters on the same level are grouped together to increase parallelism. If bisecting all divisible clusters at the bottom level would produce more than k clusters, larger clusters get higher priority.

```python
def train(self, rdd, k=4, maxIterations=20, minDivisibleClusterSize=1.0, seed=-1888008604):
    # call the trainBisectingKMeans API
    java_model = callMLlibFunc("trainBisectingKMeans", rdd.map(_convert_to_vector),
                               k, maxIterations, minDivisibleClusterSize, seed)
    return BisectingKMeansModel(java_model)
```

```scala
// trainBisectingKMeans API
def trainBisectingKMeans(
    data: JavaRDD[Vector],
    k: Int,
    maxIterations: Int,
    minDivisibleClusterSize: Double,
    seed: java.lang.Long): BisectingKMeansModel = {
  val kmeans = new BisectingKMeans()
    .setK(k)
    .setMaxIterations(maxIterations)
    .setMinDivisibleClusterSize(minDivisibleClusterSize)
  if (seed != null) kmeans.setSeed(seed)
  kmeans.run(data)  // call the run() method of the BisectingKMeans class
}
```

The Scala BisectingKMeans class trains the model by running its run() method with the given parameters. The source is:

```scala
def run(input: RDD[Vector]): BisectingKMeansModel = {
  // convert the dataset from RDD[Vector] to RDD[(Vector, Double)]
  val instances = input.map(point => (point, 1.0))
  // check whether a storage level is currently set on the RDD
  val handlePersistence = input.getStorageLevel == StorageLevel.NONE
  // call runWithWeight() to run the algorithm
  runWithWeight(instances, handlePersistence, None)
}
```

```scala
// runWithWeight() implements the bisecting K-Means algorithm
private[spark] def runWithWeight(
    instances: RDD[(Vector, Double)],
    handlePersistence: Boolean,
    instr: Option[Instrumentation]): BisectingKMeansModel = {
  val d = instances.map(_._1.size).first
  logInfo(s"Feature dimension: $d.")

  val dMeasure = DistanceMeasure.decodeFromString(this.distanceMeasure)
  val norms = instances.map(d => Vectors.norm(d._1, 2.0))  // compute the L2 norms of the data
  // wrap the data into VectorWithNorm instances
  val vectors = instances.zip(norms)
    .map { case ((x, weight), norm) => new VectorWithNorm(x, norm, weight) }
  if (handlePersistence) {
    vectors.persist(StorageLevel.MEMORY_AND_DISK)
  } else {
    // compute and cache vector norms for fast distance computation
    norms.persist(StorageLevel.MEMORY_AND_DISK)
  }

  var assignments = vectors.map(v => (ROOT_INDEX, v))
  var activeClusters = summarize(d, assignments, dMeasure)
  instr.foreach(_.logNumExamples(activeClusters.values.map(_.size).sum))
  instr.foreach(_.logSumOfWeights(activeClusters.values.map(_.weightSum).sum))
  val rootSummary = activeClusters(ROOT_INDEX)
  val n = rootSummary.size
  logInfo(s"Number of points: $n.")
  logInfo(s"Initial cost: ${rootSummary.cost}.")
  val minSize = if (minDivisibleClusterSize >= 1.0) {
    math.ceil(minDivisibleClusterSize).toLong
  } else {
    math.ceil(minDivisibleClusterSize * n).toLong
  }
  logInfo(s"The minimum number of points of a divisible cluster is $minSize.")
  var inactiveClusters = mutable.Seq.empty[(Long, ClusterSummary)]
  val random = new Random(seed)
  var numLeafClustersNeeded = k - 1
  var level = 1
  var preIndices: RDD[Long] = null
  var indices: RDD[Long] = null
  while (activeClusters.nonEmpty && numLeafClustersNeeded > 0 && level < LEVEL_LIMIT) {
    // divisible clusters are sufficiently large and have non-trivial cost
    var divisibleClusters = activeClusters.filter { case (_, summary) =>
      (summary.size >= minSize) && (summary.cost > MLUtils.EPSILON * summary.size)
    }
    // if we do not need all divisible clusters, take the larger ones
    if (divisibleClusters.size > numLeafClustersNeeded) {
      divisibleClusters = divisibleClusters.toSeq
        .sortBy { case (_, summary) => -summary.size }
        .take(numLeafClustersNeeded)
        .toMap
    }
    if (divisibleClusters.nonEmpty) {
      val divisibleIndices = divisibleClusters.keys.toSet
      logInfo(s"Dividing ${divisibleIndices.size} clusters on level $level.")
      var newClusterCenters = divisibleClusters.flatMap { case (index, summary) =>
        val (left, right) = splitCenter(summary.center, random, dMeasure)
        Iterator((leftChildIndex(index), left), (rightChildIndex(index), right))
      }.map(identity)  // workaround for a Scala bug (SI-7005) that produces a non-serializable map
      var newClusters: Map[Long, ClusterSummary] = null
      var newAssignments: RDD[(Long, VectorWithNorm)] = null
      for (iter <- 0 until maxIterations) {
        newAssignments = updateAssignments(assignments, divisibleIndices, newClusterCenters, dMeasure)
          .filter { case (index, _) => divisibleIndices.contains(parentIndex(index)) }
        newClusters = summarize(d, newAssignments, dMeasure)
        newClusterCenters = newClusters.mapValues(_.center).map(identity).toMap
      }
      if (preIndices != null) {
        preIndices.unpersist()
      }
      preIndices = indices
      indices = updateAssignments(assignments, divisibleIndices, newClusterCenters, dMeasure).keys
        .persist(StorageLevel.MEMORY_AND_DISK)
      assignments = indices.zip(vectors)
      inactiveClusters ++= activeClusters
      activeClusters = newClusters
      numLeafClustersNeeded -= divisibleClusters.size
    } else {
      logInfo(s"None active and divisible clusters left on level $level. Stop iterations.")
      inactiveClusters ++= activeClusters
      activeClusters = Map.empty
    }
    level += 1
  }
  if (preIndices != null) { preIndices.unpersist() }
  if (indices != null) { indices.unpersist() }
  if (handlePersistence) { vectors.unpersist() } else { norms.unpersist() }
  val clusters = activeClusters ++ inactiveClusters
  val root = buildTree(clusters, dMeasure)
  val totalCost = root.leafNodes.map(_.cost).sum
  new BisectingKMeansModel(root, this.distanceMeasure, totalCost)
}
```

Gaussian mixture models

Unlike most current clustering algorithms, which partition by similarity, the Gaussian mixture model (GMM) uses probability as its criterion. It assumes that all samples are generated by multivariate Gaussian distributions with given parameters, and assigns each sample point to a cluster according to the probability that the point belongs to each class. Basic idea: given the number of clusters k and a dataset, use maximum likelihood estimation to derive, for every mixture component, its mean vector μ, covariance matrix Σ, and weight w; each resulting multivariate Gaussian distribution corresponds to one cluster.

Basic steps of the Gaussian mixture model:
1. Start: initialize K multivariate Gaussian distributions and their weights.
2. Using Bayes' theorem, estimate the posterior probability that each sample was generated by each component.
3. Update the mean vectors, covariance matrices, and weights using the means, covariances, and the posterior probabilities from the previous step.
4. If the maximum number of iterations is reached, or the increase in the likelihood function falls below the convergence threshold, stop; otherwise go back to step 2.

Application example of the Gaussian mixture model

Step 1: import the necessary classes. When using pyspark you must import the SparkContext class — the entry point of Spark functionality — and, after the imports, define sc = SparkContext(appName="GMM_pyspark", master='local'), otherwise the program may fail at run time. The GMM algorithm needs the GaussianMixture and GaussianMixtureModel classes from pyspark.mllib.clustering, and the Vectors and DenseMatrix classes from pyspark.mllib.linalg.

```python
from numpy import array
from pyspark import SparkContext
# from pyspark.mllib.linalg import Vectors, DenseMatrix
from pyspark.mllib.clustering import GaussianMixture, GaussianMixtureModel

sc = SparkContext(appName="GMM_pyspark", master='local')
```

Step 2: create the dataset. Here we create a dataset of 6 data points, each with 2 attributes.

```python
nodes = array([-0.1, -0.05, -0.01, -0.1, 0.9, 0.8,
               0.75, 0.935, -0.83, -0.68, -0.91, -0.76]).reshape(6, 2)
```

The result is:

```
[[-0.1   -0.05 ]
 [-0.01  -0.1  ]
 [ 0.9    0.8  ]
 [ 0.75   0.935]
 [-0.83  -0.68 ]
 [-0.91  -0.76 ]]
```

Step 3: train a GMM clustering model on the dataset created above with the GaussianMixture.train() method. In the example below, the initialModel parameter is not specified and keeps its default value.

```python
clusterdata = sc.parallelize(nodes, 2)
model = GaussianMixture.train(clusterdata, 3, convergenceTol=0.0001, maxIterations=50, seed=10)
```

Step 4: use the attributes and methods of GaussianMixtureModel to obtain the cluster of each sample point.

```python
for node in nodes:
    print(str(node) + " belongs to cluster: " + str(model.predict(node)))
```

The result is:

```
[-0.1  -0.05] belongs to cluster: 2
[-0.01 -0.1 ] belongs to cluster: 2
[ 0.9   0.8 ] belongs to cluster: 0
[ 0.75  0.935] belongs to cluster: 0
[-0.83 -0.68] belongs to cluster: 1
[-0.91 -0.76] belongs to cluster: 1
```

```python
softPredicted = model.predictSoft([-0.1, -0.05])
abs(softPredicted[0] - 1.0) < 0.03
```

The result is:

```
False
```

Spark implementation of the Gaussian mixture model

In Spark MLlib, GMM is implemented mainly by the GaussianMixture class, which performs expectation maximization for the mixture model. A Gaussian mixture represents a composite distribution of independent Gaussians, whose associated "mixing" weights specify each one's contribution to the composite. Given a set of sample points, this class maximizes the log-likelihood of a mixture of k Gaussians, iterating until the change in log-likelihood is less than convergenceTol or the maximum number of iterations is reached. Although this procedure generally converges, it is not guaranteed to find the global optimum.

```python
def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None, initialModel=None):
    initialModelWeights = None  # the weights w
    initialModelMu = None       # the mean vectors μ
    initialModelSigma = None    # the covariance matrices Σ
    if initialModel is not None:
        if initialModel.k != k:
            raise ValueError("Mismatched cluster count, initialModel.k = %s, however k = %s"
                             % (initialModel.k, k))
        initialModelWeights = list(initialModel.weights)
        initialModelMu = [initialModel.gaussians[i].mu for i in range(initialModel.k)]
        initialModelSigma = [initialModel.gaussians[i].sigma for i in range(initialModel.k)]
    # call the API
    java_model = callMLlibFunc("trainGaussianMixtureModel", rdd.map(_convert_to_vector),
                               k, convergenceTol, maxIterations, seed,
                               initialModelWeights, initialModelMu, initialModelSigma)
    return GaussianMixtureModel(java_model)
```

```scala
// trainGaussianMixtureModel API: returns a list containing the weight, mean, and
// covariance of every multivariate mixture component
def trainGaussianMixtureModel(
    data: JavaRDD[Vector],
    k: Int,
    convergenceTol: Double,
    maxIterations: Int,
    seed: java.lang.Long,
    initialModelWeights: java.util.ArrayList[Double],
    initialModelMu: java.util.ArrayList[Vector],
    initialModelSigma: java.util.ArrayList[Matrix]): GaussianMixtureModelWrapper = {
  // create a GaussianMixture instance
  val gmmAlg = new GaussianMixture()
    .setK(k)
    .setConvergenceTol(convergenceTol)
    .setMaxIterations(maxIterations)

  if (initialModelWeights != null && initialModelMu != null && initialModelSigma != null) {
    val gaussians = initialModelMu.asScala.toSeq.zip(initialModelSigma.asScala.toSeq).map {
      case (x, y) => new MultivariateGaussian(x, y)
    }
    val initialModel = new GaussianMixtureModel(
      initialModelWeights.asScala.toArray, gaussians.toArray)
    gmmAlg.setInitialModel(initialModel)
  }
  if (seed != null) gmmAlg.setSeed(seed)
  // call the run() method of the GaussianMixture class and the GaussianMixtureModelWrapper API
  new GaussianMixtureModelWrapper(gmmAlg.run(data.rdd))
}
```

MLlib implements GMM through the run() method of the GaussianMixture class.

```scala
def run(data: RDD[Vector]): GaussianMixtureModel = {
  val sc = data.sparkContext
  // use Breeze, Spark's linear algebra library
  val breezeData = data.map(_.asBreeze).cache()

  // get the length of the input vectors
  val d = breezeData.first().length
  require(d < GaussianMixture.MAX_NUM_FEATURES, s"GaussianMixture cannot handle more " +
    s"than ${GaussianMixture.MAX_NUM_FEATURES} features because the size of the covariance" +
    s" matrix is quadratic in the number of features.")

  val shouldDistributeGaussians = GaussianMixture.shouldDistributeGaussians(k, d)

  // Determine the initial weights and corresponding Gaussians.
  // If the user supplied an initial GMM, use those values;
  // otherwise start with uniform weights, random means drawn from the data,
  // and diagonal covariance matrices using component variances derived from the samples
  val (weights, gaussians) = initialModel match {
    case Some(gmm) => (gmm.weights, gmm.gaussians)
    case None =>
      val samples = breezeData.takeSample(withReplacement = true, k * nSamples, seed)
      (Array.fill(k)(1.0 / k), Array.tabulate(k) { i =>
        val slice = samples.view.slice(i * nSamples, (i + 1) * nSamples)
        new MultivariateGaussian(vectorMean(slice.toSeq), initCovariance(slice.toSeq))
      })
  }

  var llh = Double.MinValue  // current log-likelihood
  var llhp = 0.0             // previous log-likelihood

  var iter = 0
  while (iter < maxIterations && math.abs(llh - llhp) > convergenceTol) {
    // create and broadcast the curried cluster-contribution function
    val compute = sc.broadcast(ExpectationSum.add(weights, gaussians) _)
    // aggregate the cluster contributions of all sample points
    val sums = breezeData.treeAggregate(ExpectationSum.zero(k, d))(compute.value, _ += _)
    // create new distributions from the partial assignments
    // (often called the "M-step" in the literature)
    val sumWeights = sums.weights.sum
    if (shouldDistributeGaussians) {
      val numPartitions = math.min(k, 1024)
      val tuples = Seq.tabulate(k)(i => (sums.means(i), sums.sigmas(i), sums.weights(i)))
      val (ws, gs) = sc.parallelize(tuples, numPartitions).map { case (mean, sigma, weight) =>
        updateWeightsAndGaussians(mean, sigma, weight, sumWeights)
      }.collect().unzip
      Array.copy(ws, 0, weights, 0, ws.length)
      Array.copy(gs, 0, gaussians, 0, gs.length)
    } else {
      var i = 0
      while (i < k) {
        val (weight, gaussian) = updateWeightsAndGaussians(
          sums.means(i), sums.sigmas(i), sums.weights(i), sumWeights)
        weights(i) = weight
        gaussians(i) = gaussian
        i = i + 1
      }
    }
    llhp = llh                // the current value becomes the previous value
    llh = sums.logLikelihood  // the newly computed log-likelihood
    iter += 1
    compute.destroy()
  }
  breezeData.unpersist()
  new GaussianMixtureModel(weights, gaussians)
}
```

```scala
// update the weights and Gaussians
private def updateWeightsAndGaussians(
    mean: BDV[Double],
    sigma: BreezeMatrix[Double],
    weight: Double,
    sumWeights: Double): (Double, MultivariateGaussian) = {
  val mu = (mean /= weight)
  BLAS.syr(-weight, Vectors.fromBreeze(mu),
    Matrices.fromBreeze(sigma).asInstanceOf[DenseMatrix])
  val newWeight = weight / sumWeights
  val newGaussian = new MultivariateGaussian(mu, sigma / weight)
  (newWeight, newGaussian)
}
```

The GaussianMixtureModelWrapper API is a wrapper that provides helper methods for GaussianMixtureModel.

```scala
private[python] class GaussianMixtureModelWrapper(model: GaussianMixtureModel) {
  val weights: Vector = Vectors.dense(model.weights)
  val k: Int = weights.size

  // return the Gaussians as a list of vectors and matrices corresponding to
  // each multivariate Gaussian distribution
  val gaussians: Array[Byte] = {
    val modelGaussians = model.gaussians.map { gaussian =>
      Array[Any](gaussian.mu, gaussian.sigma)
    }
    SerDe.dumps(JavaConverters.seqAsJavaListConverter(modelGaussians).asJava)
  }

  def predictSoft(point: Vector): Vector = {
    Vectors.dense(model.predictSoft(point))
  }

  def save(sc: SparkContext, path: String): Unit = model.save(sc, path)
}
```
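The E-step/M-step cycle that GaussianMixture.run() performs in the multivariate case can be illustrated with a tiny one-dimensional EM loop in plain Python. The function name gmm_em, the deterministic initialization (means at the minimum and maximum of the data), and the fixed two-component setup are our own simplifications for illustration, not MLlib's API.

```python
import math

def gmm_em(xs, iters=50):
    """1-D EM for a two-component Gaussian mixture: the E-step computes
    responsibilities via Bayes' theorem, the M-step re-estimates weights,
    means, and variances from those responsibilities."""
    k = 2
    w = [1.0 / k] * k        # uniform initial weights
    mu = [min(xs), max(xs)]  # deterministic initial means (two components only)
    var = [1.0] * k          # unit initial variances
    for _ in range(iters):
        # E-step: posterior probability that each component generated each sample
        resp = []
        for x in xs:
            p = [w[j] / math.sqrt(2 * math.pi * var[j])
                 * math.exp(-(x - mu[j]) ** 2 / (2 * var[j])) for j in range(k)]
            s = sum(p)
            resp.append([pj / s for pj in p])
        # M-step: update the weight, mean, and variance of each component
        for j in range(k):
            nj = sum(r[j] for r in resp)
            w[j] = nj / len(xs)
            mu[j] = sum(r[j] * x for r, x in zip(resp, xs)) / nj
            var[j] = sum(r[j] * (x - mu[j]) ** 2 for r, x in zip(resp, xs)) / nj + 1e-9
    return w, mu, var

data = [-0.1, -0.05, 0.9, 0.8, 5.0, 5.2, 4.9, 5.1]
w, mu, var = gmm_em(data)
print(mu)  # one mean settles near 0.39, the other near 5.05
```

On this toy dataset the two components separate cleanly, with roughly equal weights, mirroring how the MLlib implementation assigns each point to the component with the highest posterior probability.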
Power iteration clustering

Basic steps of power iteration clustering (PIC):
1. Start: compute a row-normalized affinity matrix W and set the desired number of clusters k.
2. Randomly initialize a vector v(0) ≠ 0.
3. Update the vector by power iteration: v(t+1) = W·v(t) / ||W·v(t)||₁.
4. If the maximum number of iterations is reached, or the change in the vector falls below a set threshold, stop and cluster the entries of v with K-Means to obtain the k clusters; otherwise go back to step 3.
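The core update in step 3 can be sketched in a few lines of plain Python. The toy row-stochastic matrix W below (two 3-point clusters with strong internal affinity and weak 0.01 cross-affinity, then rows normalized to sum to 1) is our own construction for illustration; this is not MLlib's GraphX-based powerIter(). After a few dozen iterations, entries of v within a cluster are nearly identical while a clear gap remains between the two clusters — exactly the structure the final K-Means step exploits.

```python
def power_iterate(W, v, iterations=30):
    """PIC-style power iteration: v <- W v, renormalized by its L1 norm each step."""
    n = len(v)
    for _ in range(n and iterations):
        nv = [sum(W[i][j] * v[j] for j in range(n)) for i in range(n)]
        s = sum(abs(x) for x in nv)
        v = [x / s for x in nv]
    return v

# affinity: 1.0 inside each 3-point cluster, 0.01 across clusters, 0 on the diagonal;
# each row is then divided by its sum so that W is row-stochastic
A = [[1.0 if (i != j and (i < 3) == (j < 3)) else (0.0 if i == j else 0.01)
      for j in range(6)] for i in range(6)]
W = [[a / sum(row) for a in row] for row in A]

v = power_iterate(W, [1.0, 0.0, 0.0, 0.0, 0.0, 0.0])
print(v)  # entries 0-2 nearly equal, entries 3-5 nearly equal, with a gap between the groups
```

Because W is row-stochastic and nonnegative, the L1 norm of v stays at 1, and the slowly decaying "cluster indicator" component of v is what keeps the two groups of entries separated at early stopping.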
Application example of power iteration clustering

Step 1: import the necessary classes. When using pyspark you must import the SparkContext class — the entry point of Spark functionality — and, after the imports, define sc = SparkContext(appName="PIC_pyspark", master='local'), otherwise the program may fail at run time. The PIC algorithm needs the PowerIterationClustering and PowerIterationClusteringModel classes from pyspark.mllib.clustering.

```python
import math
from pyspark import SparkContext
from pyspark.mllib.clustering import PowerIterationClustering, PowerIterationClusteringModel

sc = SparkContext(appName="PIC_pyspark", master='local')
```

Step 2: create the dataset.

```python
# generate a data model distributed on a circle; the parameters are the radius
# of the circle and the number of points
def genCircle(r, n):
    points = []
    for i in range(0, n):
        theta = 2.0 * math.pi * i / n
        points.append((r * math.cos(theta), r * math.sin(theta)))
    return points

# compute the Gaussian similarity
def sim(x, y):
    dist2 = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1])
    return math.exp(-dist2 / 2.0)
```

```python
# generate an RDD data model shaped like concentric circles: the inner circle has
# radius 1.0 and 10 points, the outer circle has radius 4.0 and 40 points
r1 = 1.0
n1 = 10
r2 = 4.0
n2 = 40
n = n1 + n2
points = genCircle(r1, n1) + genCircle(r2, n2)
similarities = [(i, j, sim(points[i], points[j])) for i in range(1, n) for j in range(0, i)]
rdd = sc.parallelize(similarities, 2)
```

Step 3: train a PIC clustering model on the dataset created above with the PowerIterationClustering.train() method. In the example below, the initMode parameter is not specified and keeps its default value.

```python
model = PowerIterationClustering.train(rdd, 2, 40)
```

Step 4: use the attributes and methods of PowerIterationClusteringModel to obtain the cluster assignment of each sample point.

```python
result = sorted(model.assignments().collect(), key=lambda x: x.id)
if result[0].cluster == result[3].cluster:
    print("True")
print("There are " + str(model.k) + " clusters in total")
```

The result is:

```
True
There are 2 clusters in total
```

Spark implementation of power iteration clustering

In MLlib, PIC is likewise implemented through the train() method of the PowerIterationClustering class.

```python
def train(cls, rdd, k, maxIterations=100, initMode="random"):
    model = callMLlibFunc("trainPowerIterationClusteringModel",
                          rdd.map(_convert_to_vector), int(k), int(maxIterations), initMode)
    return PowerIterationClusteringModel(model)
```

train() calls the trainPowerIterationClusteringModel API, which returns a PowerIterationClusteringModel.

```scala
def trainPowerIterationClusteringModel(
    data: JavaRDD[Vector],
    k: Int,
    maxIterations: Int,
    initMode: String): PowerIterationClusteringModel = {
  val pic = new PowerIterationClustering()
    .setK(k)
    .setMaxIterations(maxIterations)
    .setInitializationMode(initMode)
  val model = pic.run(data.rdd.map(v => (v(0).toLong, v(1).toLong, v(2))))
  new PowerIterationClusteringModelWrapper(model)
}
```

The trainPowerIterationClusteringModel API calls the run() method of the Scala PowerIterationClustering class.

```scala
def run(similarities: RDD[(Long, Long, Double)]): PowerIterationClusteringModel = {
  val w = normalize(similarities)  // normalize the similarity matrix
  val w0 = initMode match {
    case "random" => randomInit(w)        // generate random vertex properties
    case "degree" => initDegreeVector(w)  // generate the degree vector
  }
  // the graph w0 has already been materialized in randomInit/initDegreeVector,
  // so it is safe to unpersist w here
  w.unpersist()
  pic(w0)
}
```

```scala
// normalize the affinity matrix (A) by the row sums and return the normalized
// affinity matrix (W)
private[clustering] def normalize(similarities: RDD[(Long, Long, Double)]): Graph[Double, Double] = {
  val edges = similarities.flatMap { case (i, j, s) =>
    if (s < 0.0) {
      throw new SparkException(s"Similarity must be nonnegative but found s($i, $j) = $s.")
    }
    if (i != j) {
      Seq(Edge(i, j, s), Edge(j, i, s))
    } else {
      None
    }
  }
  val gA = Graph.fromEdges(edges, 0.0)
  val vD = gA.aggregateMessages[Double](
    sendMsg = ctx => { ctx.sendToSrc(ctx.attr) },
    mergeMsg = _ + _,
    TripletFields.EdgeOnly)
  val graph = Graph(vD, gA.edges)
    .mapTriplets(
      e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
      new TripletFields(/* useSrc */ true, /* useDst */ false, /* useEdge */ true))
  materialize(graph)
  gA.unpersist()
  graph
}
```

```scala
// run the PIC algorithm, where the parameter w is the normalized affinity matrix —
// the matrix W in the PIC paper — with w_ij = a_ij / d_ii as its edge property and
// the initial vector of the power iteration as its vertex property
private def pic(w: Graph[Double, Double]): PowerIterationClusteringModel = {
  val v = powerIter(w, maxIterations)
  val assignments = kMeans(v, k).map { case (id, cluster) =>
    Assignment(id, cluster)
  }
  new PowerIterationClusteringModel(k, assignments)
}
```

```scala
// generate random vertex properties (v0) to start the power iteration
private[clustering] def randomInit(g: Graph[Double, Double]): Graph
```
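The edge handling in normalize() — rejecting negative similarities, dropping self-edges, symmetrizing the edge list, and dividing each row by its degree d_ii — can be mirrored for dense matrices in plain Python. The helper name normalize_affinities is our own; this is a sketch of the transformation itself, not of the GraphX implementation.

```python
def normalize_affinities(edges, n):
    """Build W from an edge list of (i, j, s) tuples: symmetrize the affinities,
    then divide each row of A by its degree d_ii, as in the Scala normalize()."""
    A = [[0.0] * n for _ in range(n)]
    for i, j, s in edges:
        if s < 0:
            raise ValueError("similarity must be nonnegative")
        if i != j:              # self-similarities are dropped, as in the Scala code
            A[i][j] += s
            A[j][i] += s
    W = []
    for row in A:
        d = sum(row)            # d_ii: the sum of row i of the affinity matrix
        W.append([a / d for a in row] if d > 0 else row[:])
    return W

edges = [(0, 1, 1.0), (1, 2, 1.0), (0, 2, 0.5)]
W = normalize_affinities(edges, 3)
print([round(sum(row), 6) for row in W])  # -> [1.0, 1.0, 1.0]: every row of W sums to 1
```

A row-stochastic W of this form is exactly what the power iteration in pic() expects as input.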