Java多線程編程基石ThreadPoolExecutor示例詳解_第1頁
Java多線程編程基石ThreadPoolExecutor示例詳解_第2頁
Java多線程編程基石ThreadPoolExecutor示例詳解_第3頁
Java多線程編程基石ThreadPoolExecutor示例詳解_第4頁
Java多線程編程基石ThreadPoolExecutor示例詳解_第5頁
已閱讀5頁,還剩14頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

第Java多線程編程基石ThreadPoolExecutor示例詳解多線程編程是現(xiàn)代軟件開發(fā)中不可或缺的一部分,但是手動管理線程可能會變得非常復(fù)雜,因為需要考慮許多并發(fā)問題,例如線程安全和資源競爭。為了避免這些問題,Java提供了ThreadPoolExecutor類,它是一種高度優(yōu)化的多線程執(zhí)行器,可以管理線程池、執(zhí)行線程任務(wù)和控制線程池的大小和生命周期等

為什么用線程池

線程創(chuàng)建和銷毀的開銷較大,每個線程都需要占用一定的內(nèi)存和系統(tǒng)資源。如果頻繁地創(chuàng)建和銷毀線程,會導(dǎo)致系統(tǒng)的性能下降。

手動管理線程容易出現(xiàn)線程安全和資源競爭的問題,例如,多個線程同時訪問共享變量可能導(dǎo)致數(shù)據(jù)不一致或者死鎖等問題。

如果并發(fā)訪問的線程數(shù)量很大,可能會導(dǎo)致系統(tǒng)資源不足,例如,內(nèi)存不足或者CPU過度使用等問題。

corePoolSize:核心線程池大小,即線程池中始終存在的線程數(shù)量,除非設(shè)置了allowCoreThreadTimeOut參數(shù),默認情況下,即使空閑,核心線程也不會被回收。

maximumPoolSize:線程池的最大線程數(shù),即可以同時執(zhí)行的最大線程數(shù)量。

keepAliveTime:非核心線程的空閑存活時間,當非核心線程空閑時間超過這個時間,就會被回收。

unit:keepAliveTime的時間單位。

workQueue:任務(wù)隊列,用于存儲等待執(zhí)行的任務(wù),有多種實現(xiàn)方式,例如ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue等。

threadFactory:用于創(chuàng)建新線程的工廠類,可以自定義線程名稱、線程優(yōu)先級等屬性。

handler:線程池的拒絕策略,當線程池已經(jīng)達到最大線程數(shù),并且任務(wù)隊列已經(jīng)滿了,新的任務(wù)將被拒絕執(zhí)行,可以設(shè)置拒絕策略來處理這種情況。

核心線程數(shù)和最大線程數(shù)設(shè)置

CPU密集型任務(wù):CPU密集型任務(wù)的特點是線程在執(zhí)行任務(wù)時會一直利用CPU,對于這種情況要盡可能的避免發(fā)生線程上下文的切換。一般來說對于CPU密集型任務(wù)設(shè)置線程數(shù)為CPU核心數(shù)+1。

IO密集型任務(wù):線程在執(zhí)行IO密集型任務(wù)時,可能大部分時間都浪費在阻塞IO上了,所以對于IO密集型任務(wù)來說我們通常會設(shè)置線程數(shù)為CPU核心數(shù)*2。不過這樣子也不一定是最佳的,我們可以通過公式來進行計算:線程數(shù)=CPU核心數(shù)*(1+平均等待時間/平均工作時間),盡可能的還要根據(jù)壓縮來進行調(diào)整。

publicclassCustomThreadPoolDemo{

publicstaticvoidmain(String[]args){

//創(chuàng)建線程池,大小為3,最大線程數(shù)為6,空閑線程存活時間為5秒,使用自定義線程工廠和拒絕策略

ThreadPoolExecutorexecutor=newThreadPoolExecutor(3,6,5,TimeUnit.SECONDS,

newLinkedBlockingQueue(10),newCustomThreadFactory(),newCustomRejectedExecutionHandler());

//提交10個任務(wù)

for(inti=0;ii++){

executor.submit(newTask(i));

//關(guān)閉線程池

executor.shutdown();

staticclassTaskimplementsRunnable{

privateinttaskId;

publicTask(inttaskId){

this.taskId=taskId;

@Override

publicvoidrun(){

System.out.println(Task+taskId+isrunninginthread+Thread.currentThread().getName());

try{

Thread.sleep(3000);

}catch(InterruptedExceptione){

e.printStackTrace();

System.out.println(Task+taskId+isdone.

staticclassCustomThreadFactoryimplementsjava.util.concurrent.ThreadFactory{

privateintcount=1;

@Override

publicThreadnewThread(Runnabler){

Threadt=newThread(r);

t.setName(CustomThreadPool-+count++);

returnt;

staticclassCustomRejectedExecutionHandlerimplementsjava.util.concurrent.RejectedExecutionHandler{

@Override

publicvoidrejectedExecution(Runnabler,ThreadPoolExecutorexecutor){

System.out.println(Task+((Task)r).taskId+isrejected.

該示例代碼使用ThreadPoolExecutor類創(chuàng)建了一個大小為3,最大線程數(shù)為6,空閑線程存活時間為5秒的線程池,任務(wù)隊列的大小為10,使用了自定義的線程工廠和拒絕策略。然后提交了10個任務(wù),每個任務(wù)輸出了當前線程的名稱,并休眠了3秒鐘。當程序執(zhí)行時,可能會出現(xiàn)任務(wù)被拒絕執(zhí)行的情況,拒絕策略會輸出任務(wù)被拒絕的信息。

線程池執(zhí)行任務(wù)的流程

ThreadPoolExecutor提供了兩種執(zhí)行任務(wù)的方法:

Futuresubmit(Runnabletask)

voidexecute(Runnablecommand)

實際上submit中也是調(diào)用了execute方法

publicFuturesubmit(Runnabletask){

if(task==null)thrownewNullPointerException();

RunnableFutureVoidftask=newTaskFor(task,null);

execute(ftask);

returnftask;

線程池執(zhí)行流程圖

基礎(chǔ)屬性和變量

privatefinalAtomicIntegerctl

線程池源碼中使用ctl通過高低位的方式來記錄線程池的狀態(tài)和當前線程池中的工作線程數(shù)量。

Integer占用4個字節(jié)也就是32位,線程池有5種狀態(tài),要標識5種狀態(tài)需要3位

前三位

privatestaticfinalintCOUNT_BITS=Integer.SIZE-3;

privatestaticfinalintCAPACITY=(1COUNT_BITS)-1;

//runStateisstoredinthehigh-orderbits

privatestaticfinalintRUNNING=-1COUNT_BITS;

privatestaticfinalintSHUTDOWN=0COUNT_BITS;

privatestaticfinalintSTOP=1COUNT_BITS;

privatestaticfinalintTIDYING=2COUNT_BITS;

privatestaticfinalintTERMINATED=3COUNT_BITS;

Integer.SIZE為32,所以COUNT_BITS為29,最終各個狀態(tài)對應(yīng)的二級制為:

RUNNING:11100000000000000000000000000000

SHUTDOWN:00000000000000000000000000000000

STOP:00100000000000000000000000000000

TIDYING:01000000000000000000000000000000

TERMINATED:01100000000000000000000000000000

execute(Runnablecommand)

publicvoidexecute(Runnablecommand){

if(command==null)

thrownewNullPointerException();

//ctl初始值是ctlOf(RUNNING,0),表示線程池處于運行中,工作線程數(shù)為0

intc=ctl.get();

//判斷工作線程是否小于核心線程數(shù)

if(workerCountOf(c)corePoolSize){

//小于核心線程要新增工作線程

if(addWorker(command,true))

return;

//新增失敗重新獲取一次ctl

c=ctl.get();

//線程池是否處于Running狀態(tài)入隊是否成功

if(isRunning(c)workQueue.offer(command)){//入隊成功

//重新獲取ctl

intrecheck=ctl.get();

//如果線程池不是Running狀態(tài)就需要移除掉這個任務(wù)

if(!isRunning(recheck)remove(command))

//觸發(fā)拒絕策略

reject(command);

//工作線程為0時要去創(chuàng)建新的工作線程

elseif(workerCountOf(recheck)==0)

addWorker(null,false);

//如果線程池狀態(tài)不是RUNNING,或者線程池狀態(tài)是RUNNING但是隊列滿了,則去添加一個非核心工作線程。false表示非核心線程

elseif(!addWorker(command,false))

reject(command);

addWorker(RunnablefirstTask,booleancore)

//core:true核心線程false非核心線程

privatebooleanaddWorker(RunnablefirstTask,booleancore){

retry:

for(;;){

//獲取ctl值

intc=ctl.get();

//獲取高3位

intrs=runStateOf(c);

//線程池如果是SHUTDOWN狀態(tài)并且隊列非空則創(chuàng)建線程,如果隊列為空則不創(chuàng)建線程

//線程池如果是STOP狀態(tài)則直接不創(chuàng)建線程

if(rs=SHUTDOWN

!(rs==SHUTDOWN

firstTask==null

!workQueue.isEmpty()))

returnfalse;

for(;;){

//獲取工作線程數(shù)

intwc=workerCountOf(c);

//工作線程數(shù)超過規(guī)定數(shù)量則不創(chuàng)建線程

if(wc=CAPACITY||

wc=(corecorePoolSize:maximumPoolSize))

returnfalse;

//修改工作線程

if(compareAndIncrementWorkerCount(c))

//成功則退出retry這個循環(huán)

breakretry;

//CAS失敗說明有其他線程也在增加工作線程數(shù)量,此時重新獲取ctl值

c=ctl.get();//Re-readctl

//如果發(fā)現(xiàn)線程池的狀態(tài)發(fā)生了變化,則繼續(xù)回到retry,重新判斷線程池的狀態(tài)是不是SHUTDOWN或STOP

//如果狀態(tài)沒有變化,則繼續(xù)利用cas來增加工作線程數(shù),直到cas成功

if(runStateOf(c)!=rs)

continueretry;

//elseCASfailedduetoworkerCountchange;retryinnerloop

//到了這里說明ctl新增成功

booleanworkerStarted=false;

booleanworkerAdded=false;

Workerw=null;

try{

//Worker實現(xiàn)了Runnable接口在構(gòu)造一個Worker對象時,就會利用ThreadFactory新建一個線程

w=newWorker(firstTask);

//拿出線程對象此時線程還沒有start啟動

finalThreadt=w.thread;

if(t!=null){

finalReentrantLockmainLock=this.mainLock;

mainLock.lock();

try{

//獲取高三位

intrs=runStateOf(ctl.get());

//如果線程池的狀態(tài)是RUNNING

//或者線程池的狀態(tài)變成了SHUTDOWN,但是當前線程沒有自己的第一個任務(wù),那就表示當前調(diào)用addWorker方法是為了從隊列中獲取任務(wù)來執(zhí)行

//正常情況下線程池的狀態(tài)如果是SHUTDOWN,是不能創(chuàng)建新的工作線程的,但是隊列中如果有任務(wù),那就是上面說的特例情況

if(rsSHUTDOWN||

(rs==SHUTDOWNfirstTask==null)){

//如果Worker對象對應(yīng)的線程已經(jīng)在運行了,那就有問題,直接拋異常

if(t.isAlive())//precheckthattisstartable

thrownewIllegalThreadStateException();

//workers用來記錄當前線程池中工作線程,調(diào)用線程池的shutdown方法時會遍歷worker對象中斷對應(yīng)線程

workers.add(w);

ints=workers.size();

//largestPoolSize用來跟蹤線程池在運行過程中工作線程數(shù)的峰值

if(slargestPoolSize)

largestPoolSize=s;

workerAdded=true;

}finally{

mainLock.unlock();

//啟動線程

if(workerAdded){

t.start();

workerStarted=true;

}finally{

//在上述過程中如果拋了異常,需要從works中移除所添加的work,并且還要修改ctl,工作線程數(shù)-1,表示新建工作線程失敗

if(!workerStarted)

addWorkerFailed(w);

returnworkerStarted;

addWorker核心邏輯:

先判斷工作線程數(shù)是否超過了限制

修改ctl,使得工作線程數(shù)+1

構(gòu)造Work對象,并把它添加到workers集合中

啟動Work對象對應(yīng)的工作線程

runWorker(this)

剛剛有說到Worker實現(xiàn)了Runnable接口,看看他重寫的Run方法中執(zhí)行過什么

Worker(RunnablefirstTask){

setState(-1);//inhibitinterruptsuntilrunWorker

this.firstTask=firstTask;

this.thread=getThreadFactory().newThread(this);

/**DelegatesmainrunlooptoouterrunWorker*/

publicvoidrun(){

runWorker(this);

finalvoidrunWorker(Workerw){

//獲取當前工作線程

Threadwt=Thread.currentThread();

//獲取第一個任務(wù)

Runnabletask=w.firstTask;

//置空

w.firstTask=null;

w.unlock();//allowinterrupts

booleancompletedAbruptly=true;

try{

//判斷當前第一個任務(wù)是否為空,為空的話從阻塞隊列獲取一個任務(wù),阻塞隊列也為空就會阻塞在getTask()方法中

//也不會一直阻塞下去,keepAliveTime超時后還沒有獲取到任務(wù)就會返回null,退出循環(huán),這個線程也就是中止了

while(task!=null||(task=getTask())!=null){

w.lock();

//線程池狀態(tài)為STOP,則要中斷自己,但是如果發(fā)現(xiàn)中斷標記為true,那是不對的,因為線程池狀態(tài)不是STOP,工作線程仍然是要正常工作的,不能中斷掉,算是SHUTDOWN,也要等任務(wù)都執(zhí)行完之后,線程才結(jié)束,而目前線程還在執(zhí)行任務(wù)的過程中,不能中斷

if((runStateAtLeast(ctl.get(),STOP)||

(Terrupted()

runStateAtLeast(ctl.get(),STOP)))

!wt.isInterrupted())

errupt();

try{

//空方法給自定義線程池實現(xiàn)

beforeExecute(wt,task);

Throwablethrown=null;

try{

//執(zhí)行任務(wù)

task.run();

}catch(RuntimeExceptionx){

thrown=x;throwx;

}catch(Errorx){

thrown=x;throwx;

}catch(Throwablex){

thrown=x;thrownewError(x);

}finally{

//空方法給自定義線程池實現(xiàn)

afterExecute(task,thrown);

}finally{

task=null;

pletedTasks++;

w.unlock();

//正常退出了while循環(huán)

//completedAbruptly=false,表示線程正常退出

completedAbruptly=false;

}finally{

//如果線程正常退出這個線程會自然死亡

//但是如果是由于執(zhí)行任務(wù)的時候拋了異常,那么這個線程不應(yīng)該直接結(jié)束,而應(yīng)該繼續(xù)從隊列中獲取下一個任務(wù)

processWorkerExit(w,completedAbruptly);

processWorkerExit(Workerw,booleancompletedAbruptly)

privatevoidprocessWorkerExit(Workerw,booleancompletedAbruptly){

//如果completedAbruptly為true,表示是執(zhí)行任務(wù)的時候拋了異常,那就修改ctl,工作線程數(shù)-1

if(completedAbruptly)//Ifabrupt,thenworkerCountwasntadjusted

decrementWorkerCount();

finalReentrantLockmainLock=this.mainLock;

mainLock.lock();

try{

completedTaskCount+=pletedTasks;

//將當前Work對象從workers中移除

workers.remove(w);

}finally{

mainLock.unlock();

//因為當前是處理線程退出流程中,所以要嘗試去修改線程池的狀態(tài)為TINDYING

tryTerminate();

//獲取當前ctl值

intc=ctl.get();

//如果線程池的狀態(tài)為RUNNING或者SHUTDOWN,則可能要替補一個線程

if(runStateLessThan(c,STOP)){

//completedAbruptly為false,表示線程是正常要退出了,則看是否需要保留線程

if(!completedAbruptly){

//如果allowCoreThreadTimeOut為true,但是阻塞隊列中還有任務(wù),那就至少得保留一個工作線程來處理阻塞隊列中的任務(wù)

//如果allowCoreThreadTimeOut為false,那min就是corePoolSize,表示至少得保留corePoolSize個工作線程活著

intmin=allowCoreThreadTimeOut0:corePoolSize;

if(min==0!workQueue.isEmpty())

min=1;

//如果當前工作線程數(shù)大于等于min,則表示符合所需要保留的最小線程數(shù),那就直接return,不會調(diào)用下面的addWorker方法新開一個工作線程了

if(workerCountOf(c)=min)

return;//replacementnotneeded

//新開工作線程

addWorker(null,false);

某個工作線程正常情況下會不停的循環(huán)從阻塞隊列中獲取任務(wù)來執(zhí)行,正常情況下就是通過阻塞來保證線程永遠活著,但是會有一些特殊情況:

如果線程被中斷了,那就會退出循環(huán),然后做一些善后處理,比如ctl中的工作線程數(shù)-1,然后自己運行結(jié)束

如果線程阻塞超時了,那也會退出循環(huán),此時就需要判斷線程池中的當前工作線程夠不夠,比如是否有corePoolSize個工作線程,如果不夠就需要新開一個線程,然后當前線程自己運行結(jié)束,這種看上去效率比較低,但是也沒辦法,當然如果當前工作線程數(shù)足夠,那就正常,自己正常的運行結(jié)束即可

如果線程是在執(zhí)行任務(wù)的時候拋了移除,從而退出循環(huán),那就直接新開一個線程作為替補,當然前提是線程池的狀態(tài)是RUNNING

getTask()

privateRunnablegetTask(){

booleantimedOut=false;//Didthelastpoll()timeout

for(;;){

intc=ctl.get();

intrs=runStateOf(c);

//如果線程池狀態(tài)是STOP,表示當前線程不需要處理任務(wù)了,那就修改ctl工作線程數(shù)-1

//如果線程池狀態(tài)是SHUTDOWN,但是阻塞隊列中為空,表示當前任務(wù)沒有任務(wù)要處理了,那就修改ctl工作線程數(shù)-1

//returnnull表示當前線程無需處理任務(wù),線程退出

if(rs=SHUTDOWN(rs=STOP||workQueue.isEmpty())){

decrementWorkerCount();

returnnull;

//當前工作線程數(shù)

intwc=workerCountOf(c);

//用來判斷當前線程是無限阻塞還是超時阻塞,如果一個線程超時阻塞,那么一旦超時了,那么這個線程最終就會退出

//如果是無限阻塞,那除非被中斷了,不然這個線程就一直等著獲取隊列中的任務(wù)

//allowCoreThreadTimeOut為true,表示線程池中的所有線程都可以被回收掉,則當前線程應(yīng)該直接使用超時阻塞,一旦超時就回收

//allowCoreThreadTimeOut為false,則要看當前工作線程數(shù)是否超過了corePoolSize,如果超過了,則表示超過部分的線程要用超時阻塞,一旦超時就回收

booleantimed=allowCoreThreadTimeOut||wccorePoolSize;

//如果工作線程數(shù)超過了工作線程的最大限制或者線程超時了,則要修改ctl,工作線程數(shù)減1,并且returnnull

//returnnull就會導(dǎo)致外層的while循環(huán)退出,從而導(dǎo)致線程直接運行結(jié)束

//直播課程里會細講timedtimedOut

if((wcmaximumPoolSize||(timedtimedOut))

(wc1||workQueue.isEmpty())){

if(compareAndDecrementWorkerCount(c))

returnnull;

continue;

try{

//要么超時阻塞,要么無限阻塞

Runnabler=timed

workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS):

workQueue.take();

//表示沒有超時,在阻塞期間獲取到了任務(wù)

if(r!=null)

returnr;

//超時了,重新進入循環(huán),上面的代碼會判斷出來當前線程阻塞超時了,最后returnnull,線程會運行結(jié)束

timedOut=true;

}catch(InterruptedExceptionretry){

//如果線程池的狀態(tài)變成了STOP或者SHUTDOWN,最終也會returnnull,線程會運行結(jié)束

//但是如果線程池的狀態(tài)仍然是RUNNING,那當前線程會繼續(xù)從隊列中去獲取任務(wù),表示忽略了本次中斷

//只有通過調(diào)用線程池的shutdown方法或shutdownNow方法才能真正中斷線程池中的線程

timedOut=false;

shutdown()

publicvoidshutdown(){

finalReentrantLockmainLock=this.mainLock;

mainLock.lock();

try{

checkShutdownAccess();

//修改ctl,將線程池狀態(tài)改為SHUTDOWN

advanceRunState(SHUTDO

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論