阿里云E-MapReduce-最佳實踐-D_第1頁
阿里云E-MapReduce-最佳實踐-D_第2頁
阿里云E-MapReduce-最佳實踐-D_第3頁
阿里云E-MapReduce-最佳實踐-D_第4頁
阿里云E-MapReduce-最佳實踐-D_第5頁
已閱讀5頁,還剩47頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

1、E-MapReduce開發(fā)手冊E-MapReduce/開發(fā)手冊E-MapReduce/開發(fā)手冊 PAGE 51 PAGE 51開發(fā)手冊開發(fā)準備開發(fā)準備簡介本文檔假定:您已經開通了阿里云服務,并創(chuàng)建了Access Key ID和Access Key Secret。文中的ID指的是Access Key ID,KEY 指的是Access Key Secret。如果您還沒有開通或者還不了解OSS,請登錄OSS產品主頁獲取更多的幫助。您已經對Spark,Hadoop,Hive和Pig具備一定的認識。文中不對Spark,Hadoop,Hive和Pig開發(fā) 實踐做額外的介紹。更多的開發(fā)文檔資料可以到apac

2、he官網獲取。您已經對Scala語法具備一定的認識,文中的某些例子以Scala語言展示。OSS URI在使用E-MapReduce時,用戶將會使用到兩種OSS URI,分別是:native URI: oss:/accessKeyId:accessKeySecretbucket.endpoint/object/path用戶在作業(yè)中指定輸入輸出數(shù)據(jù)源時使用這種URI,可以類比hdfs:/。用戶操作OSS數(shù)據(jù)時,可 以將accessKeyId,accessKeySecret以及endpoint配置到Configuration中,也可以在URI中直 接指定accessKeyId,accessKeySe

3、cret以及endpoint。ref URI: ossref:/bucket/object/path只在E-MapReduce作業(yè)配置時有效,用來指定作業(yè)運行需要的資源。例如以下作業(yè)配置示例:在使用過程中,需要特別注意URI中scheme的不同。注意E-MapReduce在支持向OSS寫數(shù)據(jù)時使用OSS的multipart分片上傳方式。這里需要提醒的是,當作業(yè)異常中 斷后,OSS中會殘留作業(yè)已經生產的部分數(shù)據(jù),需要您手動刪掉。這里的行為和作業(yè)輸出到HDFS是一致的,作 業(yè)異常中斷后,HDFS也會殘留數(shù)據(jù),也需要手動刪掉。但有一個區(qū)別,OSS對使用multipart上傳的文件,它 是先放在碎片管

4、理中,所以您不僅要刪除OSS文件管理中的輸出目錄殘留文件,還需要在OSS的碎片管理中清 理一次,否則會產生數(shù)據(jù)存儲費用。示例Project本項目是一個完整的可編譯可運行的項目,包括MapReduce,Pig,Hive和Spark示例代碼,詳情如下:MapReduceHiveWordCountPigsample.hive:表的簡單查詢Sparksample.pig:Pig處理OSS數(shù)據(jù)實例SparkPiPiSparkWordCount: 單詞統(tǒng)計LinearRegression: 線性回歸OSSSampleOSS使用示例ONSSampleONS使用示例ODPSSampleODPS使用示例MNSS

5、ample:MNS使用示例LoghubSample:Loghub使用示例項目下載地址。依賴資源測試數(shù)據(jù)(data目錄下):The_Sorrows_of_Young_Werther.txt:可作為WordCount(MapReduce/Spark)的輸入數(shù)據(jù)patterns.txt:WordCount(MapReduce)作業(yè)的過濾字符u.data:sample.hive腳本的測試表數(shù)據(jù)abalone:線性回歸算法測試數(shù)據(jù)依賴jar包(lib目錄下)tutorial.jar:sample.pig作業(yè)需要的依賴jar包準備工作本項目提供了一些測試數(shù)據(jù),您可以簡單地將其上傳到OSS中即可使用。其他示

6、例,例如ODPS,MNS,ONS和Loghub等等,需要您自己準備數(shù)據(jù)如下:LogStore,參考日志服務用戶指南。ODPS項目和表,參考ODPS快速開始。ONS,參考消息隊列快速開始。MNS,參考消息服務控制臺使用幫助?;靖拍睿篛SSURI: oss:/accessKeyId:accessKeySecretbucket.endpoint/a/b/c.txt,用戶在作業(yè)中指定輸入輸出數(shù)據(jù)源時使用,可以類比hdfs:/。阿里云AccessKeyId/AccessKeySecret是您訪問阿里云API的密鑰,你可以在這里獲取。集群運行SparkSparkWordCount: spark-subm

7、it -class SparkWordCount examples-1.0- SNAPSHOT-shaded.jarinputPathoutputPathnumPartitionRDD分片數(shù)目SparkPi: spark-submit -class SparkPi examples-1.0-SNAPSHOT-shaded.jarOSSSample:spark-submit -class OSSSample examples-1.0-SNAPSHOT-shaded.jar inputPathnumPartition:輸入數(shù)據(jù)RDD分片數(shù)目ONSSample: spark-submit -clas

8、s ONSSample examples-1.0-SNAPSHOT- shaded.jar accessKeyIdAccessKeyIdaccessKeySecret:阿里云AccessKeySecretconsumerIdConsumerID說明topictopicsubExpression消息過濾。parallelism:指定多少個接收器來消費隊列消息。ODPSSample: spark-submit -class ODPSSample examples-1.0-SNAPSHOT- shaded.jaraccessKeyIdAccessKeyIdaccessKeySecret:阿里云Acc

9、essKeySecretenvType: 0表示公網環(huán)境,1表示內網環(huán)境。如果是本地調試選擇0,如果是在E-MapReduce上執(zhí)行請選擇1。table:參考ODPS術語介紹。numPartition:輸入數(shù)據(jù)RDD分片數(shù)目MNSSample:spark-submit-classMNSSampleexamples-1.0-SNAPSHOT-shaded.jar queueName:隊列名,參考MNS名詞解釋。accessKeyIdAccessKeyIdaccessKeySecret:阿里云AccessKeySecretendpoint:隊列數(shù)據(jù)訪問地址LoghubSample: spark-s

10、ubmit -class LoghubSample examples-1.0-SNAPSHOT- shaded.jar slsprojectLogService項目名slslogstoreloghubgroupname:作業(yè)中消費日志數(shù)據(jù)的組名,可以任意取。sls project,sls store相同時,相同組名的作業(yè)會協(xié)同消費sls store中的數(shù)據(jù);不同組名的作業(yè)會相互隔離地消費slsstore中的數(shù)據(jù)。slsendpoint日志服務入口。accessKeyIdAccessKeyIdaccessKeySecret:阿里云AccessKeySecretbatchintervalsecon

11、dsSparkStreaming作業(yè)的批次間隔,單位為秒。LinearRegression: spark-submit -class LinearRegression examples-1.0-SNAPSHOT- shaded.jar inputPath:輸入數(shù)據(jù)numPartition:輸入數(shù)據(jù)RDD分片數(shù)目MapreduceWordCount: hadoop jar examples-1.0-SNAPSHOT-shaded.jar WordCount - Dwordcount.case.sensitive=true -skip inputPathl:輸入數(shù)據(jù)路徑outputPath:輸出路

12、徑patternPath:過濾字符文件,可以使用data/patterns.txtHivehive -f sample.hive -hiveconfinputPath=inputPath:輸入數(shù)據(jù)路徑Pigpig-xmapreduce-fsample.pig-paramtutorial=-param input= -paramresult=tutorialJarPath:依賴Jar包,可使用lib/tutorial.jarinputPath:輸入數(shù)據(jù)路徑resultPath:輸出路徑注意:本地運行如果在-MapReduce上使用時,請將測試數(shù)據(jù)和依賴jar包上傳到OSS中,路徑規(guī)則遵循OSSU

13、RI定義,見上。如果集群中使用,可以放在機器本地。這里主要介紹如何在本地運行Spark程序訪問阿里云數(shù)據(jù)源,例如OSS等。如果希望本地調試運行,最好借助一 些開發(fā)工具,例如IntellijIDEA或者Eclipse。尤其是Windows環(huán)境,否則需要在Windows機器上配置Hadoop和Spark運行環(huán)境,很麻煩。Intellij IDEA前提:安裝IntellijIDEA,MavenIntellijIDEAMaven插件,Scala,IntellijIDEA Scala插件雙擊進入SparkWordCount.scala從下圖箭頭所指處進入作業(yè)配置界面選擇SparkWordCount,在作

14、業(yè)參數(shù)框中按照所需傳入作業(yè)參數(shù)點擊OK點擊運行按鈕,執(zhí)行作業(yè)查看作業(yè)執(zhí)行日志Scala IDE for Eclipse前提:安裝ScalaIDEforEclipse,Maven,EclipseMaven插件導入項目RunAsMavenbuild,快捷鍵是AltShilftXM;也可以在項目名上右鍵,Run As選擇Mavenbuild等待編譯完后,在需要運行的作業(yè)上右鍵,選擇RunConfiguration,進入配置頁在配置頁中,選擇ScalaApplication,并配置作業(yè)的MainClass和參數(shù)等等。點擊Run查看控制臺輸出日志SparkSpark開發(fā)準備安裝直接在Eclipse中使用

15、JAR包,步驟如下:在官方網站下載E-MapReduceSDK。解壓并將emr-sdk_2.10-1.1.2.jar和emr-core-1.1.2.jar拷貝到您的工程文件夾中。在EclipsePropertiesJavaBuildPathAddJARs選擇您下載的SDK。經過上面幾步之后,您就可以在工程中讀寫OSS,LogService,MNS,ONS,ODPS等數(shù) 據(jù)了。Maven地址com.aliyun.emremr-core1.1.2com.aliyun.emremr-sdk_2.10emr-sdk_配置說明Spark代碼中按照如下格式配置:屬性名默認值說明spark

16、.hadoop.fs.oss.accessKe yId無訪問OSS所需的Access Key ID(可選)spark.hadoop.fs.oss.accessKe ySecret無訪問OSS所需的Access Key Secret(可選)spark.hadoop.fs.oss.securityT oken無訪問OSS所需的STS token(可選)spark.hadoop.fs.oss.endpoint無訪問OSS的endpoint(可選)spark.hadoop.fs.oss.multipart.thread.number5并發(fā)進行OSS的upload part copy的并發(fā)度spark.

17、hadoop.fs.oss.copy.sim ple.max.byte134217728使用普通接口進行OSS內部copy的文件大小上限spark.hadoop.fs.oss.multipart.split.max.byte67108864使用普通接口進行OSS內部copy的文件分片大小上限spark.hadoop.fs.oss.multipart.split.number5使用普通接口進行OSS內部copy的文件分片數(shù)目,默認和拷貝并發(fā)數(shù)目保持一致spark.hadoop.fs.oss.implcom.aliyun.fs.oss.nat.NativeO ssFileSystemOSS文件系統(tǒng)

18、實現(xiàn)類s/mnt/disk1,/mnt/disk2,oss本地臨時文件目錄,默認使用集群的數(shù)據(jù)盤spark.hadoop.fs.oss.buffer.dir s.existsfalse是否確保oss臨時目錄已經存在spark.hadoop.fs.oss.client.co nnection.timeout50000OSS Client端的連接超時時間(單位毫秒)spark.hadoop.fs.oss.client.soc ket.timeout50000OSS Client端的socket超時時間(單位毫秒)spark.hadoop.fs.oss.client.co nnection.ttl-

19、1連接存活時間spark.hadoop.fs.oss.connecti on.max1024最大連接數(shù)目spark.hadoop.job.runlocalfalse當數(shù)據(jù)源是OSS時,如果需要本地調試運行Spark代碼,需要設置此項為true,否認為falseerval200Receiver向LogHub取數(shù)據(jù)的時.millis間間隔rtrue是否有序消費分裂后的Shard數(shù)據(jù) lis30000消費進程的心跳保持間隔spark.mns.batchMsg.size16批量拉取MNS消息條數(shù),最大不能超過16spark.mns.pollingWait.secon ds30MNS隊列為空時的拉取等待

20、間隔Spark代碼本地調試val conf = new SparkConf().setAppName(getAppName).setMaster(local4) conf.set(spark.hadoop.fs.oss.impl, com.aliyun.fs.oss.nat.NativeOssFileSystem) conf.set(spark.hadoop.job.runlocal, true)val conf = new SparkConf().setAppName(getAppName).setMaster(local4) conf.set(spark.hadoop.fs.oss.imp

21、l, com.aliyun.fs.oss.nat.NativeOssFileSystem) conf.set(spark.hadoop.job.runlocal, true)val sc = new SparkContext(conf)val data = sc.textFile(oss:/.) println(scount: $data.count()注意: 這個配置項spark.hadoop.job.runlocal只是針對需要在本地調試Spark代碼讀寫OSS數(shù)據(jù)的場景,除此之外只需要保持默認即可。三方依賴說明為了支持在E-MapReduce上操作阿里云的數(shù)據(jù)源(包括OSS,ODPS等)

22、,需要您的作業(yè)依賴一些三方包。 您可以參照這個POM文件來增刪需要依賴的三方包。垃圾清理Spark作業(yè)失敗后,需要檢查一下OSS輸出目 錄是否有文件存在。另外您也要 檢查OSS碎片管理中是否還有沒有提交的碎片存在,如果存在請及時清理掉。簡單操作OSS文件使用OSS SDK存在的問題在Spark或者Hadoop作業(yè)中無法直接使用OSS SDK來操作OSS中的文件。這是因為OSS SDK中依賴的http- 運行環(huán)境中的http-client存在版本沖突。如果要這么做,就必須先解決 這個依賴沖突問題。實際上在E-MapReduce中,Spark和Hadoop已經對OSS做了無縫兼容,可以像使用 HD

23、FS一樣來操作OSS文件。推薦做法Scalaimport org.apache.hadoop.conf.Configuration Scalaimport org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path, FileSystemval dir = oss:/accessKeyId:accessKeySecretbucket.endpoint/dir val path = new Path(dir)val conf = new Configuration()conf.set(fs.oss.impl, com

24、.aliyun.fs.oss.nat.NativeOssFileSystem)val fs = FileSystem.get(path.toUri, conf) val fileList = fs.listStatus(path).Javaimport org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path;import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem;String dir = oss:/acces

25、sKeyId:accessKeySecretbucket.endpoint/dir; Path path = new Path(dir);Configuration conf = new Configuration();conf.set(fs.oss.impl, com.aliyun.fs.oss.nat.NativeOssFileSystem);FileSystem fs = FileSystem.get(path.toUri(), conf); FileStatus fileList = fs.listStatus(path);.Spark開發(fā)快速入門Spark接入OSSval conf

26、= new SparkConf().setAppName(Test OSS) conf.set(spark.hadoop.fs.oss.impl, com.aliyun.fs.oss.nat.NativeOssFileSystem) val conf = new SparkConf().setAppName(Test OSS) conf.set(spark.hadoop.fs.oss.impl, com.aliyun.fs.oss.nat.NativeOssFileSystem) val sc = new SparkContext(conf)val pathIn = oss:/accessKe

27、yId:accessKeySecretbucket.endpoint/path/to/read val inputData = sc.textFile(pathIn)val cnt = inputData.count println(scount: $cnt)val outputPath = oss:/accessKeyId:accessKeySecretbucket.endpoint/path/to/write val outpuData = inputData.map(e = s$e has been processed.) outpuData.saveAsTextFile(outputP

28、ath)附錄完整示例代碼請看:- Spark接入OSSSpark開發(fā)快速入門Spark接入ODPS在這一章中,您將學會如何使用E-MapReduce SDK在Spark中完成一次ODPS數(shù)據(jù)的讀寫操作。Step-1. 初始化一個OdpsOpsimport com.aliyun.odps.TableSchema import com.aliyun.odps.data.Recordimport com.aliyun.odps.TableSchema import com.aliyun.odps.data.Recordimport org.apache.spark.aliyun.odps.OdpsO

29、ps import org.apache.spark.SparkContext, SparkConfobject Sample def main(args: ArrayString): Unit = / = Step-1 =val accessKeyId = val accessKeySecret = / 以內網地址為例val urls = HYPERLINK /api Seq(/api, HYPERLINK / )val conf = new SparkConf().setAppName(Test Odps)val sc = new SparkContext(conf)val sc = ne

30、w SparkContext(conf)val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1)/ 下面是一些調用代碼/ = Step-2 =./ = Step-3 =./ = Step-2 =/ 方法定義1/ = Step-3 =/ 方法定義2Step-2. 從ODPS中加載表數(shù)據(jù)到Spark中/ = Step-2 =val project = val table = / = Step-2 =val project = val table = val numPartitions = 2valinputDa

31、ta=odpsOps.readTable(project,table,read,numPartitions) inputData.top(10).foreach(println)/ = Step-3 =.在上面的代碼中,您還需要定義一個read函數(shù),用來解析和預處理ODPS表數(shù)據(jù),如下:def read(record: Record, schema: TableSchema): String = record.getString(0)def read(record: Record, schema: TableSchema): String = record.getString(0)這個函數(shù)的含

32、義是,將ODPS表的第一列加載到Spark運行環(huán)境中。Step-3. 將Spark中的結果數(shù)據(jù)保存到ODPS表中val resultData = inputData.map(e = s$e has been processed.) odpsOps.saveToTable(project, table, dataRDD, write)通過odpsOps對象的saveToTable方法,可以將Spark RDD持久化到ODPS中。val resultData = inputData.map(e = s$e has been processed.) odpsOps.saveToTable(proje

33、ct, table, dataRDD, write)在上面的代碼中,您還需要定義一個write函數(shù),用作寫ODPS表前數(shù)據(jù)預處理,如下:defdefwrite(s:String,emptyReord:Record,schema:TableSchema):Unit= val r =emptyReordr.set(0, s)r.set(0, s)這個函數(shù)的含義是將RDD的每一行數(shù)據(jù)寫到對應ODPS表的第一列中。附錄完整示例代碼請看:- Spark接入ODPSSpark開發(fā)快速入門Spark接入ONSval Array(cId, topic, subExpression, parallelism,

34、interval) = args val accessKeyId = val accessKeySecret = val Array(cId, topic, subExpression, parallelism, interval) = args val accessKeyId = val accessKeySecret = val numStreams = parallelism.toIntval batchInterval = Milliseconds(interval.toInt)val conf = new SparkConf().setAppName(Test ONS Streami

35、ng) val ssc = new StreamingContext(conf, batchInterval)def func: Message = ArrayByte = msg = msg.getBody val onsStreams = (0 until numStreams).map i = println(sstarting stream $i)OnsUtils.createStream( ssc,cId, topic,subExpression, accessKeyId, accessKeySecret,StorageLevel.MEMORY_AND_DISK_2, func)va

36、l unionStreams = ssc.union(onsStreams) unionStreams.foreachRDD(rdd = rdd.map(bytes = new String(bytes).flatMap(line = line.split( ).map(word = (word, 1).reduceByKey(_ + _).collect().foreach(e = println(sword: $e._1, cnt: $e._2)ssc.start() ssc.start() ssc.awaitTermination()附錄完整示例代碼請看:- Spark接入ONSSpar

37、k開發(fā)快速入門Spark接入Log Serviceif (args.length 6) System.err.println(if (args.length 6) System.err.println(Usage: TestLoghub | .stripMargin)System.exit(1)vallogserviceProject=args(0) /LogService中project名val logStoreName=args(1)/LogService中l(wèi)ogstore名val loghubGroupName = args(2) / loghubGroupName相同的作業(yè)將共同消費l

38、ogstore的數(shù)據(jù)val loghubEndpoint = args(3) / 阿里云日志服務數(shù)據(jù)類API Endpointval accessKeyId/AccessKeyIdval accessKeySecret/AccessKeySecret val numReceiversargs(4).toInt /Receiver來讀取logstore中的數(shù)據(jù)val batchInterval = Milliseconds(args(5).toInt * 1000) / Spark Streaming中每次處理批次時間間隔val conf = new SparkConf().setAppName

39、(Test Loghub Streaming) val ssc = new StreamingContext(conf, batchInterval)val loghubStream = LoghubUtils.createStream( ssc,loghubProject, logStream, loghubGroupName, endpoint, numReceivers, accessKeyId, accessKeySecret,StorageLevel.MEMORY_AND_DISK)loghubStream.foreachRDD(rdd = println(rdd.count()ss

40、c.start() ssc.start() ssc.awaitTermination()幾點說明E-MapReduce的機器(除了Master節(jié)點)無法連接公網。配置Log Service endpoint時,請注意使用LogService提供的內網endpoint,否則無法請求到LogService。了解更多關于LogService,請查看相關文檔。附錄完整示例代碼請看:Spark接入LogServiceSpark開發(fā)快速入門Spark接入MNSval conf = new SparkConf().setAppName(Test MNS Streaming) val batchInterva

41、l = Seconds(10)val ssc = new StreamingContext(conf, batchInterval)val conf = new SparkConf().setAppName(Test MNS Streaming) val batchInterval = Seconds(10)val ssc = new StreamingContext(conf, batchInterval)val queuename = queuename valaccessKeyId=val accessKeySecret = val endpoint = HYPERLINK http:/

42、xxx.yyy.zzzz/abc http:/xxx.yyy.zzzz/abcvalmnsStream=MnsUtils.createPullingStreamAsRawBytes(ssc,queuename,accessKeyId,accessKeySecret, endpoint,StorageLevel.MEMORY_ONLY) mnsStream.foreachRDD( rdd = rdd.map(bytes = new String(bytes).flatMap(line = line.split( ).map(word = (word, 1).reduceByKey(_ + _).

43、collect().foreach(e = println(sword: $e._1, cnt: $e._2)ssc.start() ssc.awaitTermination()附錄完整示例代碼請看:Spark接入MNSSpark開發(fā)快速入門Spark接入Hbase下面這個例子演示了Spark如何向Hbase寫數(shù)據(jù)。需要指出的是,計算集群需要和Hbase集群處于一個安全組內object ConnectionUtil extends Serializable private val conf = HBaseConfiguration.create()conf.set(HConstants.ZOO

44、KEEPER_QUORUM,ecs1,ecs1,ecs3) conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, /hbase) privateobject ConnectionUtil extends Serializable private val conf = HBaseConfiguration.create()conf.set(HConstants.ZOOKEEPER_QUORUM,ecs1,ecs1,ecs3) conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, /hbase) privatevalconnect

45、ion=ConnectionFactory.createConnection(conf)def getDefaultConn: Connection = connection/ 創(chuàng) 建 數(shù) 據(jù) 流 unionStreams unionStreams.foreachRDD(rdd = rdd.map(bytes = new String(bytes).flatMap(line = line.split( ).map(word = (word, 1).reduceByKey(_ + _).mapPartitions words = val conn = ConnectionUtil.getDefa

46、ultConn val tableName = TableName.valueOf(tname) val t = conn.getTable(tableName)try words.sliding(100, 100).foreach(slice = val puts = slice.map(word = println(sword: $word)val put = new Put(Bytes.toBytes(word._1 + System.currentTimeMillis() put.addColumn(COLUMN_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES

47、,System.currentTimeMillis(), Bytes.toBytes(word._2) put).toList t.put(puts) finally t.close()Iterator.empty.count().count()ssc.start() ssc.awaitTermination()附錄完整示例代碼請看:Spark接入HbaseSpark Submit參數(shù)設置說明在E-MapReduce中,如果我們要運行一個Spark作業(yè),勢必要和它一堆的參數(shù)配置打交道。 那么我們該如何按照自己的集群配置情況來設置這些參數(shù)呢?下面我們就來介紹一下,如何在我們的E-MapReduc

48、e場景下設置spark-submit的參數(shù)首先我們假設我們有這樣的一個集群:E-MapReduce產品版本1.1.0Hadoop2.6.0Spark1.6.0Master節(jié)點8核16G500G1臺Worker節(jié)點 x 10臺8核16G500G- 10臺總資源:8核16G(Worker)x 10 + 8核16G(Master)(由于作業(yè)提交的時候資源只計算CPU和內存,所以這里磁盤的大小并未計算到總資源中。)Yarn可分配總資源:12核12.8G(worker)x 10(默認情況下,yarn可分配核=機器核x1.5,yarn可分配內存=機器內存x0.8)提交一個作業(yè)有了集群,現(xiàn)在我們要提交我們的

49、Spark作業(yè)了首先,我們在E-MapReduce中創(chuàng)建一個作業(yè),類似下面這個:-classorg.apache.spark.examples.SparkPi-classorg.apache.spark.examples.SparkPi-masteryarn-deploy-modeclient-driver-memory4g-num-executors 2-executor-memory2g-executor-cores2/opt/apps/spark-1.6.0-bin-hadoop2.6/lib/spark-examples*.jar10參數(shù)說明逐個分析一下:參數(shù)參考值說明classorg

50、.apache.spark.examples.S parkPi作業(yè)的主類masteryarn因為E-MapReduce使用的Yarn的模式,所以這里只能是yarn模式y(tǒng)arn-client等 同 于 -master yarn - deploy-mode client 此時不需要指定deploy-modeyarn-cluster等 同 于 -master yarn - deploy-mode cluster 此時不需要指定deploy-modedeploy-modeclientclient模式表示作業(yè)的AM會放在Master節(jié)點上運行。要注意的是,如果設置這個參數(shù),那么需要同時指定上面maste

51、r為yarn。clustercluster模式表示AM會隨機的在worker節(jié)點中的任意一臺上啟動運行。要注意的是,如果設置這個參數(shù),那么需要同時指定上面master為yarn。driver-memory4gdriver使用的內存,不可超過單機的core總數(shù)num-executors2創(chuàng)建多少個executorexecutor-memory2g各個executor使用的最大內存,不可超過單機的最大可使用內存executor-cores2各個executor使用的并發(fā)線程數(shù)目,也即每個executor最大可并發(fā)執(zhí)行的Task數(shù)目資源計算當我們在不同模式,不同的設置下運行的時候,作業(yè)使用的資源情況

52、如下表所示,yarn-client模式的資源計算節(jié)點資源類型資源量(結果使用上面的例子計算得到)mastercore1memdriver-memroy = 4gworkercorenum-executors * executor- cores = 4memnum-executors * executor- memory = 4g作業(yè)主程序(Driver程序)會在master節(jié)點上執(zhí)行。按照作業(yè)配置將分配4g(由-driver-memroy指 定)的內存給它(當然實際上可能沒有用到)。會在worker節(jié)點上起2個(由-num-executors指定)executor,每一個executor最大能

53、分配2g(由-executor-memory指定)的內存,并最大支持2個(由-executor-cores指定)task的并發(fā)執(zhí)行。yarn-cluster模式的資源計算節(jié)點資源類型資源量(結果使用上面的例子計算得到)master一個很小的client程序,負責同步job信息,占用很小。workercorenum-executors * executor- cores+spark.driver.cores = 5memnum-executors*executor- memory+driver-memroy= 8g注意:這里的spark.driver.cores默認是1,也可以設置為更多資源使用

54、的優(yōu)化yarn-client那么假設我們有了一個大作業(yè),使用yarn-client模式,想要多用一些這個集群的資源,我們要如何設置呢? 注意:Spark在分配內存時,會在用戶設定的內存值上溢出375M或7%(取大值)。Yarn分配container內存時,遵循向上取整的原則,這里也就是需要滿足1G的整數(shù)倍。-master yarn-client -driver-memory 5g -num-executors 20 -executor-memory 4g -executor-cores 4下面給出了一個配置值:-master yarn-client -driver-memory 5g -num

55、-executors 20 -executor-memory 4g -executor-cores 4按照我們上面的資源計算公式,mastercore:1mem:6g(5g+375m向上取整為6g)workerscore: 20*4 =80mem20*5g(4g+375m向上取整為5g100G可以看到總的資源沒有超過我們的集群的總資源-master yarn-client -driver-memory 5g -num-executors 40 -executor-memory 1g -executor-cores 2-master yarn-client -driver-memory 5g -

56、num-executors 15 -executor-memory 4g -executor-cores 4-master yarn-client -driver-memory 5g -num-executors 40 -executor-memory 1g -executor-cores 2-master yarn-client -driver-memory 5g -num-executors 15 -executor-memory 4g -executor-cores 4-master yarn-client -driver-memory 5g -num-executors 10 -exe

57、cutor-memory 9g -executor-cores 6原則上,按照上面的公式計算出來的需要資源不超過集群的最大資源量就可以 但是在實際場景中,因為系統(tǒng),hdfs以及E-MapReduce的服務會需要使用core和mem資源,如果把core和mem都占用完了,反而會導致 性能的下降,甚至無法運行。 executor-cores數(shù)一般也都會設置的和集群的可使用核一致,因為如果設置的太多,cpu頻繁的切換性能并不會提高。yarn-cluster當使用yarn-cluster模式后,Driver程序會被放到worker節(jié)點上。資源會占用到worker的資源池子里面,這個 時候想要多用一些這

58、個集群的資源,我們要如何設置呢?-master yarn-cluster -driver-memory 5g -num-executors 15 -executor-memory 4g -executor-cores 4下面給出了一個配置值:-master yarn-cluster -driver-memory 5g -num-executors 15 -executor-memory 4g -executor-cores 4一些配置建議如果將內存設置的很大,要注意gc所產生的消耗。一般我們會推薦一個executor的內存=64g如果是進行HDFS讀寫的作業(yè),建議是每個executor中使用=

59、5個并發(fā)來讀寫。如果是進行OSS讀寫的作業(yè),我們建議是將executor分布在不同的ECS上,這樣可以將每一個ECS的 帶寬都用上。比如有10臺ECS,那么就可以配置num-executors=10,并設置合理的內存和并發(fā)。如果作業(yè)中使用了非線程安全的代碼,那么在設置executor-cores的時候需要注意多并發(fā)是否會造成 作業(yè)的不正常。如果會,那么推薦就設置executor-cores=1Aliyun Spark SDK ReleaseNote說明與OSS數(shù)據(jù)源的交互,默認已經存在集群的運行環(huán)境中,用戶作業(yè) 將emr-core打進去,或者要保持和集群中的emr-core版本一致。emr-s

60、dk_2.10包:實現(xiàn)Spark與阿里云其他數(shù)據(jù)源的交互,例如LogService,MNS,ONS和ODPS等等。用戶作業(yè)打包時 必須 將emr-sdk_2.10打包進去,否則會出現(xiàn)相關類找不到的錯。com.aliyun.emremr-core1.1.2com.aliyun.emremr-sdk_v1.1.2解決作業(yè)慢讀寫OSS出現(xiàn)的ConnectionClosedException問題。解決OSS數(shù)據(jù)源時部分hadoop命令不可用問題。解決java.text.ParseException:Unparseabledate問題。優(yōu)化emr-core支持本地調試運行。兼容老版本的

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論