spark源碼解析 spark源碼解析_第1頁(yè)
spark源碼解析 spark源碼解析_第2頁(yè)
spark源碼解析 spark源碼解析_第3頁(yè)
spark源碼解析 spark源碼解析_第4頁(yè)
spark源碼解析 spark源碼解析_第5頁(yè)
已閱讀5頁(yè),還剩390頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

1、spark任務(wù)提交源碼入手小試牛刀spark由于實(shí)際工作當(dāng)中,都是將spark的任務(wù)提交到y(tǒng)arn集群上面去,所以我們安裝spark的環(huán)境只需要安裝一個(gè)任務(wù)提交客戶端即可第一步:下載安裝包并解壓node01下載spark3.0的安裝包 cd/kkb/softwget/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgztar-zxfspark-3.0.0-bin-hadoop3.2.tgz-C/kkb/install/第二步:修改配置文件node01執(zhí)行以下命令修改spark-env.sh配置文件 cd/kkb/install/spark-3.0.0-bin-hadoop3.2/conf/cpspark-env.sh.templatespark-env.shvimspark-env.shexportJAVA_HOME=/kkb/install/jdk1.8.0_141exportHADOOP_HOME=/kkb/install/hadoop-3.1.4exportHADOOP_CONF_DIR=/kkb/install/hadoop-3.1.4/etc/hadoopexportSPARK_CONF_DIR=/kkb/install/spark-3.0.0-bin-hadoop3.2/confexportYARN_CONF_DIR=/kkb/install/hadoop-3.1.4/etc/hadoopnode01執(zhí)行以下命令修改slaves配置文件cd/kkb/install/spark-3.0.0-bin-hadoop3.2/conf/cpslaves.templateslavesvimslaves#編輯文件內(nèi)容添為以下node01node02node03node01執(zhí)行以下命令修改spark-defaults.conf配置選項(xiàng)cd/kkb/install/spark-3.0.0-bin-hadoop3.2/confcpspark-defaults.conf.templatespark-defaults.confvimspark-defaults.confspark.eventLog.enabledspark.eventLog.dirspark.eventLpresstruehdfs://node01:8020/spark_logtrue第三步:安裝包的分發(fā)將node01機(jī)器的spark安裝包分發(fā)到其他機(jī)器上面去node01執(zhí)行以下命令進(jìn)行分發(fā) cd/kkb/install/scp-rspark-3.0.0-bin-hadoop3.2/node02:$PWDscp-rspark-3.0.0-bin-hadoop3.2/node03:$PWDnode01執(zhí)行以下命令啟動(dòng)spark集群 hdfsdfs-mkdir-p/spark_logcd/kkb/install/spark-3.0.0-bin-hadoop3.2sbin/start-all.shsbin/start-history-server.sh第五步:訪問瀏覽器管理界面直接瀏覽器訪問 http://node01:8080查看spark集群管理webUI界面。注意,如果8080端口沒法訪問,順延8081端口進(jìn)行訪問,如果8081端口也沒法訪問,繼續(xù)往后順延端口號(hào)http://node01:18080/訪問查看spark的historyserver地址2、spark運(yùn)行計(jì)算圓周率之任務(wù)提交過程spark集群安裝運(yùn)行成功之后,我們就可以運(yùn)行計(jì)算spark的任務(wù)了,例如我們可以提交一個(gè)spark的自帶案例來(lái)計(jì)算圓周率,其中spark的任務(wù)提交又有多種方式,例如local模式,standAlone模式或者yarn模式等等,其中我們實(shí)際工作當(dāng)中用的最多的就是yarn模式,以下是幾種提交運(yùn)行模式的介紹sparklocal local模式不用啟動(dòng)任何的spark的進(jìn)程,只需要解壓一個(gè)spark的安裝包就可以直接使用了,基于local模式的client提交運(yùn)行方式,提交命令如下binbin/spark-submit--classorg.apache.spark.examples.SparkPi--masterlocal--deploy-modeclient--executor-memory2G--total-executor-cores4examples/jars/spark-examples_2.12-3.0.0.jar10基于onyarn的cluster模式任務(wù)提交基于local模式的cluster提交運(yùn)行方式,提交命令如下binbin/spark-submit--classorg.apache.spark.examples.SparkPi--masterlocal--deploy-modecluster--executor-memory2G--total-executor-cores4examples/jars/spark-examples_2.12-3.0.0.jar10我們會(huì)看到,基于local模式的cluster提交方式直接報(bào)錯(cuò)2.2、spark任務(wù)提交的standAlone模式基于standAlone的任務(wù)提交,需要我們安裝搭建spark集群,并啟動(dòng)master以及worker進(jìn)程提交命令如下bin/spark-submit--classorg.apache.spark.examples.SparkPi\--masterspark://node01:7077\--deploy-modeclient\--executor-memory2G\--total-executor-cores4\examples/jars/spark-examples_2.12-3.0.0.jar10基于cluster任務(wù)的提交命令bin/spark-submit--classorg.apache.spark.examples.SparkPi\--masterspark://node01:7077\--deploy-modecluster\--executor-memory2G\--total-executor-cores4\examples/jars/spark-examples_2.12-3.0.0.jar103、spark任務(wù)提交的yarn模式并且將任務(wù)提交到y(tǒng)arn集群上面去node01執(zhí)行以下命令提交任務(wù)到y(tǒng)arn集群上面去運(yùn)行 bin/spark-submit\--classorg.apache.spark.examples.SparkPi\--masteryarn\--deploy-modeclient\examples/jars/spark-examples_2.12-3.0.0.jar50\其中sparkonyarncluster模式代碼提交運(yùn)行架構(gòu)如下sparkonyarnclient模式運(yùn)行.drawiobin/spark-submit\--classorg.apache.spark.examples.SparkPi\--masteryarn\--deploy-modecluster\examples/jars/spark-examples_2.12-3.0.0.jar50\其中sparkonyarnclient模式任務(wù)提交過程如下spark-submitonyarncluster任務(wù)提交.drawio3、spark任務(wù)提交腳本分析mit 提交任務(wù)的過程是通過spark-submit這個(gè)腳本來(lái)進(jìn)行提交的,那我們就一起來(lái)看一下spark-submit這個(gè)腳本的內(nèi)容#!/usr/bin/envbash##LicensedtotheApacheSoftwareFoundation(ASF)underoneormore#contributorlicenseagreements.SeetheNOTICEfiledistributedwith#thisworkforadditionalinformationregardingcopyrightownership.#TheASFlicensesthisfiletoYouundertheApacheLicense,Version2.0#(the"License");youmaynotusethisfileexceptincompliancewith#theLicense.YoumayobtainacopyoftheLicenseat##/licenses/LICENSE-2.0##Unlessrequiredbyapplicablelaworagreedtoinwriting,software#distributedundertheLicenseisdistributedonan"ASIS"BASIS,#WITHOUTWARRANTIESORCONDITIONSOFANYKIND,eitherexpressorimplied.#SeetheLicenseforthespecificlanguagegoverningpermissionsand#limitationsundertheLicense.#if[-z"${SPARK_HOME}"];thensource"$(dirname"$0")"/find-spark-home#disablerandomizedhashforstringinPython3.3+exportPYTHONHASHSEED=0exec"${SPARK_HOME}"/bin/spark-classorg.apache.spark.deploy.SparkSubmit"$@"查看發(fā)現(xiàn)spark-submit這個(gè)腳本里面執(zhí)行了spark-class這個(gè)腳本,然后帶了一個(gè)org.apache.spark.deploy.SparkSubmit這個(gè)參數(shù),其中使用$@來(lái)將我們傳入給spark-submit這個(gè)腳本的所有參數(shù)都傳遞過來(lái)了既然知道了執(zhí)行了spark-class腳本,后面帶上了org.apache.spark.deploy.SparkSubmit這個(gè)class類,那么我們就來(lái)看一下spark-class這腳本內(nèi)容 #!/usr/bin/envbashif[-z"${SPARK_HOME}"];thensource"$(dirname"$0")"/find-spark-home."${SPARK_HOME}"/bin/load-spark-env.sh#Findthejavabinaryif[-n"${JAVA_HOME}"];thenRUNNER="${JAVA_HOME}/bin/java"elseif["$(command-vjava)"];thenRUNNER="java"elseecho"JAVA_HOMEisnotset">&2exit1#FindSparkjars.if[-d"${SPARK_HOME}/jars"];thenSPARK_JARS_DIR="${SPARK_HOME}/jars"elseSPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"if[!-d"$SPARK_JARS_DIR"]&&[-z"$SPARK_TESTING$SPARK_SQL_TESTING"];thenecho"FailedtofindSparkjarsdirectory($SPARK_JARS_DIR)."1>&2echo"YouneedtobuildSparkwiththetarget\"package\"beforerunningthisprogram."1>&2exit1elseLAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"#Addthelauncherbuilddirtotheclasspathifrequested.if[-n"$SPARK_PREPEND_CLASSES"];thenLAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"#Fortestsif[[-n"$SPARK_TESTING"]];thenunsetYARN_CONF_DIRunsetHADOOP_CONF_DIRbuild_command(){"$RUNNER"-Xmx128m$SPARK_LAUNCHER_OPTS-cp"$LAUNCH_CLASSPATH"org.apache.spark.launcher.Main"$@"printf"%d\0"$?}#Turnoffposixmodesinceitdoesnotallowprocesssubstitutionset+oposixCMD=()DELIM=$'\n'CMD_START_FLAG="false"whileIFS=read-d"$DELIM"-rARG;doif["$CMD_START_FLAG"=="true"];thenCMD+=("$ARG")elseif["$ARG"==$'\0'];then#AfterNULLcharacterisconsumed,changethedelimiterandconsumecommandstring.DELIM=''CMD_START_FLAG="true"elif["$ARG"!=""];thenecho"$ARG"done<<(build_command"$@")COUNT=${#CMD[@]}LAST=$((COUNT-1))LAUNCHER_EXIT_CODE=${CMD[$LAST]}if![[$LAUNCHER_EXIT_CODE=~echo"${CMD[@]}"|head-n-1exit1^[0-9]+$]];then2if[$LAUNCHER_EXIT_CODE!=0];thenexit$LAUNCHER_EXIT_CODECMD=("${CMD[@]:0:$LAST}")exec"${CMD[@]}"最后執(zhí)行了$CMD這個(gè)命令,并且?guī)狭怂械膮?shù),那么CMD究竟是個(gè)什么東西,我們可以修改腳本給它打印出來(lái)看一下修改spark-class腳本,然后添加一行打印cd/kkb/install/spark-3.0.0-bin-hadoop3.2/vimbin/spark-class#在后面位置添加一行打印命令出來(lái)CMD=("${CMD[@]:0:$LAST}")echo"${CMD[@]}"exec"${CMD[@]}"重新提交任務(wù)cd/kkb/install/spark-3.0.0-bin-hadoop3.2/bin/spark-submit--classorg.apache.spark.examples.SparkPi--masteryarn--deploy-modeclientexamples/jars/spark-examples_2.12-3.0.0.jar50下:[hadoop@node01spark-3.0.0-bin-hadoop3.2]$bin/spark-submit--classorg.apache.spark.examples.SparkPi--masteryarn--deploy-modeclientexamples/jars/spark-examples_2.12-3.0.0.jar50/kkb/install/jdk1.8.0_141/bin/java-cp/kkb/install/spark-3.0.0-bin-hadoop3.2/conf/:/kkb/install/spark-3.0.0-bin-hadoop3.2/jars/*:/kkb/install/hadoop-3.1.4/etc/hadoop/-Xmx1gorg.apache.spark.deploy.SparkSubmit--masteryarn--deploy-modeclient--classorg.apache.spark.examples.SparkPiexamples/jars/spark-examples_2.12-3.0.0.jar50觀察打印信息,主要就是執(zhí)行了一個(gè)命令java-cp/kkb/install/spark-3.0.0-bin-hadoop3.2/conf/:/kkb/install/spark-3.0.0-bin-hadoop3.2/jars/*:/kkb/install/hadoop-3.1.4/etc/hadoop/-Xmx1gorg.apache.spark.deploy.SparkSubmit--masteryarn--deploy-modeclient--classorg.apache.spark.examples.SparkPiexamples/jars/spark-examples_2.12-3.0.0.jar50就是執(zhí)行了一個(gè)java的命令,通過org.apache.spark.deploy.SparkSubmit來(lái)進(jìn)行了任務(wù)提交,其實(shí)就是啟動(dòng)了一個(gè)jvm的虛擬機(jī)進(jìn)程來(lái)執(zhí)行了任務(wù)的提交,就是執(zhí)行了SparkSubmit的main方法,我們可以去查看源碼,找到SparkSubmit的main方法,驗(yàn)證啟動(dòng)的過程步驟4、SparkSubmit源碼分析前面我們通過腳本查看到了我們提交任務(wù)都是通過SparkSubmit這個(gè)類,那么我們就可以通過源碼當(dāng)中的SparkSubmit來(lái)查看這個(gè)類的main方法,main方法作為入口啟動(dòng)了一個(gè)java的進(jìn)程,通過IDEA的快捷鍵Ctrl+shift+alt+N來(lái)搜索SparkSubmit這類,然后知道Object當(dāng)中的main方法、查看main方法 /**/***在這里作為程序的入口類overridedefmain(args:Array[String]):Unit={valsubmit=newSparkSubmit(){self=>/***解析傳入的參數(shù)*@paramargs*@returnoverrideprotecteddefparseArguments(args:Array[String]):SparkSubmitArguments={newSparkSubmitArguments(args){overrideprotecteddeflogInfo(msg:=>String):Unit=self.logInfo(msg)overrideprotecteddeflogWarning(msg:=>String):Unit=self.logWarning(msg)overrideprotecteddeflogError(msg:=>String):Unit=self.logError(msg)}}overrideprotecteddeflogInfo(msg:=>String):Unit=printMessage(msg)overrideprotecteddeflogWarning(msg:=>String):Unit=printMessage(s"Warning:$msg")overrideprotecteddeflogError(msg:=>String):Unit=printMessage(s"Error:$msg")overridedefdoSubmit(args:Array[String]):Unit={try{super.doSubmit(args)}catch{casee:SparkUserAppException=>exitFn(e.exitCode)}}}/***最終的任務(wù)提交submit.doSubmit(args)}進(jìn)入到super.doSubmit方法Submit其實(shí)就是執(zhí)行了SparkSubmit內(nèi)部當(dāng)中的doSubmit方法 overridedefdoSubmit(args:Array[String]):Unit={try{super.doSubmit(args)}catch{casee:SparkUserAppException=>exitFn(e.exitCode)}}3、查看父類當(dāng)中的doSubmit方法 查看doSubmit方法當(dāng)中的parseArguments方法defdoSubmit(args:Array[String]):Unit={//Initializeloggingifithasn'tbeendoneyet.Keeptrackofwhetherloggingneedsto//beresetbeforetheapplicationstarts./***初始化日志記錄valuninitLog=initializeLogIfNecessary(true,silent=true)/***解析參數(shù)所有的參數(shù)都解析出來(lái)封裝到一個(gè)對(duì)象里面去了叫做SparkSubmitArgumentsvalappArgs=parseArguments(args)if(appArgs.verbose){logInfo(appArgs.toString)}/***使用模式匹配來(lái)執(zhí)行任務(wù)提交的各種操作appArgs.actionmatch{caseSparkSubmitAction.SUBMIT=>submit(appArgs,uninitLog)caseSparkSubmitAction.KILL=>kill(appArgs)caseSparkSubmitAction.REQUEST_STATUS=>requestStatus(appArgs)caseSparkSubmitAction.PRINT_VERSION=>printVersion()}}3.1、查看doSubmit方法當(dāng)中的parseArguments方法查看到parseArguments valappArgs=parseArguments(args)點(diǎn)擊parseArguments方法進(jìn)入到這個(gè)方法的具體實(shí)現(xiàn)如下protectedprotecteddefparseArguments(args:Array[String]):SparkSubmitArguments={newSparkSubmitArguments(args)}可以看到該方法就是創(chuàng)建了SparkSubmitArguments這個(gè)對(duì)象,點(diǎn)擊進(jìn)入到這個(gè)對(duì)象當(dāng)中來(lái)3.2、查看SparkSubmitArguments當(dāng)中的parse方法查看方法解析SparkSubmitOptionParser這個(gè)java類當(dāng)中的parse方法 3.2.1、查看handle這個(gè)方法所在的具體實(shí)現(xiàn)handle這個(gè)方法的具體實(shí)現(xiàn)通過idea的快捷鍵ctrl+shift+h的方式,我們可以看到具體實(shí)現(xiàn)在SparkSubmitArguments當(dāng)中,我們直接去看SparkSubmitArguments當(dāng)中的實(shí)現(xiàn)方法SparkSubmitArguments當(dāng)中的handle方法如下:這個(gè)方法主要就是在解析各種參數(shù)3.3、查看SparkSubmitArguments當(dāng)中的loadEnvironmentArguments方法給action賦值 前面已經(jīng)查看了SparkSubmitArguments當(dāng)中的parse方法,該方法主要就是解析我們的參數(shù),然后給每個(gè)指定的參數(shù)進(jìn)行賦值了,下面還有一個(gè)方法叫做loadEnvironmentArguments這個(gè)方法主要就是給action進(jìn)行賦值的查看SparkSubmitArguments當(dāng)中的loadEnvironmentArguments()方法查看loadEnvironmentArguments方法3.4、action賦值成功之后在SparkSubmit當(dāng)中的doSubmit方法當(dāng)中提交任務(wù) 給action賦值成功之后,默認(rèn)值就是SUBMIT,那么在SparkSubmit當(dāng)中執(zhí)行doSubmit方法當(dāng)中,使用模式匹配來(lái)進(jìn)行任務(wù)提交3.5、查看SparkSubmit當(dāng)中的submit方法 找到了默認(rèn)就是任務(wù)提交之后,我們就可以直接去看submit方法的實(shí)現(xiàn)了,通過submit方法來(lái)提交任務(wù)了3.6、查看SparkSubmit當(dāng)中的runMain方法上面通過SparkSumit當(dāng)中的doRunMain方法,執(zhí)行到了runMain方法,查看runMain方法里面的任務(wù)具體提交過程 在上述的runMain方法當(dāng)中,執(zhí)行了一行代碼valval(childArgs,childClasspath,sparkConf,childMainClass)=prepareSubmitEnvironment(args)這一行代碼至關(guān)重要,創(chuàng)建了一個(gè)childMainClass這個(gè)屬性值,有了這個(gè)屬性值,才能繼續(xù)給下方的mainClass進(jìn)行賦值3.6.1、查看SparkSubmit的prepareSubmitEnvironment這個(gè)方法在prepareSubmitEnvironment這個(gè)方法當(dāng)中給childMainClass進(jìn)行了賦值 通過提交模式為yarn模式,將childMainclass賦值為了org.apache.spark.deploy.yarn.YarnClusterApplication//Inyarn-clustermode,useyarn.Clientasawrapperaroundtheuserclass/***判斷如果是yarn集群模式,那么給childMainClass賦值為org.apache.spark.deploy.yarn.YarnClusterApplicationif(isYarnCluster){childMainClass=YARN_CLUSTER_SUBMIT_CLASSif(args.isPython){childArgs+=("--primary-py-file",args.primaryResource)childArgs+=("--class","org.apache.spark.deploy.PythonRunner")}elseif(args.isR){valmainFile=newPath(args.primaryResource).getNamechildArgs+=("--primary-r-file",mainFile)childArgs+=("--class","org.apache.spark.deploy.RRunner")}else{if(args.primaryResource!=SparkLauncher.NO_RESOURCE){childArgs+=("--jar",args.primaryResource)}childArgs+=("--class",args.mainClass)}if(args.childArgs!=null){args.childArgs.foreach{arg=>childArgs+=("--arg",arg)}}}這樣就得到了childMainclass的最終結(jié)果值為org.apache.spark.deploy.yarn.YarnClusterApplication3.7、查看SparkSubmit當(dāng)中的runMain方法的app.start方法 上面通過childMainclass的最終結(jié)果值為org.apache.spark.deploy.yarn.YarnClusterApplication,然后執(zhí)行了app.start查看SparkSubmit當(dāng)中的runMain方法里面的app.start方法3.8、查看YarnClusterApplication當(dāng)中的start方法調(diào)用了start方法之后其實(shí)就是調(diào)用了org.apache.spark.deploy.yarn.YarnClusterApplication當(dāng)中的start方法通過IDE的快捷鍵ctrl+shift+alt+N在SparkSource工程當(dāng)中查找YarnClusterApplication這個(gè)類3.8.1、查看newClient創(chuàng)造Client對(duì)象創(chuàng)建了Client對(duì)象,在client對(duì)象的構(gòu)造器當(dāng)中,申明了一個(gè)yarnClient對(duì)象,這個(gè)對(duì)象的創(chuàng)建調(diào)用了YarnClient.createYarnClient這個(gè)方法3.8.2、查看YarnClient對(duì)象的創(chuàng)建查看YarnClient.createYarnClient這個(gè)方法,其實(shí)就是通過new創(chuàng)建了一個(gè)YarnClient對(duì)象 3.8.3、查看YarnClientImpl的對(duì)象創(chuàng)建通過newYarnClientImpl();來(lái)創(chuàng)建了YarnClientImpl這個(gè)對(duì)象,查看這個(gè)對(duì)象的創(chuàng)建初始化方法 這個(gè)方法又調(diào)用了super(YarnClientImpl.class.getName());這一行代碼3.9、查看Client當(dāng)中的run方法前面通過YarnClusterApplication當(dāng)中的start方法創(chuàng)建了Client對(duì)象,然后調(diào)用了run方法 查看run方法的具體實(shí)現(xiàn)類容如下:3.10、查看submitApplication方法的具體實(shí)現(xiàn)在Client當(dāng)中,通過submitApplication正式向ResourceManager提交了任務(wù) appContext來(lái)源于上面定義的一個(gè)變量3.10.1、查看Client當(dāng)中的appContext的內(nèi)容定義 在Client當(dāng)中執(zhí)行submitApplication方法來(lái)提交任務(wù)的時(shí)候,該方法當(dāng)中通過yarnClient.submitApplication(appContext)這一句代碼來(lái)實(shí)現(xiàn)了任務(wù)的提交,在提交任務(wù)的時(shí)候攜帶了一個(gè)參數(shù)叫做appContext查看createApplicationSubmissionContext方法3.10.1、查看Client當(dāng)中的containerContext前面已經(jīng)看過了appContext當(dāng)中主要就是定義解析一些參數(shù),接下來(lái)就繼續(xù)來(lái)查看 valvalcontainerContext=createContainerLaunchContext(newAppResponse)這一行代碼當(dāng)中的創(chuàng)建容器的方法,查看createContainerLaunchContext方法實(shí)現(xiàn)如下4、查看ApplicationMaster的啟動(dòng)流程 前面找到了我們通過java命令行的方式啟動(dòng)了一個(gè)java進(jìn)程,叫做ApplicationMaster,這個(gè)ApplicationMaster想要啟動(dòng)執(zhí)行,肯定也是有一個(gè)main方法的執(zhí)行入口,通過main方法的執(zhí)行入口來(lái)執(zhí)行程序的啟動(dòng),使用idea的快捷鍵ctrl+alt+shift+N打開ApplicationMaster,然后找到Object伴生對(duì)象當(dāng)中的main方法4.1、查看ApplicationMaster類當(dāng)中的main方法下面的ApplicationMasterArguments類的初始化在ApplicationMaster類當(dāng)中的main下面執(zhí)行了一行代碼 valvalamArgs=newApplicationMasterArguments(args),這一句代碼創(chuàng)建了一個(gè)ApplicationMasterArguments這個(gè)對(duì)象,這個(gè)對(duì)象專門用于解析前面?zhèn)魅脒^來(lái)的參數(shù),查看ApplicationMasterArguments類的初始化4.2、查看ApplicationMaster在main方法當(dāng)中創(chuàng)建的對(duì)象ApplicationMaster在ApplicationMaster這個(gè)伴生對(duì)象的main方法當(dāng)中,還執(zhí)行了一句代碼 master=newApplicationMaster(amArgs,sparkConf,yarnConf)這一句代碼主要就是創(chuàng)建了ApplicationMaster這個(gè)對(duì)象,這個(gè)對(duì)象的初始化創(chuàng)建過程如下在創(chuàng)建ApplicationMaster這個(gè)對(duì)象的時(shí)候,可以看到在里面初始化了一個(gè)YarnRMClient這個(gè)對(duì)象,這個(gè)對(duì)象其實(shí)就是一個(gè)中間的橋梁,用于連接ApplicationMaster與ResourceManager。4.3、查看ApplicationMaster中的main方法里面執(zhí)行的master.run方法在ApplicationMaster當(dāng)中的main方法里面,執(zhí)行了一行代碼如下:ugiugi.doAs(newPrivilegedExceptionAction[Unit](){overridedefrun():Unit=System.exit(master.run())這一行代碼當(dāng)中執(zhí)行了master.run,其實(shí)就是開始運(yùn)行ApplicationMaster了,我們可以查看run方法的具體實(shí)現(xiàn)內(nèi)容如下4.3.1、查看run方法當(dāng)中的runDriver方法的具體實(shí)現(xiàn)在ApplicationMaster的run方法當(dāng)中,執(zhí)行了一段代碼如下 if(isClusterMode){runDriver()}else{runExecutorLauncher()}判斷如果是集群模式,那么就執(zhí)行了runDriver這個(gè)方法,通過runDriver這個(gè)方法,啟動(dòng)了driver程序判斷如果是集群模式,那么就執(zhí)行了runDriver這個(gè)方法,通過runDriver這個(gè)方法,啟動(dòng)了driver程序runDriver當(dāng)中主要運(yùn)行了兩條任務(wù)線第一條是跟資源相關(guān)的資源申請(qǐng)線第二條是跟用戶程序執(zhí)行相關(guān)的程序執(zhí)行線那么我們可以去看一下runDrvier方法的具體實(shí)現(xiàn)內(nèi)容如下在runDriver方法當(dāng)中,主要執(zhí)行了啟動(dòng)用戶的程序,等待sparkContext對(duì)象的創(chuàng)建,注冊(cè)ApplicationMaster以及分配container等多個(gè)動(dòng)作。其中runDriver主要分為兩條線往下運(yùn)行第一條線:?jiǎn)?dòng)集群環(huán)境,申請(qǐng)運(yùn)行資源相關(guān)的第二條線:resumeDriver保證用戶代碼繼續(xù)往下執(zhí)行4.3.2、查看runDriver方法當(dāng)中的startUserApplication方法的具體實(shí)現(xiàn)在runDriver方法當(dāng)中的具體實(shí)現(xiàn)里面,執(zhí)行了一句代碼 userClassThread=startUserApplication()通過這一行代碼啟動(dòng)了用戶的應(yīng)用程序,其中startUserApplication方法的具體實(shí)現(xiàn)如下4.3.3、查看runDriver方法當(dāng)中的createAllocator方法具體實(shí)現(xiàn)在ApplicationMaster的runDriver方法當(dāng)中,執(zhí)行了一行代碼如下 createAllocatorcreateAllocator(driverRef,userConf,rpcEnv,appAttemptId,distCacheConf)這一行代碼主要就是在進(jìn)行container的分配,該方法的具體實(shí)現(xiàn)如下:4.3.4、繼續(xù)查看createAllocator方法當(dāng)中的allocateResources方法內(nèi)部實(shí)現(xiàn)在上面ApplicationMaster當(dāng)中調(diào)用createAllocator方法當(dāng)中,執(zhí)行了一行代碼如下 allocatorallocator.allocateResources()在allocateResources方法的具體實(shí)現(xiàn)如下4.3.5、查看allocateResources方法內(nèi)部的handleAllocatedContainers方法的具體實(shí)現(xiàn)在YarnAllocator當(dāng)中,我們看到在執(zhí)行allocateResources的時(shí)候,我們通過 valallocatedContainers=allocateResponse.getAllocatedContainers()這一行代碼獲取到了所有的可分配的container,然后通過下一行代碼if(allocatedContainers.size>0){logDebug(("Allocatedcontainers:%d.Currentexecutorcount:%d."+"Launchingexecutorcount:%d.Clusterresources:%s.")allocatedContainers.size,sizenumExecutorsStarting.get,allocateResponse.getAvailableResources))handleAllocatedContainers(allocatedContainers.asScala)}使用了handleAllocatedContainers(allocatedContainers.asScala)來(lái)進(jìn)行容器container的分配,handleAllocatedContainers的方法的具體實(shí)現(xiàn)如下4.3.5、查看handleAllocatedContainers方法當(dāng)中的runAllocatedContainers方法內(nèi)部具體實(shí)現(xiàn)在YarnAllocator當(dāng)中執(zhí)行handleAllocatedContainers方法時(shí),執(zhí)行了一行代碼如下 runAllocatedContainers(containersToUse)這一行代碼主要就是在進(jìn)行運(yùn)行分配好了的container了,這個(gè)runAllocatedContainers方法的具體實(shí)現(xiàn)內(nèi)容如下該方法的具體實(shí)現(xiàn)如下4.3.6、查看線程的執(zhí)行的run方法 在YarnAllocator當(dāng)中執(zhí)行runAllocatedContainers方法的時(shí)候,使用了線程池launcherPool的方式進(jìn)行執(zhí)行,最后調(diào)用run方法代碼如下newExecutorRunnable(Some(container),conf,sparkConf,driverUrl,executorId,ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID//useuntilfullysupported).run()run方法的具體內(nèi)容如下4.3.7、查看startContainer方法的具體實(shí)現(xiàn) 在上面ExecutorRunnable當(dāng)中,執(zhí)行run方法的時(shí)候,在run方法當(dāng)中初始化了NodeManagerClient這個(gè)對(duì)象,這個(gè)對(duì)象主要是是用于與NodeManager進(jìn)行通信,通過客戶端與NodeManager服務(wù)端進(jìn)行通信,然后通過調(diào)用startContainer方法在NodeManager上面創(chuàng)建Exector進(jìn)程,在run方法內(nèi)部有一行代碼執(zhí)行如下startContainer()4.3.8、查看在ExecutorRunnable當(dāng)中運(yùn)行startContainer方法調(diào)用prepareCommand方法的過程在ExecturoRunnable當(dāng)中執(zhí)行startContainer方法的時(shí)候,調(diào)用了一行代碼valvalcommands=prepareCommand()這一行代碼主要就是在準(zhǔn)備java的一個(gè)命令,通過java-server的方式來(lái)啟動(dòng)另外一個(gè)java進(jìn)程,查看prepareCommand方法的具體實(shí)現(xiàn)如下ecutor 前面看到了我們通過java命令啟動(dòng)了一個(gè)Executor的進(jìn)程,那么就是執(zhí)行了org.apache.spark.executor.YarnCoarseGrainedExecutorBackend這個(gè)類當(dāng)中的main方法,啟動(dòng)了java的一個(gè)進(jìn)程,那么我們就可以直接去找這個(gè)類的main方法去查看這個(gè)進(jìn)程的啟動(dòng)過程,使用idea的ctrl+alt+shift+N快捷鍵來(lái)查找YarnCoarseGrainedExecutorBackend這個(gè)類,并找到它的main方法作為程序的入口類5.1、查看YarnCoarseGrainedExecutorBackend這個(gè)Object當(dāng)中main方法的run方法執(zhí)行在YarnCoarseGrainedExecutorBackend當(dāng)中執(zhí)行了main方法,在main方法里面執(zhí)行了 CoarseGrainedExecutorBackendCoarseGrainedExecutorBackend.run(backendArgs,createFn)這樣一行代碼,這樣一行代碼當(dāng)中執(zhí)行了run方法,查看run方法的具體實(shí)現(xiàn)如下5.2、查看run方法當(dāng)中的setupEndPoint方法的具體實(shí)現(xiàn) 在上面CoarseGrainedExecutorBackend這個(gè)class類當(dāng)中在執(zhí)行run方法的時(shí)候,run方法當(dāng)中有一行代碼如下env.rpcEnv.setupEndpoint("Executor",backendCreateFn(env.rpcEnv,arguments,env,cfg.resourceProfile))其中通過sparkEnv對(duì)象,調(diào)用了rpcEnv這個(gè)屬性,這個(gè)對(duì)象來(lái)調(diào)用了setupEndPoint這個(gè)方法,這個(gè)方法的具體實(shí)現(xiàn)代碼如下,通過idea的快捷鍵ctrl+shift+h快捷鍵可以看到該方法的具體實(shí)現(xiàn)內(nèi)容如下通過消息轉(zhuǎn)發(fā)器dispatcher來(lái)調(diào)用了registerRpcEndpoint5.3、查看setupEndpoint方法當(dāng)中的registerRpcEndpoint方法的具體實(shí)現(xiàn)在上面NettyRpcEnv當(dāng)中調(diào)用了setupEndpoint方法的時(shí)候,執(zhí)行了一行代碼如下 dispatcher.registerRpcEndpoint(name,endpoint)通過消息轉(zhuǎn)發(fā)器dispatcher來(lái)注冊(cè)了Rpc的Endpoint這個(gè)終端,registerRpcEndpoint方法的具體實(shí)現(xiàn)內(nèi)容如下5.4、查看registerRpcEndpoint方法當(dāng)中的DedicatedMessageLoop對(duì)象創(chuàng)建過程在registerRpcEndpoint方法當(dāng)中,執(zhí)行了一段代碼如下varmessageLoop:MessageLoop=nulltry{messageLoop=endpointmatch{casee:IsolatedRpcEndpoint=>newDedicatedMessageLoop(name,e,this)case_=>sharedLoop.register(name,endpoint)sharedLoop}endpoints.put(name,messageLoop)}catch{caseNonFatal(e)=>moveendpointthrowe}在這一行代碼當(dāng)中,創(chuàng)建了一個(gè)對(duì)象DedicatedMessageLoop,這個(gè)對(duì)象當(dāng)中創(chuàng)建了inbox收件箱對(duì)象,主要用于收取消息,對(duì)象的創(chuàng)建過程如下在上面DedicatedMessageLoop對(duì)象創(chuàng)建的時(shí)候,我們會(huì)發(fā)現(xiàn)調(diào)用了一行代碼如下: privateprivatevalinbox=newInbox(name,endpoint)這里其實(shí)就是創(chuàng)建了一個(gè)Inbox對(duì)象,通過Inbox這個(gè)對(duì)象來(lái)實(shí)現(xiàn)數(shù)據(jù)的接受,Inbox的具體實(shí)現(xiàn)內(nèi)容如下在inbox這個(gè)對(duì)象初始化的時(shí)候調(diào)用了//OnStartshouldbethefirstmessagetoprocessinbox.synchronized{messages.add(OnStart)}其實(shí)就是通過LinkedList給InboxMessage添加了一個(gè)onStart的事件進(jìn)去了,這就涉及到通信的生命周期,在spark框架當(dāng)中,通信的總的接口定義在了RpcEndpoint這個(gè)trait當(dāng)中。在RpcEndpoint這個(gè)trait當(dāng)中定義了一系列的生命周期的順序,在RpcEndpoint當(dāng)中是這樣定義注釋的其中CoarseGrainedExecutorBackend這個(gè)類也是RpcEndPoint的子類調(diào)用onStart之后,執(zhí)行CoarseGrainedExecutorBackend的onStart()方法邏輯如下5.6、查看CoarseGrainedExecutorBackend類的onStart方法 前面在看到了通過inbox調(diào)用了onStart事件,然后inBox是DedicatedMessageLoop當(dāng)中定義的對(duì)象,DedicatedMessageLoop又是YarnCoarseGrainedExecutorBackend當(dāng)中定義的對(duì)象,而YarnCoarseGrainedExecutorBackend又是CoarseGrainedExecutorBackend的子類,所以這里通過inbox調(diào)用了onStart之后,就會(huì)執(zhí)行CoarseGrainedExecutorBackend當(dāng)中的onStart方法,onStart的方法的具體定義如下在上面onStart的方法內(nèi)容定義如下overridedefonStart():Unit={logInfo("Connectingtodriver:"+driverUrl)try{_resources=parseOrFindResources(resourcesFileOpt)}catch{caseNonFatal(e)=>exitExecutor(1,"Unabletocreateexecutordueto"+e.getMessage,e)}rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap{ref=>//Thisisaveryfastactionsowecanuse"ThreadUtils.sameThread"driver=Some(ref)//向Driver當(dāng)中注冊(cè)Executor通過RegisterExecutor樣例類的方式來(lái)包裝了發(fā)送的消息出去ref.ask[Boolean](RegisterExecutor(executorId,self,hostname,cores,fileid}(ThreadUtils.sameThread).onComplete{caseSuccess(_)=>self.send(RegisteredExecutor)caseFailure(e)=>exitExecutor(1,s"Cannotregisterwithdriver:$driverUrl",e,notifyDriver=false)}在這個(gè)方法當(dāng)中調(diào)用了RegisterExecutor,通過RegisterExecutor來(lái)封裝了樣例類,所以這里發(fā)送消息,一定會(huì)有個(gè)地方能接收到消息ref.ask[Boolean](RegisterExecutor(executorId,self,hostname,cores,通過RegisterExecutor其實(shí)就是向Driver端進(jìn)行通信,向Driver端進(jìn)行通信注冊(cè)Executor,方便Driver端后續(xù)做DAG的劃分以及task的分解,將分解之后的task運(yùn)行在Executor上面,既然是向Driver端進(jìn)行注到Driver端的響應(yīng)以及回復(fù)5.7、簡(jiǎn)單查看Driver端的消息接收以及回復(fù)過程sparkContext對(duì)象當(dāng)中有一個(gè)屬性叫做SchedulerBackend SchedulerBackend其實(shí)就是我們的通信后臺(tái),SchedulerBackend是一個(gè)trait,可以找到他的實(shí)現(xiàn)類為CoarseGrainedSchedulerBackend5.8、查看CoarseGrainedSchedulerBackend當(dāng)中消息接收以及回復(fù)receiveAndReply方法 我們通過SchedulerBackend找到他的實(shí)現(xiàn)類為CoarseGrainedSchedulerBackend,在這會(huì)進(jìn)行通信,里面有一個(gè)發(fā)方法叫做receiveAndReply這個(gè)方法,專門用于接收消息,并進(jìn)行回復(fù)的,該方法的定義內(nèi)容如下其中CoarseGrainedExecutorBackend與CoarseGrainedSchedulerBackend的請(qǐng)求以及響應(yīng)關(guān)系如下圖CoarseGrainedExecutorBackend與CoarseGrainedSchedulerBackend通信流程.drawio至此,通過以上的通信環(huán)境,我們整個(gè)的運(yùn)行環(huán)境全部創(chuàng)建成功,有了driver有了executor,有了resourceManager,有了nodeManager等,就可以進(jìn)行任務(wù)的計(jì)算了,至此,在啟動(dòng)ApplicationMaster的時(shí)候,第一條線路當(dāng)中的資源環(huán)境問題就已經(jīng)正式啟動(dòng)成功。剩下的就是第二條線,運(yùn)行用戶代碼的問題了。4、spark任務(wù)組件之間的通信源碼深入剖析1、通信基本概念介紹NIO,AIO等這幾種通信模型的話,那么我們需要先了解阻塞和非阻塞,同步和非同步的概念阻塞和非阻塞是指進(jìn)程在訪問數(shù)據(jù)的時(shí)候,數(shù)據(jù)內(nèi)部是否準(zhǔn)備就緒的一種處理方式。當(dāng)數(shù)據(jù)沒有準(zhǔn)備的時(shí)候阻塞:需要等待緩沖區(qū)的數(shù)據(jù)準(zhǔn)備好才去處理之后的事情,否則一直等待下去非阻塞:無(wú)論緩沖區(qū)的數(shù)據(jù)是否準(zhǔn)備好,都立刻返回同步和異步都是基于應(yīng)用程序和操作系統(tǒng)處理IO時(shí)間鎖采用的方式。同步:應(yīng)用程序要直接參與IO讀寫的操作,在處理IO事件的時(shí)候必須阻塞在某個(gè)方法上的等待我們IO完成的時(shí)間(阻塞IO事件或者通過輪詢IO事件的方式)。阻塞直到IO事件遇到write或者read,這個(gè)時(shí)候我們不能做任何我們想去做的事情,讓讀寫方法加入到線程中,通過阻塞線程來(lái)實(shí)現(xiàn),這樣對(duì)線程的性大。 異步:所有的IO讀寫都交給操作系統(tǒng)處理,此時(shí)應(yīng)用程序可以處理其他事情,當(dāng)操作系統(tǒng)完成IO后給應(yīng)用程序一個(gè)通知即可。3、socket通信介紹socket: Socket是應(yīng)用層與TCP/IP協(xié)議族通信的中間軟件抽象層,它是一組接口。在設(shè)計(jì)模式中,Socket其實(shí)就是一個(gè)門面模式,它把復(fù)雜的TCP/IP協(xié)議族隱藏在Socket接口后面,對(duì)用戶來(lái)說,一組簡(jiǎn)單的接口就是全部,讓Socket去組織數(shù)據(jù),以符合指定的協(xié)議先從服務(wù)器端說起。服務(wù)器端先初始化Socket,然后與端口綁定(bind),對(duì)端口進(jìn)行監(jiān)聽(listen),調(diào)用accept阻塞,等待客戶端連接。在這時(shí)如果有個(gè)客戶端初始化一個(gè)Socket,然后連接服務(wù)器(connect),如果連接成功,這時(shí)客戶端與服務(wù)器端的連接就建立了??蛻舳税l(fā)送數(shù)據(jù)請(qǐng)求,服務(wù)器端接收請(qǐng)求并處理請(qǐng)求,然后把回應(yīng)數(shù)據(jù)發(fā)送給客戶端,客戶端讀取數(shù)據(jù),最后關(guān)閉連接,一次交互結(jié)束優(yōu)點(diǎn)1)傳輸數(shù)據(jù)為字節(jié)級(jí),傳輸數(shù)據(jù)可自定義,數(shù)據(jù)量小(對(duì)于手機(jī)應(yīng)用講:費(fèi)用低)2)傳輸數(shù)據(jù)時(shí)間短,性能高3)適合于客戶端和服務(wù)器端之間信息實(shí)時(shí)交互4)可以加密,數(shù)據(jù)安全性強(qiáng)1)需對(duì)傳輸?shù)臄?shù)據(jù)進(jìn)行解析,轉(zhuǎn)化成應(yīng)用級(jí)的數(shù)據(jù)2)對(duì)開發(fā)人員的開發(fā)水平要求高3)相對(duì)于Http協(xié)議傳輸,增加了開發(fā)量4)效率低下同步阻塞I/O模式,全稱BlockingIO,數(shù)據(jù)的讀取寫入必須阻塞在一個(gè)線程內(nèi)等待其完成 它是基于流模型實(shí)現(xiàn)的,交互的方式是同步、阻塞方式,也就是說在讀入輸入流或者輸出流時(shí),在讀寫動(dòng)作完成之前,線程會(huì)一直阻塞在那里,它們之間的調(diào)用時(shí)可靠的線性順序。它的優(yōu)點(diǎn)就是代碼比較簡(jiǎn)單、直觀;缺點(diǎn)就是IO的效率和擴(kuò)展性很低,容易成為應(yīng)用性能瓶頸。主要最大的缺點(diǎn)就是會(huì)對(duì)我們的操作進(jìn)行阻塞,例如我們?nèi)メt(yī)院看醫(yī)生,需要排隊(duì)取號(hào),如果中途過號(hào)了,那么對(duì)不起,過號(hào)不厚,要么退號(hào),要么重新掛號(hào),這就限制了我們?nèi)×颂?hào)之后非得要在醫(yī)院里面等著啥也不能干,盡管你不知道你得要等多久,所以這種阻塞式的IO非常不友好,會(huì)浪費(fèi)我們大量的時(shí)間采用BIO通信模型的服務(wù)端,通常由一個(gè)獨(dú)立的Acceptor線程負(fù)責(zé)監(jiān)聽客戶端的連接,接收到客戶端連接之后為客戶端連接創(chuàng)建一個(gè)新的線程處理請(qǐng)求消息,處理完成之后,返回應(yīng)答消息給客戶端,線程銷。該架構(gòu)最大的問題就是不具備彈性伸縮能力,當(dāng)并發(fā)訪問量增加后,服務(wù)端的線程個(gè)數(shù)和并發(fā)訪問數(shù)成線性正比,由于線程是JAVA虛擬機(jī)非常寶貴的系統(tǒng)資源,當(dāng)線程數(shù)膨脹之后,系統(tǒng)的性能急劇下降,隨著并發(fā)量的繼續(xù)增加,可能會(huì)發(fā)生句柄溢出、線程堆棧溢出等問題,并導(dǎo)致服務(wù)器最終宕機(jī)。 NIO:一種非阻塞式通信模式,線程在執(zhí)行這個(gè)通信業(yè)務(wù)過程中,如果有一個(gè)環(huán)節(jié)沒有準(zhǔn)備好,那么線程可以去執(zhí)行其他任務(wù),線程占用的情況大幅度釋放。例如我們?nèi)ワ埖瓿燥?,以前沒有外賣的時(shí)候,我們?nèi)ワ埖瓿燥埖靡抨?duì)等著,等著廚師把我們的飯菜做好了,然后我們才能開吃,這種模型就類似于BIO,阻塞式IO,效率極其低下,為了節(jié)約等待的時(shí)間,等待的時(shí)候我們可以去打球,每隔一會(huì)兒回來(lái)問一下老板我們的飯菜有沒有做好 傳統(tǒng)的IO操作面向數(shù)據(jù)流,意味著每次從流中讀一個(gè)或多個(gè)字節(jié),直至完成,數(shù)據(jù)沒有被緩存在任何地方。NIO操作面向緩沖區(qū),數(shù)據(jù)從Channel讀取到Buffer緩沖區(qū),隨后在Buffer中處理數(shù)據(jù)。利用Buffer讀寫數(shù)據(jù),通常遵循四個(gè)步驟:1.把數(shù)據(jù)寫入buffer;2.調(diào)用flip;3.從Buffer中讀取數(shù)據(jù);4.調(diào)用buffer.clear()當(dāng)寫入數(shù)據(jù)到buffer中時(shí),buffer會(huì)記錄已經(jīng)寫入的數(shù)據(jù)大小。當(dāng)需要讀數(shù)據(jù)時(shí),通過flip()方法把buffer從寫模式調(diào)整為讀模式;在讀模式下,可以讀取所有已經(jīng)寫入的數(shù)據(jù)。2、channel通道javaNIOChannel通道和流非常相似,主要有以下幾點(diǎn)區(qū)別:1.通道可以讀也可以寫,流一般來(lái)說是單向的(只能讀或者寫)。2.通道可以異步讀寫。3.通道總是基于緩沖區(qū)Buffer來(lái)讀寫。正如上面提到的,我們可以從通道中讀取數(shù)據(jù),寫入到buffer;也可以中buffer內(nèi)讀數(shù)據(jù),寫入到通道有個(gè)示意圖:Channel有:1.FileChannel2.DatagramChannel3.SocketChannel4.ServerSocketChannelTCP的數(shù)據(jù)讀寫。ServerSocketChannel允許我們監(jiān)聽TCP鏈接請(qǐng)求,每個(gè)請(qǐng)求會(huì)創(chuàng)建會(huì)一個(gè)SocketChannel。3、selector選擇器如此可以實(shí)現(xiàn)單線程管理多個(gè)channels,也就是可以管理多個(gè)網(wǎng)絡(luò)鏈接。通過上面的了解我們知道Selector是一種IOmultiplexing的情況。下面這幅圖描述了單線程處理三個(gè)channel的情況:1、多個(gè)Client同時(shí)注冊(cè)到多路復(fù)用器selector上;2、selector遍歷所有注冊(cè)的通道;3、查看通道狀態(tài)(包括Connect、Accept、Read、Write);4、根據(jù)狀態(tài)執(zhí)行相應(yīng)狀態(tài)的操作; 1.由一個(gè)專門的線程來(lái)處理所有的IO事件,并負(fù)責(zé)分發(fā)。2.事件驅(qū)動(dòng)機(jī)制:事件到的時(shí)候觸發(fā),而不是同步的去監(jiān)視事件。3.線程通訊:線程之間通過wait,notify等方式通訊。保證每次上下文切換都是有意義的。減少無(wú)謂6、AIO通信模型介紹(異步非阻塞式IO) JDK1.7升級(jí)了NIO類庫(kù),升級(jí)后的NIO類庫(kù)被稱為NIO2.0。也就是我們要介紹的AIO。NIO2.0引入了新的異步通道的概念,并提供了異步文件通道和異步套接字通道的實(shí)現(xiàn)。還是以吃飯為例,我們有了外賣之后,我們就徹底與老板進(jìn)行解耦了,我們下單老板做飯,然后外賣員給我們送過來(lái),在這等待老板做飯的時(shí)間,我們可以繼續(xù)去做別的事情,吃飯等待的時(shí)間也被我們利用起來(lái)了,更加提高了效率用戶程序可以通過向內(nèi)核發(fā)出I/O請(qǐng)求命令,不用等帶I/O事件真正發(fā)生,可以繼續(xù)做另外的事情,等I/O操作完成,內(nèi)核會(huì)通過函數(shù)回調(diào)或者信號(hào)機(jī)制通知用戶進(jìn)程。這樣很大程度提高了系統(tǒng)吞吐量。異步通道提供兩種方式獲取操作結(jié)果。(1)通過Java.util.concurrent.Future類來(lái)表示異步操作的結(jié)果;(2)在執(zhí)行異步操作的時(shí)候傳入一個(gè)Java.nio.channels.CompletionHandler接口的實(shí)現(xiàn)類作為操作完成的回調(diào)。NIO2.0的異步套接字通道是真正的異步非阻塞IO,它對(duì)應(yīng)UNIX網(wǎng)絡(luò)編程中的事件驅(qū)動(dòng)IO(AIO),它不需要通過多路復(fù)用器(Selector)對(duì)注冊(cè)的通道進(jìn)行輪詢操作即可實(shí)現(xiàn)異步讀寫,從而簡(jiǎn)化了NIO的編程模我們可以得出結(jié)論:異步SocketChannel是被動(dòng)執(zhí)行對(duì)象,我們不需要想NIO編程那樣創(chuàng)建一個(gè)獨(dú)立的IO線程來(lái)處理讀寫操作。對(duì)于AsynchronousServerSocketChannel和AsynchronousSocketChannel,它們都由JDK底層的線程池負(fù)責(zé)回調(diào)并驅(qū)動(dòng)讀寫操作。正因?yàn)槿绱?,基于NIO2.0新的異步非阻塞Channel進(jìn)行編程比NIO編程更為簡(jiǎn)單。信模型的對(duì)比 由上述總結(jié)得出,并不意味著所有的Java網(wǎng)絡(luò)編程都必須要選擇NIO和Netty,具體選擇什么樣的IO模型或者NIO框架,完全基于業(yè)務(wù)的實(shí)際應(yīng)用場(chǎng)景和性能訴求,如果客戶端并發(fā)連接數(shù)不多,周邊對(duì)接的網(wǎng)元不多,服務(wù)器的負(fù)載也不重,那就完全沒必要選擇NIO做服務(wù)端;如果是相反情況,那就考慮選擇合NIO行開發(fā)。定義bio服務(wù)端

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

評(píng)論

0/150

提交評(píng)論