《Python程序設(shè)計與算法基礎(chǔ)教程》課件(含思政案例) Ch18 并行計算:線程、進(jìn)程和協(xié)程_第1頁
《Python程序設(shè)計與算法基礎(chǔ)教程》課件(含思政案例) Ch18 并行計算:線程、進(jìn)程和協(xié)程_第2頁
《Python程序設(shè)計與算法基礎(chǔ)教程》課件(含思政案例) Ch18 并行計算:線程、進(jìn)程和協(xié)程_第3頁
《Python程序設(shè)計與算法基礎(chǔ)教程》課件(含思政案例) Ch18 并行計算:線程、進(jìn)程和協(xié)程_第4頁
《Python程序設(shè)計與算法基礎(chǔ)教程》課件(含思政案例) Ch18 并行計算:線程、進(jìn)程和協(xié)程_第5頁
已閱讀5頁,還剩44頁未讀 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)

文檔簡介

第18章并行計算:線程、進(jìn)程和協(xié)程本章要點(diǎn):并行處理概述

基于線程的并發(fā)處理

基于進(jìn)程的并行計算

基于線程池/進(jìn)程池的并發(fā)/并行任務(wù)

基于asyncio的異步IO編程資源下載提示2課件等資源:掃描封底的“課件下載”二維碼,在公眾號“書圈”中下載。素材(源碼):掃描本書目錄上方的二維碼下載。講解視頻:掃描封底刮刮卡中的二維碼,再掃描書中相應(yīng)章節(jié)中(位于每章最前)的二維碼,作為開源的補(bǔ)充閱讀和學(xué)習(xí)資源。

案例研究:掃描封底刮刮卡中的二維碼,再掃描書中相應(yīng)章節(jié)中(位于每章最后)的二維碼,可以在線學(xué)習(xí)。每章練習(xí)題:掃描封底刮刮卡中的二維碼,再掃描每章習(xí)題部分的二維碼,下載本章練習(xí)題電子版。

題庫平臺:教師登錄網(wǎng)站(),聯(lián)系客服開通教師權(quán)限并行處理概述(1)進(jìn)程是操作系統(tǒng)中正在執(zhí)行的不同應(yīng)用程序的一個實(shí)例線程是進(jìn)程中的一個實(shí)體,是被操作系統(tǒng)獨(dú)立調(diào)度和分派處理器時間的基本單位線程的優(yōu)缺點(diǎn)并發(fā)處理,因而特別適合需要同時執(zhí)行多個操作的場合解決用戶響應(yīng)性能和多任務(wù)的問題引入了資源共享和同步等問題并行處理概述(2)協(xié)程(Coroutine)又稱微線程、纖程,協(xié)程不是進(jìn)程或線程,其執(zhí)行過程更類似于函數(shù)調(diào)用Python的asyncio模塊實(shí)現(xiàn)的異步IO編程框架中,協(xié)程是對使用async關(guān)鍵字定義的異步函數(shù)的調(diào)用一個進(jìn)程包含多個線程,同樣,一個程序可以包含多個協(xié)程。多個線程相對獨(dú)立,線程有自己的上下文,切換受系統(tǒng)控制;同樣,多個協(xié)程也相對獨(dú)立,協(xié)程也有自己的上下文,但是其切換由程序自己控制。協(xié)程適合于異步IO編程的場合,能有效提高IO的吞吐效率。Python語言與并行處理相關(guān)模塊Python標(biāo)準(zhǔn)庫中包括下列與并行處理相關(guān)的模塊。_thread和_dummy_thread模塊:底層低級線程API。threading模塊:線程及其同步處理。multiprocessing模塊:多進(jìn)程處理和進(jìn)程池。concurrent.futures模塊:啟動并行任務(wù)。queue模塊:線程安全隊列,用于線程或進(jìn)程間信息傳遞。asyncio模塊:異步IO、事件循環(huán)、協(xié)程和任務(wù)處理?;诰€程的并發(fā)處理threading模塊概述Python標(biāo)準(zhǔn)庫模塊threading提供了與線程相關(guān)的操作:創(chuàng)建線程、啟動線程、線程同步通過創(chuàng)建threading.Thread對象實(shí)例,可以創(chuàng)建線程;調(diào)用Thread對象的start()方法,可啟動線程。也可以創(chuàng)建Thread的派生類,重寫run方法,然后創(chuàng)建其對象實(shí)例來創(chuàng)建線程。通過線程對象的daemon屬性,可設(shè)置線程為用戶線程或daemon線程當(dāng)多個線程調(diào)用單個對象的屬性和方法時,一個線程可能會中斷另一個線程正在執(zhí)行的任務(wù),使該對象處于一種無效狀態(tài),因此必須針對這些調(diào)用進(jìn)行同步處理Python語言提供了多種線程同步處理解決方案:Lock/RLock對象、Condition對象、Semaphore對象、Event對象、Barrier對象使用Thread對象創(chuàng)建線程通過創(chuàng)建Thread的對象可以創(chuàng)建線程:Thread(target=None,name=None,args=(),kwargs={})#構(gòu)造函數(shù)通過調(diào)用Thread對象的start方法可以啟動線程。Thread對象的常用方法如下。t.start():啟動線程。t.is_alive():判斷線程是否活動。:屬性:線程名。對應(yīng)于老版本的方法getname()和setname()。t.id:返回線程標(biāo)識符。threading模塊包含以下若干實(shí)用函數(shù)。threading.get_ident():返回當(dāng)前線程的標(biāo)識符。threading.current_thread():返回當(dāng)前線程。threading.active_count():返回活動的線程數(shù)目。threading.enumerate():返回活動線程的列表?!纠?8.1】直接使用Thread對象創(chuàng)建和啟動新線程importthreading,time,randomdeftimer(interval):foriinrange(3):time.sleep(random.choice(range(interval)))#隨機(jī)睡眠interval秒thread_id=threading.get_ident()#獲取當(dāng)前線程標(biāo)識符print('Thread:{0}Time:{1}'.format(thread_id,time.ctime()))if__name__=='__main__':t1=threading.Thread(target=timer,args=(5,))#創(chuàng)建線程t2=threading.Thread(target=timer,args=(5,))#創(chuàng)建線程t1.start();t2.start()#啟動線程自定義派生于Thread的對象通過聲明Thread的派生類,并重寫對象的run方法,然后創(chuàng)建其對象實(shí)例,可創(chuàng)建線程。通過對象的start方法,可啟動線程,并自動執(zhí)行對象的run方法【例18.2】通過聲明Thread派生類,以創(chuàng)建和啟動新線程(td_MyThread.py)importthreading,time,randomclassMyThread(threading.Thread):#繼承threading.Threaddef__init__(self,interval):#構(gòu)造函數(shù)threading.Thread.__init__(self)#調(diào)用父類構(gòu)造函數(shù)erval=interval#對象屬性defrun(self):#定義run方法foriinrange(5):time.sleep(random.choice(range(erval)))#隨機(jī)睡眠interval秒thread_id=threading.get_ident()#獲取當(dāng)前線程標(biāo)識符print('Thread:{0}Time:{1}\n'.format(thread_id,time.ctime()))if__name__=='__main__':t1=MyThread(5)#創(chuàng)建對象t2=MyThread(5)#創(chuàng)建對象t1.start();t2.start()#啟動線程線程加入join()所謂線程加入(t.join()),即讓包含代碼的線程(tc,即當(dāng)前線程)“加入”到另外一個線程(t)的尾部。在線程(t)執(zhí)行完畢之前,線程(tc)不能執(zhí)行【例18.3】線程join示例(td_join.py)importthreading,time,randomclassMyThread(threading.Thread):#繼承threading.Threaddef__init__(self):#構(gòu)造函數(shù)threading.Thread.__init__(self)#調(diào)用父類構(gòu)造函數(shù)defrun(self):#定義run方法foriinrange(5):time.sleep(1)#睡眠1秒t=threading.current_thread()#獲取當(dāng)前線程print('{0}at{1}\n'.format(,time.ctime()))#打印線程名、當(dāng)前時間print('線程t1結(jié)束')deftest():t1=MyThread()#創(chuàng)建線程對象='t1'#設(shè)置線程名稱t1.start()#啟動線程print('主線程開始等待線程(t1)2s');t1.join(2)print('主線程等待線程(t1)2s結(jié)束')print('主線程開始等待線程結(jié)束');t1.join()print('主線程結(jié)束')if__name__=='__main__':test()用戶線程和daemon線程線程可以分為用戶線程和daemon線程。用戶線程(非daemon線程)是通常意義的線程,應(yīng)用程序運(yùn)行即為主線程,在主線程中可以創(chuàng)建和啟動新線程,默認(rèn)為用戶線程。只有當(dāng)所有的非daemon的用戶線程(包括主線程)結(jié)束后,應(yīng)用程序終止daemon線程,又稱守護(hù)線程,其優(yōu)先級是最低的,一般為其它的線程提供服務(wù)。通常,daemon線程體是一個無限循環(huán)。如果所有的非daemon線程都結(jié)束了,則daemon線程自動就會終止importthreading,timeclassMyThread(threading.Thread):#繼承threading.Threaddef__init__(self,interval):#構(gòu)造函數(shù)threading.Thread.__init__(self)#調(diào)用父類構(gòu)造函數(shù)erval=interval#對象屬性defrun(self):#定義run方法t=threading.current_thread()#獲取當(dāng)前線程print('線程'++'開始')time.sleep(erval)#延遲erval秒print('線程'++'結(jié)束')classMyThreadDaemon(threading.Thread):#繼承threading.Threaddef__init__(self,interval):#構(gòu)造函數(shù)threading.Thread.__init__(self)#調(diào)用父類構(gòu)造函數(shù)erval=interval#對象屬性defrun(self):#定義run方法t=threading.current_thread()#獲取當(dāng)前線程print('線程'++'開始')whileTrue:time.sleep(erval)#延遲erval秒print('daemon線程'++'正在運(yùn)行')print('線程'++'結(jié)束')deftest():print('主線程開始')t1=MyThread(5)#創(chuàng)建線程對象t2=MyThreadDaemon(1)#創(chuàng)建線程對象='t1';='t2'#設(shè)置線程名稱t2.daemon=True#設(shè)置為daemont1.start()#啟動線程t2.start()print('主線程結(jié)束')if__name__=='__main__':test()【例18.4】用戶線程和Daemon線程示例Timer線程【例18.5】Timer線程示例(td_timer.py)importthreadingdeff():print('HelloTimer!')#創(chuàng)建定時器,1秒后運(yùn)行g(shù)lobaltimertimer=threading.Timer(1,f)timer.start()timer=threading.Timer(1,f)#創(chuàng)建定時器,1秒后運(yùn)行timer.start()#啟動定時器使用Python標(biāo)準(zhǔn)庫threading中的Timer線程(Thread的子類),可以很方便實(shí)現(xiàn)定時器功能Timer對象包含的主要方法如下:(1)Timer(interval,function,args=None,kwargs=None):構(gòu)造函數(shù)。在指定時間interval后執(zhí)行函數(shù)(2)start():啟動線程,即啟動計時器(3)cancel():取消計時器線程同步(1)基于原語鎖(Lock/RLock對象)的簡單同步【例18.6】使用lock語句同步代碼塊示例(lock.py)。創(chuàng)建工作線程,模擬銀行現(xiàn)金帳戶取款。多個線程同時執(zhí)行取款操作時,如果不使用同步處理,會造成賬戶余額混亂;嘗試使用同步鎖對象Lock,以保證多個線程同時執(zhí)行取款操作時,銀行現(xiàn)金帳戶取款的有效和一致【例18.6】使用lock語句同步代碼塊importthreading,time,randomclassAccount(threading.Thread):#繼承threading.Threadlock=threading.Lock()#創(chuàng)建鎖def__init__(self,amount):#構(gòu)造函數(shù)threading.Thread.__init__(self)#調(diào)用父類構(gòu)造函數(shù)Account.amount=amount#賬戶金額defrun(self):#定義run方法self.withdraw()#取款defwithdraw(self):Account.lock.acquire()#獲取鎖。注釋不使用同步處理t=threading.current_thread()a=random.choice(range(50,101))ifAccount.amount<a:print('{0}交易失敗。取款前余額:{1},取款額:{2}'.format(,Account.amount,a))Account.lock.release()return0#拒絕交易time.sleep(random.choice(range(5)))#隨機(jī)睡眠[0-5)秒prev=Account.amountAccount.amount-=a#取款print('{0}取款前余額:{1},取款額:{2},取款后額:{3}'.format(,prev,a,Account.amount))Account.lock.release()#釋放鎖。注釋不使用同步處理deftest():foriinrange(5):#創(chuàng)建5個線程對象并啟動Account(200).start()if__name__=='__main__':test()線程同步(2)基于條件變量(Condition對象)的同步和通信【例18.7】線程間通信示例(producer_consumer.py)。生產(chǎn)者/消費(fèi)者模型,使用線程間通信,生產(chǎn)者生產(chǎn)一件、消費(fèi)者消費(fèi)一件,二者保持同步。未使用線程同步(把最后1行代碼改為test2()),則結(jié)果無法預(yù)料【例18.7】線程間通信示例(producer_consumer.py)(1)importthreading,time,randomclassContainer1():#基于同步和通信def__init__(self):#構(gòu)造函數(shù)self.contents=0#容器內(nèi)容self.available=False#容器內(nèi)容self.cv=threading.Condition()#條件變量defput(self,value):#生產(chǎn)函數(shù)withself.cv:#使用條件變量同步ifself.available:#如果已經(jīng)生產(chǎn),則等待self.cv.wait()#等待self.contents=value#生產(chǎn),設(shè)置內(nèi)容t=threading.current_thread()print('{0}生產(chǎn){1}'.format(,self.contents))self.available=True#設(shè)置容器狀態(tài):已生產(chǎn)self.cv.notify()#通知等待的消費(fèi)者defget(self):#消費(fèi)函數(shù)withself.cv:#使用條件變量同步ifnotself.available:#如果已經(jīng)生產(chǎn),則等待self.cv.wait()#等待t=threading.current_thread()【例18.7】線程間通信示例(producer_consumer.py)(2)print('{0}消費(fèi){1}'.format(,self.contents))self.available=False#設(shè)置容器狀態(tài):未生產(chǎn)self.cv.notify()#通知等待的生產(chǎn)者classContainer2():#無同步和通信def__init__(self):#構(gòu)造函數(shù)self.contents=0#容器內(nèi)容self.available=False#容器內(nèi)容defput(self,value):#生產(chǎn)函數(shù)ifself.available:#如果已經(jīng)生產(chǎn)passelse:self.contents=value#生產(chǎn),設(shè)置內(nèi)容t=threading.current_thread()print('{0}生產(chǎn){1}'.format(,self.contents))self.available=True#設(shè)置容器狀態(tài):已生產(chǎn)defget(self):#消費(fèi)函數(shù)ifnotself.available:#如果已經(jīng)生產(chǎn),則等待passelse:【例18.7】線程間通信示例(producer_consumer.py)(3)else:self.contents=value#生產(chǎn),設(shè)置內(nèi)容t=threading.current_thread()print('{0}生產(chǎn){1}'.format(,self.contents))self.available=True#設(shè)置容器狀態(tài):已生產(chǎn)defget(self):#消費(fèi)函數(shù)ifnotself.available:#如果已經(jīng)生產(chǎn),則等待passelse:t=threading.current_thread()print('{0}消費(fèi){1}'.format(,self.contents))self.available=False#設(shè)置容器狀態(tài):未生產(chǎn)classProducer(threading.Thread):#生產(chǎn)者類def__init__(self,container):#構(gòu)造函數(shù)threading.Thread.__init__(self)#調(diào)用父類構(gòu)造函數(shù)self.container=container#容器defrun(self):#定義run方法foriinrange(1,6):time.sleep(random.choice(range(5)))#隨機(jī)睡眠[0-5)秒self.container.put(i)#生產(chǎn)【例18.7】線程間通信示例(producer_consumer.py)(4)classConsumer(threading.Thread):#消費(fèi)者類def__init__(self,container):#構(gòu)造函數(shù)threading.Thread.__init__(self)#調(diào)用父類構(gòu)造函數(shù)self.container=container#容器defrun(self):#定義run方法foriinrange(1,6):time.sleep(random.choice(range(5)))#隨機(jī)睡眠[0-5)秒self.container.get()#消費(fèi)deftest1():print('基本同步和通信的生產(chǎn)者消費(fèi)者模型:')container=Container1()#創(chuàng)建容器Producer(container).start()#創(chuàng)建消費(fèi)者線程并啟動Consumer(container).start()#創(chuàng)建消費(fèi)者線程并啟動deftest2():print('無同步和通信的生產(chǎn)者消費(fèi)者模型:')container=Container2()#創(chuàng)建容器Producer(container).start()#創(chuàng)建消費(fèi)者線程并啟動Consumer(container).start()#創(chuàng)建消費(fèi)者線程并啟動if__name__=='__main__':test1()基于queue模塊中隊列的同步使用Python標(biāo)準(zhǔn)模塊queue提供了適用于多線程編程的先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu)(即隊列),用來在生產(chǎn)者和消費(fèi)者線程之間的信息傳遞。使用queue模塊中的線程安全的隊列,可以快捷實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模型Queue模塊中包含三種線程安全的隊列:Queue、LifoQueue和PriorityQueue。以Queue為例,其主要方法包括:(1)Queue(maxsize=0):構(gòu)造函數(shù),構(gòu)造指定大小的隊列。默認(rèn)不限定大小(2)put(item,block=True,timeout=None):向隊列中添加一個項。默認(rèn)阻塞,即隊列滿的時候,程序阻塞等待(3)get(block=True,timeout=None):從隊列中拿出一個項。默認(rèn)阻塞,即隊列為空的時候,程序阻塞等待【例18.8】基于queue.Queue的生產(chǎn)者和消費(fèi)者模型importtimeimportqueueimportthreadingq=queue.Queue(10)#創(chuàng)建一個大小為10的隊列defproductor(i):whileTrue:time.sleep(1)#休眠1秒鐘,即每秒鐘做一個包子q.put("廚師{}做的包子!".format(i))#如果隊列滿,則等待defconsumer(j):whileTrue:print("顧客{}吃了一個{}".format(j,q.get()))#如果隊列空,則等待time.sleep(1)#休眠1秒鐘,即每秒鐘吃一個包子foriinrange(3):#3個廚師不停做包子,t=threading.Thread(target=productor,args=(i,))t.start()forkinrange(10):#10個顧客等待吃包子v=threading.Thread(target=consumer,args=(k,))v.start()基于Event的同步和通信threading.Event是線程之間的通信機(jī)制之一:Event對象管理一個標(biāo)志(flag),默認(rèn)為FalseEvent相當(dāng)于紅綠燈信號,可用于主線程控制其他線程的執(zhí)行。當(dāng)flag為False時,其他的線程調(diào)用e.wait()阻塞等待這個信號;當(dāng)設(shè)置flag為True時,等待的線程解除阻塞繼續(xù)執(zhí)行Event對象主要包括下列方法:(1)wait([timeout]):阻塞等待,直到Event對象的flag為True或超時(2)set():將flag設(shè)置為True(3)clear():將flag設(shè)置為False(4)isSet():判斷flag是否為True【例18.9】基于Event的線程通信importthreadingimportrandomdeff(i,e):e.wait()#檢測Event的標(biāo)志,如果是False則阻塞print("線程{}的隨機(jī)結(jié)果為{}".format(i,random.randrange(1,100)))if__name__=='__main__':event=threading.Event()#創(chuàng)建事件對象,默認(rèn)標(biāo)志為Falseforiinrange(3):#創(chuàng)建3個線程并運(yùn)行,默認(rèn)阻塞等待Eventt=threading.Thread(target=f,args=(i,event))t.start()ready=input('請輸入1開始繼續(xù)執(zhí)行阻塞的線程:')ifready=="1":event.set()#設(shè)置Event的flag為True基于進(jìn)程的并行計算multiprocessing模塊概述Python標(biāo)準(zhǔn)庫模塊multiprocessing提供了與進(jìn)程相關(guān)的操作:創(chuàng)建進(jìn)程、啟動進(jìn)程、進(jìn)程同步等模塊multiprocessing還提供進(jìn)程池和線程池創(chuàng)建和使用進(jìn)程multiprocessing模塊包含以下若干實(shí)用函數(shù)。cpu_count():可用的CPU核數(shù)量。current_process():返回當(dāng)前進(jìn)程。active_children():活動的子進(jìn)程。log_to_stderr():函數(shù)可設(shè)置輸出日志信息到標(biāo)準(zhǔn)錯誤輸出(默認(rèn)為控制臺)【例18.10】使用Process對象創(chuàng)建和啟動新進(jìn)程importtime,randomimportmultiprocessingasmpdeftimer(interval):foriinrange(3):time.sleep(random.choice(range(interval)))#隨機(jī)睡眠interval秒pid=mp.current_process().pid#獲取當(dāng)前進(jìn)程IDprint('Process:{0}Time:{1}'.format(pid,time.ctime()))if__name__=='__main__':p1=mp.Process(target=timer,args=(5,))#創(chuàng)建進(jìn)程p2=mp.Process(target=timer,args=(5,))#創(chuàng)建進(jìn)程p1.start();p2.start()#啟動線程p1.join();p2.join()進(jìn)程的數(shù)據(jù)共享模塊multiprocessing為進(jìn)程間通信提供了兩種方法:Queue和Pipe模塊multiprocessing中的Queue類似于queue.Queue(參見18.2.9),為進(jìn)程間通信提供了一個線程和進(jìn)程安全的隊列模塊multiprocessing中的Pipe()返回一個管道(包括兩個連接對象),兩個進(jìn)程可以分別連接到不同的端的連接對象,然后通過其send()方法發(fā)送數(shù)據(jù)或者通過recv()方法接收數(shù)據(jù)【例18.11】基于進(jìn)程和模塊multiprocessing中的Queue隊列的生產(chǎn)者和消費(fèi)者模型(mp_queue.py)importtimeimportmultiprocessingasmpdefproductor(i,q):whileTrue:time.sleep(1)#休眠1秒鐘,即每秒鐘做一個包子q.put("廚師{}做的包子!".format(i))#如果隊列滿,則等待defconsumer(j,q):whileTrue:print("顧客{}吃了一個{}".format(j,q.get()))#如果隊列空,則等待time.sleep(1)#休眠1秒鐘,即每秒鐘吃一個包子if__name__=='__main__':q=mp.Queue(10)#創(chuàng)建一個大小為10的隊列foriinrange(3):#3個廚師不停做包子,p=mp.Process(target=productor,args=(i,q))p.start()forkinrange(10):#10個顧客等待吃包子p=mp.Process(target=consumer,args=(k,q))p.start()【例18.12】基于模塊multiprocessing中的Pipe的進(jìn)程間通信importmultiprocessingasmpimporttime,random,itertoolsdefconsumer(conn):#從管道讀取數(shù)據(jù)whileTrue:try:item=conn.recv()time.sleep(random.randrange(2))#隨機(jī)休眠,代表處理過程print("consume:{}".format(item))exceptEOFError:breakdefproducer(conn):#生產(chǎn)項目并將其發(fā)送到連接的管道上foriinitertools.count(1):#從1開始無限循環(huán)time.sleep(random.randrange(2))#隨機(jī)休眠,代表處理過程conn.send(i)print("produce:{}".format(i))if__name__=="__main__":#創(chuàng)建管道,返回兩個連接對象的元組conn_out,conn_in=mp.Pipe()#創(chuàng)建并啟動生產(chǎn)者進(jìn)程,傳入?yún)?shù)管道一端的連接對象p_producer=mp.Process(target=producer,args=(conn_out,))p_producer.start()#創(chuàng)建并啟動消費(fèi)者進(jìn)程,傳入?yún)?shù)管道另一端的連接對象p_consumer=mp.Process(target=consumer,args=(conn_in,))p_consumer.start()#加入進(jìn)程,等待完成p_producer.join();p_consumer.join()進(jìn)程池(Pool)使用Python的標(biāo)準(zhǔn)庫模塊multiprocessing中的Pool類可創(chuàng)建進(jìn)程池。其大致步驟如下:(1)使用構(gòu)造函數(shù)Pool(processes,initializer,initargs)創(chuàng)建一個進(jìn)程池對象。這三個均為可選參數(shù)。其中processes為進(jìn)程池的進(jìn)程數(shù)量,默認(rèn)為CPU的核數(shù)量;initializer和initargs為啟動任務(wù)進(jìn)程時執(zhí)行的初始化函數(shù)及其參數(shù)(2)調(diào)用進(jìn)程池對象的方法執(zhí)行任務(wù),返回結(jié)果收集為一個列表。包括apply_async、apply、map_async、map等。其中apply_async和map_async是異步非阻塞模式,即啟動進(jìn)程函數(shù)之后會繼續(xù)執(zhí)行后續(xù)的代碼不用等待進(jìn)程函數(shù)返回。進(jìn)程池的map()方法與內(nèi)置的map()函數(shù)一樣,把函數(shù)應(yīng)用于可迭代對象的每一個元素(3)等待任務(wù)進(jìn)程完成。調(diào)用join()方法加入進(jìn)程池,等待其完成。也可以調(diào)用close()關(guān)閉進(jìn)程池,不再加入新的任務(wù)(注:Pool對象支持with上下文操作,自動調(diào)用close()方法);或者調(diào)用terminate()直接終止進(jìn)程池【例18.13】進(jìn)程池的使用(mp_pool.py)frommultiprocessingimportPool,TimeoutErrorimporttimeimportosdeff(x):returnx*x#返回x的平方if__name__=='__main__':#創(chuàng)建四個進(jìn)程的進(jìn)程池,并調(diào)用其對象方法并行執(zhí)行各任務(wù)withPool(processes=4)aspool:#使用進(jìn)程池對象map函數(shù),并行計算并返回結(jié)果res1=pool.map(f,range(10))print("pool.map的結(jié)果:{}".format(res1))#使用進(jìn)程池對象的apply_async函數(shù),異步執(zhí)行一次任務(wù)res2=pool.apply_async(f,(20,))#異步求解f(20),僅使用一個進(jìn)程print(res2.get(timeout=1))#輸出結(jié)果:400res3=pool.apply_async(os.getpid,())#異步執(zhí)行os.getpid(),僅使用一個進(jìn)程print(res3.get(timeout=1))#輸出執(zhí)行任務(wù)的進(jìn)程的PIDres4=pool.apply_async(time.sleep,(10,))#異步睡眠10秒鐘try:print(res4.get(timeout=1))#嘗試獲得結(jié)果,等待超時為1秒鐘exceptTimeoutError:print("結(jié)果超時!")#使用列表解析式,可能使用多個進(jìn)程res5=[pool.apply_async(os.getpid,())foriinrange(5)]print([res.get(timeout=1)forresinres5])print("在With語句中,進(jìn)程池可用")print("在With語句之外,進(jìn)程池自動關(guān)閉,不再可用")基于線程池/進(jìn)程池的并發(fā)/并行任務(wù)模塊concurrent.futures概述標(biāo)準(zhǔn)庫提供了concurrent.futures模塊實(shí)現(xiàn)了對threading和multiprocessing的進(jìn)一步抽象,提供了編寫線程池(進(jìn)程池)的支持。包c(diǎn)oncurrent意指并發(fā),而futures意指將在未來完成的操作concurrent.futures模塊包含兩個主要類和實(shí)用函數(shù):(1)Executor:表示任務(wù)執(zhí)行器。它是抽象類,可以使用其子類ThreadPoolExecutor(線程池任務(wù)執(zhí)行器)或者ProcessPoolExecutor(進(jìn)程池任務(wù)執(zhí)行器)創(chuàng)建任務(wù)執(zhí)行器對象(2)Future:表示將執(zhí)行的任務(wù)ThreadPoolExecutor(線程池任務(wù)執(zhí)行器)是Executor的派生類,用于使用一個線程池異步執(zhí)行任務(wù)【例18.14】使用ThreadPoolExecutor并發(fā)爬取網(wǎng)頁(future_tpe_get_pages.py)importconcurrent.futuresascfimporttime,urllib.requestdefload_page(url):withurllib.request.urlopen(url,timeout=60)asconn:return('{}主頁大?。簕}字節(jié)'.format(url,len(conn.read())))if__name__=='__main__':URLS=['','/','/']#傳統(tǒng)串行方法start_time=time.time()forurlinURLS:print(load_page(url))end_time=time.time()print("串行處理消耗時間:{}".format(end_time-start_time))#使用ThreadPoolExecutor并發(fā)處理start_time=time.time()executor=cf.ThreadPoolExecutor()wait_for=[executor.submit(load_page,url)forurlinURLS]forfincf.as_completed(wait_for):#迭代完成的任務(wù),輸出其結(jié)果print(f.result())end_time=time.time()print("并發(fā)處理消耗時間:{}".format(end_time-start_time))使用ProcessPoolExecutor并發(fā)執(zhí)行任務(wù)ProcessPoolExecutor(進(jìn)程池任務(wù)執(zhí)行器)是Executor的派生類,用于使用一個線程池異步執(zhí)行任務(wù)。ProcessPoolExecutor基于multiprocessing模塊,因而避免了CPython的GIL限制,從而適用于計算密集的任務(wù)【例18.15】使用ProcessPoolExecutor求解最大公約數(shù)(future_ppe_gcd.py)…tobecontinuedimporttimeimportconcurrent.futuresascfdefgcd(pair):#求最大公約數(shù)a,b=pairlow=min(a,b)foriinrange(low,0,-1):ifa%i==0andb%i==0:returniif__name__=='__main__':#測試數(shù)據(jù)TEST_DATA=[(11880774,83664910),(13961044,17644234),(10112000,13380625)]#傳統(tǒng)串行方法start_time=time.time()res1=list(map(gcd,TEST_DATA))end_time=time.time()print("串行處理結(jié)果:{},消耗時間:{}".format(res1,end_time-start_time))#使用ProcessPoolExecutor并行處理start_time=time.time()pool=cf.ProcessPoolExecutor(max_workers=4)res2=list(pool.map(gcd,TEST_DATA))end_time=time.time()print("并行處理結(jié)果:{},消耗時間:{}".format(res2,end_time-start_time))【例18.15】使用ProcessPoolExecutor求解最大公約數(shù)(future_ppe_gcd.py)基于asyncio的異步IO編程異步IO(AsynchronousIO)是指程序發(fā)起一個IO操作(阻塞等待)后,不用等IO操作結(jié)束,可以繼續(xù)其它操作;做其他事情,當(dāng)IO操作結(jié)束時,會得到通知,然后繼續(xù)執(zhí)行。異步IO編程是實(shí)現(xiàn)并發(fā)的一種方式,適用于IO密集型任務(wù)Python標(biāo)準(zhǔn)庫模塊asyncio提供了一個異步編程框架,主要包括下列部分:(1)事件循環(huán)(eventloop)(2)協(xié)程(coroutine)(3)任務(wù)(Task)和Future(將執(zhí)行的任務(wù))創(chuàng)建協(xié)程(coroutine)對象通過async關(guān)鍵字定義一個異步函數(shù),調(diào)用異步函數(shù)返回一個協(xié)程(coroutine)對象。協(xié)程也是一種對象,協(xié)程不能直接運(yùn)行,需要把協(xié)程加入到事件循環(huán)中,由后者在適當(dāng)?shù)臅r候調(diào)用協(xié)程使用asyncio.get_event_loop()方法可以創(chuàng)建一個事件循環(huán)對象,然后使用其run_until_complete()方法將協(xié)程注冊到事件循環(huán)在異步函數(shù)中,可以使用await關(guān)鍵字,針對耗時的操作(例如網(wǎng)絡(luò)請求、文件讀取等IO操作)進(jìn)行掛起【例18.16】創(chuàng)建協(xié)程(coroutine)對象示例importasyncio,timeasyncdefdo_some_work(n):#使用async關(guān)鍵字定義異步函數(shù)print('等待:{}秒'.format(n))awaitasyncio.sleep(n)#休眠一段時間return'{}秒后返回結(jié)束運(yùn)行'.format(n)start_time=time.time()#開始時間coro=do_some_work(2)loop=asyncio.get_event_loop()loop.run_until_complete(coro)print('運(yùn)行時間:',time.time()-start_time)創(chuàng)建任務(wù)(Task)對象任務(wù)(Task)對象用于封裝協(xié)程對象,保存了協(xié)程運(yùn)行后的狀態(tài),用于未來獲取協(xié)程的結(jié)果可以使用asyncio.ensure_future(coroutine)創(chuàng)建一個任務(wù)對象,也可以使用事件循環(huán)對象的create_task(coroutine)方法創(chuàng)建任務(wù)使用run_until_complete()方法將任務(wù)注冊到事件循環(huán)。同時注冊多個任務(wù)的列表可以使用run_until_complete(asyncio.wait(tasks)),注冊多個任務(wù)可以使用run_until_complete(asyncio.gather(*tasks))【例18.17】創(chuàng)建任務(wù)對象示例importasyncio,timeasyncdefdo_some_work(i,n):#使用async關(guān)鍵字定義異步函數(shù)print('任務(wù){(diào)}等待:{}秒'.format(i,n))awaitasyncio.sleep(n)#休眠一段時間return'任務(wù){(diào)}在{}秒后返回結(jié)束運(yùn)行'.format(i,n)start_time=time.time()#開始時間tasks=[asyncio.ensure_future(do_some_work(1,2)),asyncio.ensure_future(do_some_work(2,1)),asyncio.ensure_future(do_some_work(3,3))]loop=asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))fortaskintasks:print('任務(wù)執(zhí)行結(jié)果:',task.result())print('運(yùn)行時間:',time.time()-start_time)應(yīng)用舉例使用Pool并行計算查找素數(shù)比較常規(guī)的串行處理(結(jié)果寫入prime1.txt)和基于進(jìn)程池Pool的并行處理(結(jié)果寫入prime2.txt)的時間消耗【例18.18】并行查找小于n的所有素數(shù)(1)importmathimporttimeimportmultiprocessingdefisprime(n):"""判斷n是否為素數(shù),如果是,返回n,否則返回0"""ifn<2:return0ifn==2:returnnk=int(math.ceil(math.sqrt(n)))i=2whilei<=k:ifn%i==0:return0i+=1returnnif__name__=="__main__":#測試數(shù)據(jù)test_data=range(10**6)#串行處理測試start_time=time.time()#結(jié)束時間withopen("prime1.txt","w")asoutf:fornumintest_data:r=isprime(num)ifr>0:outf.writelines("{}\n".format(num))end_time=time.time()print("串行處理消耗時間:{}".format(end_time-start_time))#并行處理測試start_time=time.time()#開始時間pool=multiprocessing.Pool(4)resultList=pool.map(isprime,test_data)pool.close()pool.join()withopen("prime2.txt","w")asoutf:forrinresultList:ifr>0:outf.writelines("{}\n".format(r))end_time=time.time()#結(jié)束時間print("并行處理消耗時間:{}".format(end_time-start_time))【例18.18】并行查找小于n的所有素數(shù)(2)使用ProcessPoolExecutor并行判斷素數(shù)比較常規(guī)的串行處理和基于ProcessPoolExecutor的并行處理的時間消耗【例18.19】使用ProcessPoolExecutor并行判斷素數(shù)(future_ppe_prime.py)(1)【例18.19】使用ProcessPoolExecutor并行判斷素數(shù)(future_ppe_prime.py)(2)importconcurrent.futuresascfimportmath,timedefis_prime(n):ifn<2:returnFalseifn==2:returnTrueifn%2==0:returnFalsesqrt_n=int(math.floor(math.sqrt(n)))foriinrange(3,sqrt_n+1,2):ifn%i==0:returnFalsereturnTrueif__name__=="__main__":#測試數(shù)據(jù)test_data=[112272535095293,112272535095293,115280095190773,1099726899285419]#串行處理測試start_time=time.time()#結(jié)束時間fornumintest_data:print('{}是素數(shù)否:{}'.format(num,is_prime(num)))end_time=time.time()print("串行處理消耗時間:{}".format(end_time-start_time))#并行處理測試start_time=time.time()#開始時間withcf.ProcessPoolExecutor()asexecutor:primes=executor.map(is_prime,test_data)fornumber,primeinzip(test_data,primes):print('{}是素數(shù)否:{}'.format(number,prime))end_time=time.time()print("并行處理消耗時間:{}".format(end_time-start_time))【例18.20】使用ThreadPoolExecutor批量下載網(wǎng)頁內(nèi)容(future_tpe_download.py)(1)importconcurrent.futuresimporturllib.requestimporttimedefload_url(url,ti

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論