chinese直男口爆体育生外卖, 99久久er热在这里只有精品99, 又色又爽又黄18禁美女裸身无遮挡, gogogo高清免费观看日本电视,私密按摩师高清版在线,人妻视频毛茸茸,91论坛 兴趣闲谈,欧美 亚洲 精品 8区,国产精品久久久久精品免费

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫(xiě)文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

關(guān)于Python多進(jìn)程和多線程詳解

新機(jī)器視覺(jué) ? 來(lái)源:pythonhosted ? 2023-11-06 14:46 ? 次閱讀
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

在學(xué)習(xí)Python的過(guò)程中,有接觸到多線程編程相關(guān)的知識(shí)點(diǎn),先前一直都沒(méi)有徹底的搞明白。今天準(zhǔn)備花一些時(shí)間,把里面的細(xì)節(jié)盡可能的梳理清楚。

線程與進(jìn)程的區(qū)別

進(jìn)程(process)和線程(thread)是操作系統(tǒng)的基本概念,但是它們比較抽象,不容易掌握。關(guān)于多進(jìn)程和多線程,教科書(shū)上最經(jīng)典的一句話是“進(jìn)程是資源分配的最小單位,線程是CPU調(diào)度的最小單位”。線程是程序中一個(gè)單一的順序控制流程。進(jìn)程內(nèi)一個(gè)相對(duì)獨(dú)立的、可調(diào)度的執(zhí)行單元,是系統(tǒng)獨(dú)立調(diào)度和分派CPU的基本單位指運(yùn)行中的程序的調(diào)度單位。在單個(gè)程序中同時(shí)運(yùn)行多個(gè)線程完成不同的工作,稱為多線程。

7004dea8-7b16-11ee-939d-92fbcf53809c.png

進(jìn)程和線程區(qū)別

進(jìn)程是資源分配的基本單位。所有與該進(jìn)程有關(guān)的資源,都被記錄在進(jìn)程控制塊PCB中。以表示該進(jìn)程擁有這些資源或正在使用它們。另外,進(jìn)程也是搶占處理機(jī)的調(diào)度單位,它擁有一個(gè)完整的虛擬地址空間。當(dāng)進(jìn)程發(fā)生調(diào)度時(shí),不同的進(jìn)程擁有不同的虛擬地址空間,而同一進(jìn)程內(nèi)的不同線程共享同一地址空間。 與進(jìn)程相對(duì)應(yīng),線程與資源分配無(wú)關(guān),它屬于某一個(gè)進(jìn)程,并與進(jìn)程內(nèi)的其他線程一起共享進(jìn)程的資源。線程只由相關(guān)堆棧(系統(tǒng)?;蛴脩魲#?a href="http://www.brongaenegriffin.com/tags/寄存器/" target="_blank">寄存器和線程控制表TCB組成。寄存器可被用來(lái)存儲(chǔ)線程內(nèi)的局部變量,但不能存儲(chǔ)其他線程的相關(guān)變量。 通常在一個(gè)進(jìn)程中可以包含若干個(gè)線程,它們可以利用進(jìn)程所擁有的資源。在引入線程的操作系統(tǒng)中,通常都是把進(jìn)程作為分配資源的基本單位,而把線程作為獨(dú)立運(yùn)行和獨(dú)立調(diào)度的基本單位。 由于線程比進(jìn)程更小,基本上不擁有系統(tǒng)資源,故對(duì)它的調(diào)度所付出的開(kāi)銷就會(huì)小得多,能更高效的提高系統(tǒng)內(nèi)多個(gè)程序間并發(fā)執(zhí)行的程度,從而顯著提高系統(tǒng)資源的利用率和吞吐量。 因而近年來(lái)推出的通用操作系統(tǒng)都引入了線程,以便進(jìn)一步提高系統(tǒng)的并發(fā)性,并把它視為現(xiàn)代操作系統(tǒng)的一個(gè)重要指標(biāo)。

70258676-7b16-11ee-939d-92fbcf53809c.png

線程與進(jìn)程的區(qū)別可以歸納為以下4點(diǎn):

地址空間和其它資源(如打開(kāi)文件):進(jìn)程間相互獨(dú)立,同一進(jìn)程的各線程間共享。某進(jìn)程內(nèi)的線程在其它進(jìn)程不可見(jiàn)。

通信:進(jìn)程間通信IPC,線程間可以直接讀寫(xiě)進(jìn)程數(shù)據(jù)段(如全局變量)來(lái)進(jìn)行通信——需要進(jìn)程同步和互斥手段的輔助,以保證數(shù)據(jù)的一致性。

調(diào)度和切換:線程上下文切換比進(jìn)程上下文切換要快得多。

在多線程O(píng)S中,進(jìn)程不是一個(gè)可執(zhí)行的實(shí)體。

多進(jìn)程和多線程的比較

對(duì)比維度 多進(jìn)程 多線程 總結(jié)
數(shù)據(jù)共享、同步 數(shù)據(jù)共享復(fù)雜,同步簡(jiǎn)單 數(shù)據(jù)共享簡(jiǎn)單,同步復(fù)雜 各有優(yōu)劣
內(nèi)存、CPU 占用內(nèi)存多,切換復(fù)雜,CPU利用率低 占用內(nèi)存少,切換簡(jiǎn)單,CPU利用率高 線程占優(yōu)
創(chuàng)建、銷毀、切換 復(fù)雜,速度慢 簡(jiǎn)單,速度快 線程占優(yōu)
編程、調(diào)試 編程簡(jiǎn)單,調(diào)試簡(jiǎn)單 編程復(fù)雜,調(diào)試復(fù)雜 進(jìn)程占優(yōu)
可靠性 進(jìn)程間不會(huì)互相影響 一個(gè)線程掛掉將導(dǎo)致整個(gè)進(jìn)程掛掉 進(jìn)程占優(yōu)
分布式 適用于多核、多機(jī),擴(kuò)展到多臺(tái)機(jī)器簡(jiǎn)單 適合于多核 進(jìn)程占優(yōu)

總結(jié),進(jìn)程和線程還可以類比為火車(chē)和車(chē)廂:

線程在進(jìn)程下行進(jìn)(單純的車(chē)廂無(wú)法運(yùn)行)

一個(gè)進(jìn)程可以包含多個(gè)線程(一輛火車(chē)可以有多個(gè)車(chē)廂)

不同進(jìn)程間數(shù)據(jù)很難共享(一輛火車(chē)上的乘客很難換到另外一輛火車(chē),比如站點(diǎn)換乘)

同一進(jìn)程下不同線程間數(shù)據(jù)很易共享(A車(chē)廂換到B車(chē)廂很容易)

進(jìn)程要比線程消耗更多的計(jì)算機(jī)資源(采用多列火車(chē)相比多個(gè)車(chē)廂更耗資源)

進(jìn)程間不會(huì)相互影響,一個(gè)線程掛掉將導(dǎo)致整個(gè)進(jìn)程掛掉(一列火車(chē)不會(huì)影響到另外一列火車(chē),但是如果一列火車(chē)上中間的一節(jié)車(chē)廂著火了,將影響到該趟火車(chē)的所有車(chē)廂)

進(jìn)程可以拓展到多機(jī),進(jìn)程最多適合多核(不同火車(chē)可以開(kāi)在多個(gè)軌道上,同一火車(chē)的車(chē)廂不能在行進(jìn)的不同的軌道上)

進(jìn)程使用的內(nèi)存地址可以上鎖,即一個(gè)線程使用某些共享內(nèi)存時(shí),其他線程必須等它結(jié)束,才能使用這一塊內(nèi)存。(比如火車(chē)上的洗手間)-”互斥鎖(mutex)”

進(jìn)程使用的內(nèi)存地址可以限定使用量(比如火車(chē)上的餐廳,最多只允許多少人進(jìn)入,如果滿了需要在門(mén)口等,等有人出來(lái)了才能進(jìn)去)-“信號(hào)量(semaphore)”

Python全局解釋器鎖GIL

全局解釋器鎖(英語(yǔ):Global Interpreter Lock,縮寫(xiě)GIL),并不是Python的特性,它是在實(shí)現(xiàn)Python解析器(CPython)時(shí)所引入的一個(gè)概念。由于CPython是大部分環(huán)境下默認(rèn)的Python執(zhí)行環(huán)境。所以在很多人的概念里CPython就是Python,也就想當(dāng)然的把GIL歸結(jié)為Python語(yǔ)言的缺陷。那么CPython實(shí)現(xiàn)中的GIL又是什么呢?來(lái)看看官方的解釋:

The mechanism used by the CPython interpreter to assure that only one thread executes Python bytecode at a time. This simplifies the CPython implementation by making the object model (including critical built-in types such as dict) implicitly safe against concurrent access. Locking the entire interpreter makes it easier for the interpreter to be multi-threaded, at the expense of much of the parallelism afforded by multi-processor machines.

Python代碼的執(zhí)行由Python 虛擬機(jī)(也叫解釋器主循環(huán),CPython版本)來(lái)控制,Python 在設(shè)計(jì)之初就考慮到要在解釋器的主循環(huán)中,同時(shí)只有一個(gè)線程在執(zhí)行,即在任意時(shí)刻,只有一個(gè)線程在解釋器中運(yùn)行。對(duì)Python 虛擬機(jī)的訪問(wèn)由全局解釋器鎖(GIL)來(lái)控制,正是這個(gè)鎖能保證同一時(shí)刻只有一個(gè)線程在運(yùn)行。

7036224c-7b16-11ee-939d-92fbcf53809c.png

GIL 有什么好處?簡(jiǎn)單來(lái)說(shuō),它在單線程的情況更快,并且在和 C 庫(kù)結(jié)合時(shí)更方便,而且不用考慮線程安全問(wèn)題,這也是早期 Python 最常見(jiàn)的應(yīng)用場(chǎng)景和優(yōu)勢(shì)。另外,GIL的設(shè)計(jì)簡(jiǎn)化了CPython的實(shí)現(xiàn),使得對(duì)象模型,包括關(guān)鍵的內(nèi)建類型如字典,都是隱含可以并發(fā)訪問(wèn)的。鎖住全局解釋器使得比較容易的實(shí)現(xiàn)對(duì)多線程的支持,但也損失了多處理器主機(jī)的并行計(jì)算能力。

在多線程環(huán)境中,Python 虛擬機(jī)按以下方式執(zhí)行:

設(shè)置GIL

切換到一個(gè)線程去運(yùn)行

運(yùn)行直至指定數(shù)量的字節(jié)碼指令,或者線程主動(dòng)讓出控制(可以調(diào)用sleep(0))

把線程設(shè)置為睡眠狀態(tài)

解鎖GIL

再次重復(fù)以上所有步驟

7049d6de-7b16-11ee-939d-92fbcf53809c.png

Python3.2前,GIL的釋放邏輯是當(dāng)前線程遇見(jiàn)IO操作或者ticks計(jì)數(shù)達(dá)到100(ticks可以看作是python自身的一個(gè)計(jì)數(shù)器,專門(mén)做用于GIL,每次釋放后歸零,這個(gè)計(jì)數(shù)可以通過(guò) sys.setcheckinterval 來(lái)調(diào)整),進(jìn)行釋放。因?yàn)橛?jì)算密集型線程在釋放GIL之后又會(huì)立即去申請(qǐng)GIL,并且通常在其它線程還沒(méi)有調(diào)度完之前它就已經(jīng)重新獲取到了GIL,就會(huì)導(dǎo)致一旦計(jì)算密集型線程獲得了GIL,那么它在很長(zhǎng)一段時(shí)間內(nèi)都將占據(jù)GIL,甚至一直到該線程執(zhí)行結(jié)束。 Python 3.2開(kāi)始使用新的GIL。新的GIL實(shí)現(xiàn)中用一個(gè)固定的超時(shí)時(shí)間來(lái)指示當(dāng)前的線程放棄全局鎖。在當(dāng)前線程保持這個(gè)鎖,且其他線程請(qǐng)求這個(gè)鎖時(shí),當(dāng)前線程就會(huì)在5毫秒后被強(qiáng)制釋放該鎖。該改進(jìn)在單核的情況下,對(duì)于單個(gè)線程長(zhǎng)期占用GIL的情況有所好轉(zhuǎn)。 在單核CPU上,數(shù)百次的間隔檢查才會(huì)導(dǎo)致一次線程切換。在多核CPU上,存在嚴(yán)重的線程顛簸(thrashing)。而每次釋放GIL鎖,線程進(jìn)行鎖競(jìng)爭(zhēng)、切換線程,會(huì)消耗資源。單核下多線程,每次釋放GIL,喚醒的那個(gè)線程都能獲取到GIL鎖,所以能夠無(wú)縫執(zhí)行,但多核下,CPU0釋放GIL后,其他CPU上的線程都會(huì)進(jìn)行競(jìng)爭(zhēng),但GIL可能會(huì)馬上又被CPU0拿到,導(dǎo)致其他幾個(gè)CPU上被喚醒后的線程會(huì)醒著等待到切換時(shí)間后又進(jìn)入待調(diào)度狀態(tài),這樣會(huì)造成線程顛簸(thrashing),導(dǎo)致效率更低。 另外,從上面的實(shí)現(xiàn)機(jī)制可以推導(dǎo)出,Python的多線程對(duì)IO密集型代碼要比CPU密集型代碼更加友好。 針對(duì)GIL的應(yīng)對(duì)措施:

使用更高版本Python(對(duì)GIL機(jī)制進(jìn)行了優(yōu)化)

使用多進(jìn)程替換多線程(多進(jìn)程之間沒(méi)有GIL,但是進(jìn)程本身的資源消耗較多)

指定cpu運(yùn)行線程(使用affinity模塊)

使用Jython、IronPython等無(wú)GIL解釋器

全I(xiàn)O密集型任務(wù)時(shí)才使用多線程

使用協(xié)程(高效的單線程模式,也稱微線程;通常與多進(jìn)程配合使用)

將關(guān)鍵組件用C/C++編寫(xiě)為Python擴(kuò)展,通過(guò)ctypes使Python程序直接調(diào)用C語(yǔ)言編譯的動(dòng)態(tài)鏈接庫(kù)的導(dǎo)出函數(shù)。(with nogil調(diào)出GIL限制)

Python的多進(jìn)程包multiprocessing

Python的threading包主要運(yùn)用多線程的開(kāi)發(fā),但由于GIL的存在,Python中的多線程其實(shí)并不是真正的多線程,如果想要充分地使用多核CPU的資源,大部分情況需要使用多進(jìn)程。在Python 2.6版本的時(shí)候引入了multiprocessing包,它完整的復(fù)制了一套threading所提供的接口方便遷移。唯一的不同就是它使用了多進(jìn)程而不是多線程。每個(gè)進(jìn)程有自己的獨(dú)立的GIL,因此也不會(huì)出現(xiàn)進(jìn)程之間的GIL爭(zhēng)搶。 借助這個(gè)multiprocessing,你可以輕松完成從單進(jìn)程到并發(fā)執(zhí)行的轉(zhuǎn)換。multiprocessing支持子進(jìn)程、通信和共享數(shù)據(jù)、執(zhí)行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。

Multiprocessing產(chǎn)生的背景

除了應(yīng)對(duì)Python的GIL以外,產(chǎn)生multiprocessing的另外一個(gè)原因時(shí)Windows操作系統(tǒng)與Linux/Unix系統(tǒng)的不一致。 Unix/Linux操作系統(tǒng)提供了一個(gè)fork()系統(tǒng)調(diào)用,它非常特殊。普通的函數(shù),調(diào)用一次,返回一次,但是fork()調(diào)用一次,返回兩次,因?yàn)椴僮飨到y(tǒng)自動(dòng)把當(dāng)前進(jìn)程(父進(jìn)程)復(fù)制了一份(子進(jìn)程),然后,分別在父進(jìn)程和子進(jìn)程內(nèi)返回。子進(jìn)程永遠(yuǎn)返回0,而父進(jìn)程返回子進(jìn)程的ID。這樣做的理由是,一個(gè)父進(jìn)程可以fork出很多子進(jìn)程,所以,父進(jìn)程要記下每個(gè)子進(jìn)程的ID,而子進(jìn)程只需要調(diào)用getpid()就可以拿到父進(jìn)程的ID。 Python的os模塊封裝了常見(jiàn)的系統(tǒng)調(diào)用,其中就包括fork,可以在Python程序中輕松創(chuàng)建子進(jìn)程:

importos

print('Process(%s)start...'%os.getpid())

#OnlyworksonUnix/Linux/Mac:

pid=os.fork()

ifpid==0:

print('Iamchildprocess(%s)andmyparentis%s.'%(os.getpid(),os.getppid()))

else:

print('I(%s)justcreatedachildprocess(%s).'%(os.getpid(),pid))

上述代碼在Linux、Unix和Mac上的執(zhí)行結(jié)果為:

Process(876)start...

I(876)justcreatedachildprocess(877).

Iamchildprocess(877)andmyparentis876.
有了fork調(diào)用,一個(gè)進(jìn)程在接到新任務(wù)時(shí)就可以復(fù)制出一個(gè)子進(jìn)程來(lái)處理新任務(wù),常見(jiàn)的Apache服務(wù)器就是由父進(jìn)程監(jiān)聽(tīng)端口,每當(dāng)有新的http請(qǐng)求時(shí),就fork出子進(jìn)程來(lái)處理新的http請(qǐng)求。 由于Windows沒(méi)有fork調(diào)用,上面的代碼在Windows上無(wú)法運(yùn)行。由于Python是跨平臺(tái)的,自然也應(yīng)該提供一個(gè)跨平臺(tái)的多進(jìn)程支持。multiprocessing模塊就是跨平臺(tái)版本的多進(jìn)程模塊。multiprocessing模塊封裝了fork()調(diào)用,使我們不需要關(guān)注fork()的細(xì)節(jié)。由于Windows沒(méi)有fork調(diào)用,因此,multiprocessing需要“模擬”出fork的效果。

multiprocessing常用組件及功能

7057ecce-7b16-11ee-939d-92fbcf53809c.jpg

創(chuàng)建管理進(jìn)程模塊:

Process(用于創(chuàng)建進(jìn)程)

Pool(用于創(chuàng)建管理進(jìn)程池)

Queue(用于進(jìn)程通信,資源共享)

Value,Array(用于進(jìn)程通信,資源共享)

Pipe(用于管道通信)

Manager(用于資源共享)

同步子進(jìn)程模塊:

Condition(條件變量)

Event(事件)

Lock(互斥鎖)

RLock(可重入的互斥鎖(同一個(gè)進(jìn)程可以多次獲得它,同時(shí)不會(huì)造成阻塞)

Semaphore(信號(hào)量)

接下來(lái)就一起來(lái)學(xué)習(xí)下每個(gè)組件及功能的具體使用方法。

Process(用于創(chuàng)建進(jìn)程)

multiprocessing模塊提供了一個(gè)Process類來(lái)代表一個(gè)進(jìn)程對(duì)象。

在multiprocessing中,每一個(gè)進(jìn)程都用一個(gè)Process類來(lái)表示。

構(gòu)造方法:Process([group [, target [, name [, args [, kwargs]]]]])

group:分組,實(shí)際上不使用,值始終為None

target:表示調(diào)用對(duì)象,即子進(jìn)程要執(zhí)行的任務(wù),你可以傳入方法名

name:為子進(jìn)程設(shè)定名稱

args:要傳給target函數(shù)的位置參數(shù),以元組方式進(jìn)行傳入。

kwargs:要傳給target函數(shù)的字典參數(shù),以字典方式進(jìn)行傳入。

實(shí)例方法:

start():?jiǎn)?dòng)進(jìn)程,并調(diào)用該子進(jìn)程中的p.run()

run():進(jìn)程啟動(dòng)時(shí)運(yùn)行的方法,正是它去調(diào)用target指定的函數(shù),我們自定義類的類中一定要實(shí)現(xiàn)該方法

terminate():強(qiáng)制終止進(jìn)程p,不會(huì)進(jìn)行任何清理操作,如果p創(chuàng)建了子進(jìn)程,該子進(jìn)程就成了僵尸進(jìn)程,使用該方法需要特別小心這種情況。如果p還保存了一個(gè)鎖那么也將不會(huì)被釋放,進(jìn)而導(dǎo)致死鎖

is_alive():返回進(jìn)程是否在運(yùn)行。如果p仍然運(yùn)行,返回True

join([timeout]):進(jìn)程同步,主進(jìn)程等待子進(jìn)程完成后再執(zhí)行后面的代碼。線程等待p終止(強(qiáng)調(diào):是主線程處于等的狀態(tài),而p是處于運(yùn)行的狀態(tài))。timeout是可選的超時(shí)時(shí)間(超過(guò)這個(gè)時(shí)間,父線程不再等待子線程,繼續(xù)往下執(zhí)行),需要強(qiáng)調(diào)的是,p.join只能join住start開(kāi)啟的進(jìn)程,而不能join住run開(kāi)啟的進(jìn)程

屬性介紹:

daemon:默認(rèn)值為False,如果設(shè)為T(mén)rue,代表p為后臺(tái)運(yùn)行的守護(hù)進(jìn)程;當(dāng)p的父進(jìn)程終止時(shí),p也隨之終止,并且設(shè)定為T(mén)rue后,p不能創(chuàng)建自己的新進(jìn)程;必須在p.start()之前設(shè)置

name:進(jìn)程的名稱

pid:進(jìn)程的pid

exitcode:進(jìn)程在運(yùn)行時(shí)為None、如果為–N,表示被信號(hào)N結(jié)束(了解即可)

authkey:進(jìn)程的身份驗(yàn)證鍵,默認(rèn)是由os.urandom()隨機(jī)生成的32字符的字符串。這個(gè)鍵的用途是為涉及網(wǎng)絡(luò)連接的底層進(jìn)程間通信提供安全性,這類連接只有在具有相同的身份驗(yàn)證鍵時(shí)才能成功(了解即可)

使用示例:(注意:在windows中Process()必須放到if name == ‘main’:下)

frommultiprocessingimportProcess

importos

defrun_proc(name):

print('Runchildprocess%s(%s)...'%(name,os.getpid()))

if__name__=='__main__':

print('Parentprocess%s.'%os.getpid())

p=Process(target=run_proc,args=('test',))

print('Childprocesswillstart.')

p.start()

p.join()

print('Childprocessend.')

Pool(用于創(chuàng)建管理進(jìn)程池)

7062dfda-7b16-11ee-939d-92fbcf53809c.png

Pool類用于需要執(zhí)行的目標(biāo)很多,而手動(dòng)限制進(jìn)程數(shù)量又太繁瑣時(shí),如果目標(biāo)少且不用控制進(jìn)程數(shù)量則可以用Process類。Pool可以提供指定數(shù)量的進(jìn)程,供用戶調(diào)用,當(dāng)有新的請(qǐng)求提交到Pool中時(shí),如果池還沒(méi)有滿,那么就會(huì)創(chuàng)建一個(gè)新的進(jìn)程用來(lái)執(zhí)行該請(qǐng)求;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到規(guī)定最大值,那么該請(qǐng)求就會(huì)等待,直到池中有進(jìn)程結(jié)束,就重用進(jìn)程池中的進(jìn)程。 構(gòu)造方法:Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

processes :要?jiǎng)?chuàng)建的進(jìn)程數(shù),如果省略,將默認(rèn)使用cpu_count()返回的數(shù)量。

initializer:每個(gè)工作進(jìn)程啟動(dòng)時(shí)要執(zhí)行的可調(diào)用對(duì)象,默認(rèn)為None。如果initializer是None,那么每一個(gè)工作進(jìn)程在開(kāi)始的時(shí)候會(huì)調(diào)用initializer(*initargs)。

initargs:是要傳給initializer的參數(shù)組。

maxtasksperchild:工作進(jìn)程退出之前可以完成的任務(wù)數(shù),完成后用一個(gè)新的工作進(jìn)程來(lái)替代原進(jìn)程,來(lái)讓閑置的資源被釋放。maxtasksperchild默認(rèn)是None,意味著只要Pool存在工作進(jìn)程就會(huì)一直存活。

context: 用在制定工作進(jìn)程啟動(dòng)時(shí)的上下文,一般使用Pool() 或者一個(gè)context對(duì)象的Pool()方法來(lái)創(chuàng)建一個(gè)池,兩種方法都適當(dāng)?shù)脑O(shè)置了context。

實(shí)例方法:

apply(func[, args[, kwargs]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(args,*kwargs),然后返回結(jié)果。需要強(qiáng)調(diào)的是:此操作并不會(huì)在所有池工作進(jìn)程中并執(zhí)行func函數(shù)。如果要通過(guò)不同參數(shù)并發(fā)地執(zhí)行func函數(shù),必須從不同線程調(diào)用p.apply()函數(shù)或者使用p.apply_async()。它是阻塞的。apply很少使用

apply_async(func[, arg[, kwds={}[, callback=None]]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(args,*kwargs),然后返回結(jié)果。此方法的結(jié)果是AsyncResult類的實(shí)例,callback是可調(diào)用對(duì)象,接收輸入?yún)?shù)。當(dāng)func的結(jié)果變?yōu)榭捎脮r(shí),將理解傳遞給callback。callback禁止執(zhí)行任何阻塞操作,否則將接收其他異步操作中的結(jié)果。它是非阻塞。

map(func, iterable[, chunksize=None]):Pool類中的map方法,與內(nèi)置的map函數(shù)用法行為基本一致,它會(huì)使進(jìn)程阻塞直到返回結(jié)果。注意,雖然第二個(gè)參數(shù)是一個(gè)迭代器,但在實(shí)際使用中,必須在整個(gè)隊(duì)列都就緒后,程序才會(huì)運(yùn)行子進(jìn)程。

map_async(func, iterable[, chunksize=None]):map_async與map的關(guān)系同apply與apply_async

imap():imap 與 map的區(qū)別是,map是當(dāng)所有的進(jìn)程都已經(jīng)執(zhí)行完了,并將結(jié)果返回了,imap()則是立即返回一個(gè)iterable可迭代對(duì)象。

imap_unordered():不保證返回的結(jié)果順序與進(jìn)程添加的順序一致。

close():關(guān)閉進(jìn)程池,防止進(jìn)一步操作。如果所有操作持續(xù)掛起,它們將在工作進(jìn)程終止前完成。

join():等待所有工作進(jìn)程退出。此方法只能在close()或teminate()之后調(diào)用,讓其不再接受新的Process。

terminate():結(jié)束工作進(jìn)程,不再處理未處理的任務(wù)。

方法apply_async()和map_async()的返回值是AsyncResul的實(shí)例obj。實(shí)例具有以下方法:

get():返回結(jié)果,如果有必要?jiǎng)t等待結(jié)果到達(dá)。timeout是可選的。如果在指定時(shí)間內(nèi)還沒(méi)有到達(dá),將引發(fā)異常。如果遠(yuǎn)程操作中引發(fā)了異常,它將在調(diào)用此方法時(shí)再次被引發(fā)。

ready():如果調(diào)用完成,返回True

successful():如果調(diào)用完成且沒(méi)有引發(fā)異常,返回True,如果在結(jié)果就緒之前調(diào)用此方法,引發(fā)異常

wait([timeout]):等待結(jié)果變?yōu)榭捎谩?/p>

terminate():立即終止所有工作進(jìn)程,同時(shí)不執(zhí)行任何清理或結(jié)束任何掛起工作。如果p被垃圾回收,將自動(dòng)調(diào)用此函數(shù)

使用示例:

#-*-coding:utf-8-*-

#Pool+map

frommultiprocessing importPool

deftest(i):

print(i)

if__name__=="__main__":

lists=range(100)

pool=Pool(8)

pool.map(test,lists)

pool.close()

pool.join()
#-*-coding:utf-8-*-

#異步進(jìn)程池(非阻塞)

frommultiprocessingimportPool

deftest(i):

print(i)

if__name__=="__main__":

pool=Pool(8)

foriinrange(100):

'''

 For循環(huán)中執(zhí)行步驟:

(1)循環(huán)遍歷,將100個(gè)子進(jìn)程添加到進(jìn)程池(相對(duì)父進(jìn)程會(huì)阻塞)

(2)每次執(zhí)行8個(gè)子進(jìn)程,等一個(gè)子進(jìn)程執(zhí)行完后,立馬啟動(dòng)新的子進(jìn)程。(相對(duì)父進(jìn)程不阻塞)

 apply_async為異步進(jìn)程池寫(xiě)法。異步指的是啟動(dòng)子進(jìn)程的過(guò)程,與父進(jìn)程本身的執(zhí)行(print)是異步的,而For循環(huán)中往進(jìn)程池添加子進(jìn)程的過(guò)程,與父進(jìn)程本身的執(zhí)行卻是同步的。

'''

pool.apply_async(test,args=(i,))#維持執(zhí)行的進(jìn)程總數(shù)為8,當(dāng)一個(gè)進(jìn)程執(zhí)行完后啟動(dòng)一個(gè)新進(jìn)程.

print("test")

pool.close()

pool.join()
#-*-coding:utf-8-*-

#異步進(jìn)程池(非阻塞)

frommultiprocessingimportPool

deftest(i):

print(i)

if__name__=="__main__":

pool=Pool(8)

foriinrange(100):

'''

實(shí)際測(cè)試發(fā)現(xiàn),for循環(huán)內(nèi)部執(zhí)行步驟:

(1)遍歷100個(gè)可迭代對(duì)象,往進(jìn)程池放一個(gè)子進(jìn)程

(2)執(zhí)行這個(gè)子進(jìn)程,等子進(jìn)程執(zhí)行完畢,再往進(jìn)程池放一個(gè)子進(jìn)程,再執(zhí)行。(同時(shí)只執(zhí)行一個(gè)子進(jìn)程)

 for循環(huán)執(zhí)行完畢,再執(zhí)行print函數(shù)。

'''

pool.apply(test,args=(i,))#維持執(zhí)行的進(jìn)程總數(shù)為8,當(dāng)一個(gè)進(jìn)程執(zhí)行完后啟動(dòng)一個(gè)新進(jìn)程.

print("test")

pool.close()

pool.join()

Queue(用于進(jìn)程通信,資源共享)

在使用多進(jìn)程的過(guò)程中,最好不要使用共享資源。普通的全局變量是不能被子進(jìn)程所共享的,只有通過(guò)Multiprocessing組件構(gòu)造的數(shù)據(jù)結(jié)構(gòu)可以被共享。 Queue是用來(lái)創(chuàng)建進(jìn)程間資源共享的隊(duì)列的類,使用Queue可以達(dá)到多進(jìn)程間數(shù)據(jù)傳遞的功能(缺點(diǎn):只適用Process類,不能在Pool進(jìn)程池中使用)。 構(gòu)造方法:Queue([maxsize])

maxsize是隊(duì)列中允許最大項(xiàng)數(shù),省略則無(wú)大小限制。

實(shí)例方法:

put():用以插入數(shù)據(jù)到隊(duì)列。put方法還有兩個(gè)可選參數(shù):blocked和timeout。如果blocked為T(mén)rue(默認(rèn)值),并且timeout為正值,該方法會(huì)阻塞timeout指定的時(shí)間,直到該隊(duì)列有剩余的空間。如果超時(shí),會(huì)拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會(huì)立即拋出Queue.Full異常。

get():可以從隊(duì)列讀取并且刪除一個(gè)元素。get方法有兩個(gè)可選參數(shù):blocked和timeout。如果blocked為T(mén)rue(默認(rèn)值),并且timeout為正值,那么在等待時(shí)間內(nèi)沒(méi)有取到任何元素,會(huì)拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個(gè)值可用,則立即返回該值,否則,如果隊(duì)列為空,則立即拋出Queue.Empty異常。若不希望在empty的時(shí)候拋出異常,令blocked為T(mén)rue或者參數(shù)全部置空即可。

get_nowait():同q.get(False)

put_nowait():同q.put(False)

empty():調(diào)用此方法時(shí)q為空則返回True,該結(jié)果不可靠,比如在返回True的過(guò)程中,如果隊(duì)列中又加入了項(xiàng)目。

full():調(diào)用此方法時(shí)q已滿則返回True,該結(jié)果不可靠,比如在返回True的過(guò)程中,如果隊(duì)列中的項(xiàng)目被取走。

qsize():返回隊(duì)列中目前項(xiàng)目的正確數(shù)量,結(jié)果也不可靠,理由同q.empty()和q.full()一樣

使用示例:

frommultiprocessingimportProcess,Queue

importos,time,random

defwrite(q):

print('Processtowrite:%s'%os.getpid())

forvaluein['A','B','C']:

print('Put%stoqueue...'%value)

q.put(value)

time.sleep(random.random())

defread(q):

print('Processtoread:%s'%os.getpid())

whileTrue:

value=q.get(True)

print('Get%sfromqueue.'%value)

if__name__=="__main__":

q=Queue()

pw=Process(target=write,args=(q,))

pr=Process(target=read,args=(q,))

pw.start()

pr.start()

pw.join()#等待pw結(jié)束

pr.terminate()#pr進(jìn)程里是死循環(huán),無(wú)法等待其結(jié)束,只能強(qiáng)行終止
JoinableQueue就像是一個(gè)Queue對(duì)象,但隊(duì)列允許項(xiàng)目的使用者通知生成者項(xiàng)目已經(jīng)被成功處理。通知進(jìn)程是使用共享的信號(hào)和條件變量來(lái)實(shí)現(xiàn)的。 構(gòu)造方法:JoinableQueue([maxsize])

maxsize:隊(duì)列中允許最大項(xiàng)數(shù),省略則無(wú)大小限制。

實(shí)例方法 JoinableQueue的實(shí)例p除了與Queue對(duì)象相同的方法之外還具有:

task_done():使用者使用此方法發(fā)出信號(hào),表示q.get()的返回項(xiàng)目已經(jīng)被處理。如果調(diào)用此方法的次數(shù)大于從隊(duì)列中刪除項(xiàng)目的數(shù)量,將引發(fā)ValueError異常

join():生產(chǎn)者調(diào)用此方法進(jìn)行阻塞,直到隊(duì)列中所有的項(xiàng)目均被處理。阻塞將持續(xù)到隊(duì)列中的每個(gè)項(xiàng)目均調(diào)用q.task_done()方法為止

使用示例:

#-*-coding:utf-8-*-

frommultiprocessingimportProcess,JoinableQueue

importtime,random

defconsumer(q):

whileTrue:

res=q.get()

print('消費(fèi)者拿到了%s'%res)

q.task_done()

defproducer(seq,q):

foriteminseq:

time.sleep(random.randrange(1,2))

q.put(item)

print('生產(chǎn)者做好了%s'%item)

q.join()

if__name__=="__main__":

q=JoinableQueue()

seq=('產(chǎn)品%s'%iforiinrange(5))

p=Process(target=consumer,args=(q,))

p.daemon=True#設(shè)置為守護(hù)進(jìn)程,在主線程停止時(shí)p也停止,但是不用擔(dān)心,producer內(nèi)調(diào)用q.join保證了consumer已經(jīng)處理完隊(duì)列中的所有元素

p.start()

producer(seq,q)

print('主線程')

Value,Array(用于進(jìn)程通信,資源共享)

multiprocessing 中Value和Array的實(shí)現(xiàn)原理都是在共享內(nèi)存中創(chuàng)建ctypes()對(duì)象來(lái)達(dá)到共享數(shù)據(jù)的目的,兩者實(shí)現(xiàn)方法大同小異,只是選用不同的ctypes數(shù)據(jù)類型而已。 Value 構(gòu)造方法:Value((typecode_or_type, args[, lock])

typecode_or_type:定義ctypes()對(duì)象的類型,可以傳Type code或 C Type,具體對(duì)照表見(jiàn)下文。

args:傳遞給typecode_or_type構(gòu)造函數(shù)的參數(shù)

lock:默認(rèn)為T(mén)rue,創(chuàng)建一個(gè)互斥鎖來(lái)限制對(duì)Value對(duì)象的訪問(wèn),如果傳入一個(gè)鎖,如Lock或RLock的實(shí)例,將用于同步。如果傳入False,Value的實(shí)例就不會(huì)被鎖保護(hù),它將不是進(jìn)程安全的。

typecode_or_type支持的類型:

|Typecode|CType|PythonType|Minimumsizeinbytes|

|---------|------------------|-----------------|---------------------|

|`'b'`|signedchar|int|1|

|`'B'`|unsignedchar|int|1|

|`'u'`|Py_UNICODE|Unicodecharacter|2|

|`'h'`|signedshort|int|2|

|`'H'`|unsignedshort|int|2|

|`'i'`|signedint|int|2|

|`'I'`|unsignedint|int|2|

|`'l'`|signedlong|int|4|

|`'L'`|unsignedlong|int|4|

|`'q'`|signedlonglong|int|8|

|`'Q'`|unsignedlonglong|int|8|

|`'f'`|float|float|4|

|`'d'`|double|float|8|

參考地址:https://docs.python.org/3/library/array.html

Array

構(gòu)造方法:Array(typecode_or_type, size_or_initializer, **kwds[, lock])

typecode_or_type:同上

size_or_initializer:如果它是一個(gè)整數(shù),那么它確定數(shù)組的長(zhǎng)度,并且數(shù)組將被初始化為零。否則,size_or_initializer是用于初始化數(shù)組的序列,其長(zhǎng)度決定數(shù)組的長(zhǎng)度。

kwds:傳遞給typecode_or_type構(gòu)造函數(shù)的參數(shù)

lock:同上

使用示例:

importmultiprocessing

deff(n,a):

n.value=3.14

a[0]=5

if__name__=='__main__':

num=multiprocessing.Value('d',0.0)

arr=multiprocessing.Array('i',range(10))

p=multiprocessing.Process(target=f,args=(num,arr))

p.start()

p.join()

print(num.value)

print(arr[:])

注意:Value和Array只適用于Process類。

Pipe(用于管道通信)

多進(jìn)程還有一種數(shù)據(jù)傳遞方式叫管道原理和 Queue相同。Pipe可以在進(jìn)程之間創(chuàng)建一條管道,并返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對(duì)象,強(qiáng)調(diào)一點(diǎn):必須在產(chǎn)生Process對(duì)象之前產(chǎn)生管道。 構(gòu)造方法:Pipe([duplex])

dumplex:默認(rèn)管道是全雙工的,如果將duplex射成False,conn1只能用于接收,conn2只能用于發(fā)送。

實(shí)例方法:

send(obj):通過(guò)連接發(fā)送對(duì)象。obj是與序列化兼容的任意對(duì)象

recv():接收conn2.send(obj)發(fā)送的對(duì)象。如果沒(méi)有消息可接收,recv方法會(huì)一直阻塞。如果連接的另外一端已經(jīng)關(guān)閉,那么recv方法會(huì)拋出EOFError。

close():關(guān)閉連接。如果conn1被垃圾回收,將自動(dòng)調(diào)用此方法

fileno():返回連接使用的整數(shù)文件描述符

poll([timeout]):如果連接上的數(shù)據(jù)可用,返回True。timeout指定等待的最長(zhǎng)時(shí)限。如果省略此參數(shù),方法將立即返回結(jié)果。如果將timeout射成None,操作將無(wú)限期地等待數(shù)據(jù)到達(dá)。

recv_bytes([maxlength]):接收c.send_bytes()方法發(fā)送的一條完整的字節(jié)消息。maxlength指定要接收的最大字節(jié)數(shù)。如果進(jìn)入的消息,超過(guò)了這個(gè)最大值,將引發(fā)IOError異常,并且在連接上無(wú)法進(jìn)行進(jìn)一步讀取。如果連接的另外一端已經(jīng)關(guān)閉,再也不存在任何數(shù)據(jù),將引發(fā)EOFError異常。

send_bytes(buffer [, offset [, size]]):通過(guò)連接發(fā)送字節(jié)數(shù)據(jù)緩沖區(qū),buffer是支持緩沖區(qū)接口的任意對(duì)象,offset是緩沖區(qū)中的字節(jié)偏移量,而size是要發(fā)送字節(jié)數(shù)。結(jié)果數(shù)據(jù)以單條消息的形式發(fā)出,然后調(diào)用c.recv_bytes()函數(shù)進(jìn)行接收

recv_bytes_into(buffer [, offset]):接收一條完整的字節(jié)消息,并把它保存在buffer對(duì)象中,該對(duì)象支持可寫(xiě)入的緩沖區(qū)接口(即bytearray對(duì)象或類似的對(duì)象)。offset指定緩沖區(qū)中放置消息處的字節(jié)位移。返回值是收到的字節(jié)數(shù)。如果消息長(zhǎng)度大于可用的緩沖區(qū)空間,將引發(fā)BufferTooShort異常。

使用示例:

frommultiprocessingimportProcess,Pipe

importtime

#子進(jìn)程執(zhí)行方法

deff(Subconn):

time.sleep(1)

Subconn.send("吃了嗎")

print("來(lái)自父親的問(wèn)候:",Subconn.recv())

Subconn.close()

if__name__=="__main__":

parent_conn,child_conn=Pipe()#創(chuàng)建管道兩端

p=Process(target=f,args=(child_conn,))#創(chuàng)建子進(jìn)程

p.start()

print("來(lái)自兒子的問(wèn)候:",parent_conn.recv())

parent_conn.send("嗯")

Manager(用于資源共享)

Manager()返回的manager對(duì)象控制了一個(gè)server進(jìn)程,此進(jìn)程包含的python對(duì)象可以被其他的進(jìn)程通過(guò)proxies來(lái)訪問(wèn)。從而達(dá)到多進(jìn)程間數(shù)據(jù)通信且安全。Manager模塊常與Pool模塊一起使用。 Manager支持的類型有l(wèi)ist,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。 管理器是獨(dú)立運(yùn)行的子進(jìn)程,其中存在真實(shí)的對(duì)象,并以服務(wù)器的形式運(yùn)行,其他進(jìn)程通過(guò)使用代理訪問(wèn)共享對(duì)象,這些代理作為客戶端運(yùn)行。Manager()是BaseManager的子類,返回一個(gè)啟動(dòng)的SyncManager()實(shí)例,可用于創(chuàng)建共享對(duì)象并返回訪問(wèn)這些共享對(duì)象的代理。 BaseManager,創(chuàng)建管理器服務(wù)器的基類 構(gòu)造方法:BaseManager([address[, authkey]])

address:(hostname,port),指定服務(wù)器的網(wǎng)址地址,默認(rèn)為簡(jiǎn)單分配一個(gè)空閑的端口

authkey:連接到服務(wù)器的客戶端的身份驗(yàn)證,默認(rèn)為current_process().authkey的值

實(shí)例方法:

start([initializer[, initargs]]):?jiǎn)?dòng)一個(gè)單獨(dú)的子進(jìn)程,并在該子進(jìn)程中啟動(dòng)管理器服務(wù)器

get_server():獲取服務(wù)器對(duì)象

connect():連接管理器對(duì)象

shutdown():關(guān)閉管理器對(duì)象,只能在調(diào)用了start()方法之后調(diào)用

實(shí)例屬性:

address:只讀屬性,管理器服務(wù)器正在使用的地址

SyncManager,以下類型均不是進(jìn)程安全的,需要加鎖.. 實(shí)例方法:

Array(self,*args,**kwds)

BoundedSemaphore(self,*args,**kwds)

Condition(self,*args,**kwds)

Event(self,*args,**kwds)

JoinableQueue(self,*args,**kwds)

Lock(self,*args,**kwds)

Namespace(self,*args,**kwds)

Pool(self,*args,**kwds)

Queue(self,*args,**kwds)

RLock(self,*args,**kwds)

Semaphore(self,*args,**kwds)

Value(self,*args,**kwds)

dict(self,*args,**kwds)

list(self,*args,**kwds)

使用示例:

importmultiprocessing

deff(x,arr,l,d,n):

x.value=3.14

arr[0]=5

l.append('Hello')

d[1]=2

n.a=10

if__name__=='__main__':

server=multiprocessing.Manager()

x=server.Value('d',0.0)

arr=server.Array('i',range(10))

l=server.list()

d=server.dict()

n=server.Namespace()

proc=multiprocessing.Process(target=f,args=(x,arr,l,d,n))

proc.start()

proc.join()

print(x.value)

print(arr)

print(l)

print(d)

print(n)

同步子進(jìn)程模塊

Lock(互斥鎖)

Lock鎖的作用是當(dāng)多個(gè)進(jìn)程需要訪問(wèn)共享資源的時(shí)候,避免訪問(wèn)的沖突。加鎖保證了多個(gè)進(jìn)程修改同一塊數(shù)據(jù)時(shí),同一時(shí)間只能有一個(gè)修改,即串行的修改,犧牲了速度但保證了數(shù)據(jù)安全。Lock包含兩種狀態(tài)——鎖定和非鎖定,以及兩個(gè)基本的方法。

構(gòu)造方法:Lock()

實(shí)例方法:

acquire([timeout]): 使線程進(jìn)入同步阻塞狀態(tài),嘗試獲得鎖定。

release(): 釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。

使用示例:

frommultiprocessingimportProcess,Lock

defl(lock,num):

lock.acquire()

print("HelloNum:%s"%(num))

lock.release()

if__name__=='__main__':

lock=Lock()#這個(gè)一定要定義為全局

fornuminrange(20):

Process(target=l,args=(lock,num)).start()

RLock(可重入的互斥鎖(同一個(gè)進(jìn)程可以多次獲得它,同時(shí)不會(huì)造成阻塞)

RLock(可重入鎖)是一個(gè)可以被同一個(gè)線程請(qǐng)求多次的同步指令。RLock使用了“擁有的線程”和“遞歸等級(jí)”的概念,處于鎖定狀態(tài)時(shí),RLock被某個(gè)線程擁有。擁有RLock的線程可以再次調(diào)用acquire(),釋放鎖時(shí)需要調(diào)用release()相同次數(shù)??梢哉J(rèn)為RLock包含一個(gè)鎖定池和一個(gè)初始值為0的計(jì)數(shù)器,每次成功調(diào)用 acquire()/release(),計(jì)數(shù)器將+1/-1,為0時(shí)鎖處于未鎖定狀態(tài)。

構(gòu)造方法:RLock()

實(shí)例方法:

acquire([timeout]):同Lock

release(): 同Lock

Semaphore(信號(hào)量)

信號(hào)量是一個(gè)更高級(jí)的鎖機(jī)制。信號(hào)量?jī)?nèi)部有一個(gè)計(jì)數(shù)器而不像鎖對(duì)象內(nèi)部有鎖標(biāo)識(shí),而且只有當(dāng)占用信號(hào)量的線程數(shù)超過(guò)信號(hào)量時(shí)線程才阻塞。這允許了多個(gè)線程可以同時(shí)訪問(wèn)相同的代碼區(qū)。比如廁所有3個(gè)坑,那最多只允許3個(gè)人上廁所,后面的人只能等里面有人出來(lái)了才能再進(jìn)去,如果指定信號(hào)量為3,那么來(lái)一個(gè)人獲得一把鎖,計(jì)數(shù)加1,當(dāng)計(jì)數(shù)等于3時(shí),后面的人均需要等待。一旦釋放,就有人可以獲得一把鎖。

構(gòu)造方法:Semaphore([value])

value:設(shè)定信號(hào)量,默認(rèn)值為1

實(shí)例方法:

acquire([timeout]):同Lock

release(): 同Lock

使用示例:

frommultiprocessingimportProcess,Semaphore

importtime,random

defgo_wc(sem,user):

sem.acquire()

print('%s占到一個(gè)茅坑'%user)

time.sleep(random.randint(0,3))

sem.release()

print(user,'OK')

if__name__=='__main__':

sem=Semaphore(2)

p_l=[]

foriinrange(5):

p=Process(target=go_wc,args=(sem,'user%s'%i,))

p.start()

p_l.append(p)

foriinp_l:

i.join()

Condition(條件變量)

可以把Condition理解為一把高級(jí)的鎖,它提供了比Lock, RLock更高級(jí)的功能,允許我們能夠控制復(fù)雜的線程同步問(wèn)題。Condition在內(nèi)部維護(hù)一個(gè)鎖對(duì)象(默認(rèn)是RLock),可以在創(chuàng)建Condigtion對(duì)象的時(shí)候把瑣對(duì)象作為參數(shù)傳入。Condition也提供了acquire, release方法,其含義與鎖的acquire, release方法一致,其實(shí)它只是簡(jiǎn)單的調(diào)用內(nèi)部鎖對(duì)象的對(duì)應(yīng)的方法而已。Condition還提供了其他的一些方法。

構(gòu)造方法:Condition([lock/rlock])

可以傳遞一個(gè)Lock/RLock實(shí)例給構(gòu)造方法,否則它將自己生成一個(gè)RLock實(shí)例。

實(shí)例方法:

acquire([timeout]):首先進(jìn)行acquire,然后判斷一些條件。如果條件不滿足則wait

release():釋放 Lock

wait([timeout]): 調(diào)用這個(gè)方法將使線程進(jìn)入Condition的等待池等待通知,并釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。處于wait狀態(tài)的線程接到通知后會(huì)重新判斷條件。

notify(): 調(diào)用這個(gè)方法將從等待池挑選一個(gè)線程并通知,收到通知的線程將自動(dòng)調(diào)用acquire()嘗試獲得鎖定(進(jìn)入鎖定池);其他線程仍然在等待池中。調(diào)用這個(gè)方法不會(huì)釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。

notifyAll(): 調(diào)用這個(gè)方法將通知等待池中所有的線程,這些線程都將進(jìn)入鎖定池嘗試獲得鎖定。調(diào)用這個(gè)方法不會(huì)釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。

使用示例:

importmultiprocessing

importtime

defstage_1(cond):

"""performfirststageofwork,

thennotifystage_2tocontinue

"""

name=multiprocessing.current_process().name

print('Starting',name)

withcond:

print('{}doneandreadyforstage2'.format(name))

cond.notify_all()

defstage_2(cond):

"""waitfortheconditiontellingusstage_1isdone"""

name=multiprocessing.current_process().name

print('Starting',name)

withcond:

cond.wait()

print('{}running'.format(name))

if__name__=='__main__':

condition=multiprocessing.Condition()

s1=multiprocessing.Process(name='s1',

target=stage_1,

args=(condition,))

s2_clients=[

multiprocessing.Process(

name='stage_2[{}]'.format(i),

target=stage_2,

args=(condition,),

)

foriinrange(1,3)

]

forcins2_clients:

c.start()

time.sleep(1)

s1.start()

s1.join()

forcins2_clients:

c.join()

Event(事件)

Event內(nèi)部包含了一個(gè)標(biāo)志位,初始的時(shí)候?yàn)閒alse??梢允褂胹et()來(lái)將其設(shè)置為true;或者使用clear()將其從新設(shè)置為false;可以使用is_set()來(lái)檢查標(biāo)志位的狀態(tài);另一個(gè)最重要的函數(shù)就是wait(timeout=None),用來(lái)阻塞當(dāng)前線程,直到event的內(nèi)部標(biāo)志位被設(shè)置為true或者timeout超時(shí)。如果內(nèi)部標(biāo)志位為true則wait()函數(shù)理解返回。

使用示例:

importmultiprocessing

importtime

defwait_for_event(e):

"""Waitfortheeventtobesetbeforedoinganything"""

print('wait_for_event:starting')

e.wait()

print('wait_for_event:e.is_set()->',e.is_set())

defwait_for_event_timeout(e,t):

"""Waittsecondsandthentimeout"""

print('wait_for_event_timeout:starting')

e.wait(t)

print('wait_for_event_timeout:e.is_set()->',e.is_set())

if__name__=='__main__':

e=multiprocessing.Event()

w1=multiprocessing.Process(

name='block',

target=wait_for_event,

args=(e,),

)

w1.start()

w2=multiprocessing.Process(

name='nonblock',

target=wait_for_event_timeout,

args=(e,2),

)

w2.start()

print('main:waitingbeforecallingEvent.set()')

time.sleep(3)

e.set()

print('main:eventisset')

其他內(nèi)容

multiprocessing.dummy 模塊與 multiprocessing 模塊的區(qū)別:dummy 模塊是多線程,而 multiprocessing 是多進(jìn)程, api 都是通用的。所有可以很方便將代碼在多線程和多進(jìn)程之間切換。multiprocessing.dummy通常在IO場(chǎng)景可以嘗試使用,比如使用如下方式引入線程池。

frommultiprocessing.dummyimportPoolasThreadPool

multiprocessing.dummy與早期的threading,不同的點(diǎn)好像是在多多核CPU下,只綁定了一個(gè)核心(具體未考證)。

參考文檔:

https://docs.python.org/3/library/multiprocessing.html

https://www.rddoc.com/doc/Python/3.6.0/zh/library/multiprocessing/

Python并發(fā)之concurrent.futures

Python標(biāo)準(zhǔn)庫(kù)為我們提供了threading和multiprocessing模塊編寫(xiě)相應(yīng)的多線程/多進(jìn)程代碼。從Python3.2開(kāi)始,標(biāo)準(zhǔn)庫(kù)為我們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個(gè)類,實(shí)現(xiàn)了對(duì)threading和multiprocessing的更高級(jí)的抽象,對(duì)編寫(xiě)線程池/進(jìn)程池提供了直接的支持。concurrent.futures基礎(chǔ)模塊是executor和future。

Executor

Executor是一個(gè)抽象類,它不能被直接使用。它為具體的異步執(zhí)行定義了一些基本的方法。ThreadPoolExecutor和ProcessPoolExecutor繼承了Executor,分別被用來(lái)創(chuàng)建線程池和進(jìn)程池的代碼。

ThreadPoolExecutor對(duì)象

ThreadPoolExecutor類是Executor子類,使用線程池執(zhí)行異步調(diào)用。

classconcurrent.futures.ThreadPoolExecutor(max_workers)

使用max_workers數(shù)目的線程池執(zhí)行異步調(diào)用。

ProcessPoolExecutor對(duì)象

ThreadPoolExecutor類是Executor子類,使用進(jìn)程池執(zhí)行異步調(diào)用。

classconcurrent.futures.ProcessPoolExecutor(max_workers=None)
使用max_workers數(shù)目的進(jìn)程池執(zhí)行異步調(diào)用,如果max_workers為None則使用機(jī)器的處理器數(shù)目(如4核機(jī)器max_worker配置為None時(shí),則使用4個(gè)進(jìn)程進(jìn)行異步并發(fā))。

submit()方法

Executor中定義了submit()方法,這個(gè)方法的作用是提交一個(gè)可執(zhí)行的回調(diào)task,并返回一個(gè)future實(shí)例。future對(duì)象代表的就是給定的調(diào)用。

Executor.submit(fn, *args, **kwargs)

fn:需要異步執(zhí)行的函數(shù)

*args, **kwargs:fn參數(shù)

使用示例:

fromconcurrentimportfutures

deftest(num):

importtime

returntime.ctime(),num

withfutures.ThreadPoolExecutor(max_workers=1)asexecutor:

future=executor.submit(test,1)

print(future.result())

map()方法

除了submit,Exectuor還為我們提供了map方法,這個(gè)方法返回一個(gè)map(func, *iterables)迭代器,迭代器中的回調(diào)執(zhí)行返回的結(jié)果有序的。

Executor.map(func, *iterables, timeout=None)

func:需要異步執(zhí)行的函數(shù)

*iterables:可迭代對(duì)象,如列表等。每一次func執(zhí)行,都會(huì)從iterables中取參數(shù)。

timeout:設(shè)置每次異步操作的超時(shí)時(shí)間,timeout的值可以是int或float,如果操作超時(shí),會(huì)返回raisesTimeoutError;如果不指定timeout參數(shù),則不設(shè)置超時(shí)間。

使用示例:

fromconcurrentimportfutures

deftest(num):

importtime

returntime.ctime(),num

data=[1,2,3]

withfutures.ThreadPoolExecutor(max_workers=1)asexecutor:

forfutureinexecutor.map(test,data):

print(future)

shutdown()方法

釋放系統(tǒng)資源,在Executor.submit()或 Executor.map()等異步操作后調(diào)用。使用with語(yǔ)句可以避免顯式調(diào)用此方法。

Executor.shutdown(wait=True)

Future

Future可以理解為一個(gè)在未來(lái)完成的操作,這是異步編程的基礎(chǔ)。通常情況下,我們執(zhí)行io操作,訪問(wèn)url時(shí)(如下)在等待結(jié)果返回之前會(huì)產(chǎn)生阻塞,cpu不能做其他事情,而Future的引入幫助我們?cè)诘却倪@段時(shí)間可以完成其他的操作。 Future類封裝了可調(diào)用的異步執(zhí)行。Future 實(shí)例通過(guò) Executor.submit()方法創(chuàng)建。

cancel():試圖取消調(diào)用。如果調(diào)用當(dāng)前正在執(zhí)行,并且不能被取消,那么該方法將返回False,否則調(diào)用將被取消,方法將返回True。

cancelled():如果成功取消調(diào)用,返回True。

running():如果調(diào)用當(dāng)前正在執(zhí)行并且不能被取消,返回True。

done():如果調(diào)用成功地取消或結(jié)束了,返回True。

result(timeout=None):返回調(diào)用返回的值。如果調(diào)用還沒(méi)有完成,那么這個(gè)方法將等待超時(shí)秒。如果調(diào)用在超時(shí)秒內(nèi)沒(méi)有完成,那么就會(huì)有一個(gè)Futures.TimeoutError將報(bào)出。timeout可以是一個(gè)整形或者浮點(diǎn)型數(shù)值,如果timeout不指定或者為None,等待時(shí)間無(wú)限。如果futures在完成之前被取消了,那么 CancelledError 將會(huì)報(bào)出。

exception(timeout=None):返回調(diào)用拋出的異常,如果調(diào)用還未完成,該方法會(huì)等待timeout指定的時(shí)長(zhǎng),如果該時(shí)長(zhǎng)后調(diào)用還未完成,就會(huì)報(bào)出超時(shí)錯(cuò)誤futures.TimeoutError。timeout可以是一個(gè)整形或者浮點(diǎn)型數(shù)值,如果timeout不指定或者為None,等待時(shí)間無(wú)限。如果futures在完成之前被取消了,那么 CancelledError 將會(huì)報(bào)出。如果調(diào)用完成并且無(wú)異常報(bào)出,返回None.

add_done_callback(fn):將可調(diào)用fn捆綁到future上,當(dāng)Future被取消或者結(jié)束運(yùn)行,fn作為future的唯一參數(shù)將會(huì)被調(diào)用。如果future已經(jīng)運(yùn)行完成或者取消,fn將會(huì)被立即調(diào)用。

wait(fs, timeout=None, return_when=ALL_COMPLETED)

等待fs提供的 Future 實(shí)例(possibly created by different Executor instances) 運(yùn)行結(jié)束。返回一個(gè)命名的2元集合,分表代表已完成的和未完成的

return_when 表明什么時(shí)候函數(shù)應(yīng)該返回。它的值必須是一下值之一:

FIRST_COMPLETED :函數(shù)在任何future結(jié)束或者取消的時(shí)候返回。

FIRST_EXCEPTION :函數(shù)在任何future因?yàn)楫惓=Y(jié)束的時(shí)候返回,如果沒(méi)有future報(bào)錯(cuò),效果等于

ALL_COMPLETED :函數(shù)在所有future結(jié)束后才會(huì)返回。

as_completed(fs, timeout=None):參數(shù)是一個(gè) Future 實(shí)例列表,返回值是一個(gè)迭代器,在運(yùn)行結(jié)束后產(chǎn)出 Future實(shí)例 。

使用示例:

fromconcurrent.futuresimportThreadPoolExecutor,wait,as_completed

fromtimeimportsleep

fromrandomimportrandint

defreturn_after_5_secs(num):

sleep(randint(1,5))

return"Returnof{}".format(num)

pool=ThreadPoolExecutor(5)

futures=[]

forxinrange(5):

futures.append(pool.submit(return_after_5_secs,x))

print(1)

forxinas_completed(futures):

print(x.result())

print(2)

作者:錢(qián)魏Way

編輯:黃飛

聲明:本文內(nèi)容及配圖由入駐作者撰寫(xiě)或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問(wèn)題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • cpu
    cpu
    +關(guān)注

    關(guān)注

    68

    文章

    11213

    瀏覽量

    222737
  • 計(jì)算機(jī)
    +關(guān)注

    關(guān)注

    19

    文章

    7760

    瀏覽量

    92649
  • python
    +關(guān)注

    關(guān)注

    57

    文章

    4856

    瀏覽量

    89519
  • 進(jìn)程
    +關(guān)注

    關(guān)注

    0

    文章

    208

    瀏覽量

    14475
  • 多進(jìn)程
    +關(guān)注

    關(guān)注

    0

    文章

    14

    瀏覽量

    2763

原文標(biāo)題:詳解Python多線程、多進(jìn)程

文章出處:【微信號(hào):vision263com,微信公眾號(hào):新機(jī)器視覺(jué)】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。

收藏 人收藏
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

    評(píng)論

    相關(guān)推薦
    熱點(diǎn)推薦

    請(qǐng)問(wèn)如何在Python中實(shí)現(xiàn)多線程多進(jìn)程的協(xié)作?

    大家好!我最近在開(kāi)發(fā)一個(gè)Python項(xiàng)目時(shí),需要同時(shí)處理多個(gè)任務(wù),且每個(gè)任務(wù)需要不同的計(jì)算資源。我想通過(guò)多線程多進(jìn)程的組合來(lái)實(shí)現(xiàn)并發(fā),但遇到了一些問(wèn)題。 具體來(lái)說(shuō),我有兩個(gè)任務(wù),一個(gè)是I/O密集型
    發(fā)表于 03-11 06:57

    多線程多進(jìn)程的區(qū)別

    6.你的數(shù)據(jù)庫(kù)一會(huì)又500個(gè)連接數(shù),一會(huì)有10個(gè),你分析一下情況7.udp和tcp的區(qū)別8.多線程多進(jìn)程的區(qū)別9.有一臺(tái)web服務(wù)器,你選擇用多線程還是多進(jìn)程,...
    發(fā)表于 07-19 07:21

    淺談多進(jìn)程多線程的選擇

    魚(yú)還是熊掌:淺談多進(jìn)程多線程的選擇關(guān)于多進(jìn)程多線程,教科書(shū)上最經(jīng)典的一句話是“進(jìn)程是資源分配的
    發(fā)表于 08-24 07:38

    python多線程多進(jìn)程對(duì)比

    電視邊吃飯邊聊天。這就是我們的 多進(jìn)程 才能做的事了。2. 單線程VS多線程VS多進(jìn)程文字總是蒼白無(wú)力的,不如用代碼直接來(lái)測(cè)試一下。開(kāi)始對(duì)比之前,首先定義四種類型的場(chǎng)景 - CPU計(jì)算
    發(fā)表于 03-15 16:42

    LINUX系統(tǒng)下多線程多進(jìn)程性能分析

    采用多進(jìn)程處理多個(gè)任務(wù),會(huì)占用很多系統(tǒng)資源(主要是CPU 和內(nèi)存的使用)。在LINUX 中,則對(duì)這種弊端進(jìn)行了改進(jìn),在用戶態(tài)實(shí)現(xiàn)了多線程處理多任務(wù)。本文系統(tǒng)論述了多線程
    發(fā)表于 08-13 08:31 ?20次下載

    VC-MFC多線程編程詳解

    VC編程中關(guān)于 MFC多線程編程的詳解文檔
    發(fā)表于 09-01 15:01 ?0次下載

    如何選好多線程多進(jìn)程

    關(guān)于多進(jìn)程多線程,教科書(shū)上最經(jīng)典的一句話是“進(jìn)程是資源分配的最小單位,線程是CPU調(diào)度的最小單位”,這句話應(yīng)付考試基本上夠了,但如果在工作
    的頭像 發(fā)表于 05-11 16:16 ?3327次閱讀
    如何選好<b class='flag-5'>多線程</b>和<b class='flag-5'>多進(jìn)程</b>

    多進(jìn)程多線程的深度比較

    嵌入式Linux中文站,關(guān)于多進(jìn)程多線程,教科書(shū)上最經(jīng)典的一句話是“進(jìn)程是資源分配的最小單位,線程是CPU調(diào)度的最小單位”
    發(fā)表于 04-02 14:42 ?739次閱讀

    多進(jìn)程多線程的基本概念

    stack),自己的寄存器環(huán)境(register context),自己的線程本地存儲(chǔ)(thread-local storage)。一個(gè)進(jìn)程可以有很多線程,每條線程并行執(zhí)行不同的任務(wù)
    發(fā)表于 04-02 14:49 ?938次閱讀

    使用Python多進(jìn)程的理由

    Python 是運(yùn)行在解釋器中的語(yǔ)言,查找資料知道, python 中有一個(gè)全局鎖( GI),在使用多進(jìn)程( Threa)的情況下,不能發(fā)揮多核的優(yōu)勢(shì)。而使用多進(jìn)程( Multipro
    的頭像 發(fā)表于 04-04 16:50 ?2005次閱讀
    使用<b class='flag-5'>Python</b><b class='flag-5'>多進(jìn)程</b>的理由

    Python多進(jìn)程學(xué)習(xí)

    Python 多進(jìn)程 (Multiprocessing) 是一種同時(shí)利用計(jì)算機(jī)多個(gè)處理器核心 (CPU cores) 進(jìn)行并行處理的技術(shù),它與 Python多線程 (Multith
    的頭像 發(fā)表于 04-26 11:04 ?979次閱讀

    淺談Linux網(wǎng)絡(luò)編程中的多進(jìn)程多線程

    在Linux網(wǎng)絡(luò)編程中,我們應(yīng)該見(jiàn)過(guò)很多網(wǎng)絡(luò)框架或者server,有多進(jìn)程的處理方式,也有多線程處理方式,孰好孰壞并沒(méi)有可比性,首先選擇多進(jìn)程還是多線程我們需要考慮業(yè)務(wù)場(chǎng)景,其次結(jié)合當(dāng)
    發(fā)表于 08-08 16:56 ?1264次閱讀
    淺談Linux網(wǎng)絡(luò)編程中的<b class='flag-5'>多進(jìn)程</b>和<b class='flag-5'>多線程</b>

    Linux系統(tǒng)上多線程多進(jìn)程的運(yùn)行效率

    關(guān)于多進(jìn)程多線程,教科書(shū)上最經(jīng)典的一句話是“進(jìn)程是資源分配的最小單位,線程是CPU調(diào)度的最小單位”,這句話應(yīng)付考試基本上夠了,但如果在工作
    的頭像 發(fā)表于 11-10 10:54 ?2142次閱讀
    Linux系統(tǒng)上<b class='flag-5'>多線程</b>和<b class='flag-5'>多進(jìn)程</b>的運(yùn)行效率

    你還是分不清多進(jìn)程多線程嗎?一文搞懂!

    你還是分不清多進(jìn)程多線程嗎?一文搞懂! 多進(jìn)程多線程是并發(fā)編程中常見(jiàn)的兩個(gè)概念,它們都可以用于提高程序的性能和效率。但是它們的實(shí)現(xiàn)方式和使用場(chǎng)景略有不同。 1.
    的頭像 發(fā)表于 12-19 16:07 ?1293次閱讀

    Python多線程多進(jìn)程的區(qū)別

    Python作為一種高級(jí)編程語(yǔ)言,提供了多種并發(fā)編程的方式,其中多線程多進(jìn)程是最常見(jiàn)的兩種方式之一。在本文中,我們將探討Python多線程
    的頭像 發(fā)表于 10-23 11:48 ?1345次閱讀
    <b class='flag-5'>Python</b>中<b class='flag-5'>多線程</b>和<b class='flag-5'>多進(jìn)程</b>的區(qū)別