這篇文章將為大家詳細講解有關python如何實現(xiàn)多進程,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
創(chuàng)新互聯(lián)是專業(yè)的右玉網(wǎng)站建設公司,右玉接單;提供做網(wǎng)站、成都網(wǎng)站設計,網(wǎng)頁設計,網(wǎng)站設計,建網(wǎng)站,PHP網(wǎng)站建設等專業(yè)做網(wǎng)站服務;采用PHP框架,可快速的進行右玉網(wǎng)站開發(fā)網(wǎng)頁制作和功能擴展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團隊,希望更多企業(yè)前來合作!
多進程的含義
進程(Process)是具有一定獨立功能的程序關于某個數(shù)據(jù)集合上的一次運行活動,是系統(tǒng)進行資源分配和調(diào)度的一個獨立單位。
顧名思義,多進程就是啟用多個進程同時運行。由于進程是線程的集合,而且進程是由一個或多個線程構(gòu)成的,所以多進程的運行意味著有大于或等于進程數(shù)量的線程在運行。
由于進程中 GIL 的存在,Python 中的多線程并不能很好地發(fā)揮多核優(yōu)勢,一個進程中的多個線程,在同一時刻只能有一個線程運行。
而對于多進程來說,每個進程都有屬于自己的 GIL,所以,在多核處理器下,多進程的運行是不會受 GIL 的影響的。因此,多進程能更好地發(fā)揮多核的優(yōu)勢。
當然,對于爬蟲這種 IO 密集型任務來說,多線程和多進程影響差別并不大。對于計算密集型任務來說,Python 的多進程相比多線程,其多核運行效率會有成倍的提升。
總的來說,Python 的多進程整體來看是比多線程更有優(yōu)勢的。所以,在條件允許的情況下,能用多進程就盡量用多進程。
不過值得注意的是,由于進程是系統(tǒng)進行資源分配和調(diào)度的一個獨立單位,所以各個進程之間的數(shù)據(jù)是無法共享的,如多個進程無法共享一個全局變量,
進程之間的數(shù)據(jù)共享需要有單獨的機制來實現(xiàn)。
在Python中也有內(nèi)置的庫來實現(xiàn)多進程,它就是multiprocessing。multiprocessing提供了一系列的組件,如Process(進程)、Queue(隊列)、
Semaphore(信號量)、Pipe(管道)、Lock(鎖)、Pool(進程池)等。
import multiprocessing import time now = lambda: time.time() def work(index): print(index) time.sleep(index) def main(): start_dt = now() for i in range(5): p = multiprocessing.Process(target=work, args=(i,)) p.start() print(f"Time: {now() - start_dt}") if __name__ == "__main__": main()
import multiprocessingimport timenow = lambda: time.time()def work(i, index): print(f'{i}進程啟動') time.sleep(index) print(f'{i}進程結(jié)束')def main(): start_dt = now() for i in range(5): p = multiprocessing.Process(target=work, args=(i, 5)) p.start() # 查看本機的 cpu 數(shù)量 print(f"CPU Numbers: {multiprocessing.cpu_count()}") # 查看全部活躍子進程的名稱以及pid for p in multiprocessing.active_children(): print(p.name, p.pid) print(f"Time End: {now() - start_dt}")if __name__ == "__main__": main()
import multiprocessing import time class MyProcess(multiprocessing.Process): def __init__(self, loop): super(MyProcess, self).__init__() self.loop = loop def run(self): for count in range(self.loop): time.sleep(1) print(f"Pid: {self.pid}, Name: {self.name}") def main(): for i in range(2, 5): p = MyProcess(i) p.start() if __name__ == "__main__": main()
在多進程中,同樣存在守護進程的概念,如果一個進程被設置為守護進程,當父進程結(jié)束后,子進程會自動被終止,我們可以通過設置daemon屬性來控制是否為守護進程。
還是原來的例子,增加了deamon屬性的設置:
import multiprocessing import time class MyProcess(multiprocessing.Process): def __init__(self, loop): super(MyProcess, self).__init__() self.loop = loop def run(self): for count in range(self.loop): time.sleep(1) print(f"Loop: {self.loop}, Pid: {self.pid}, Loop Count: {count}") def main(): for i in range(2, 5): p = MyProcess(i) p.daemon = True p.start() if __name__ == "__main__": main() # 主進程沒有做任何事情 直接輸入后結(jié)束 同時也終止了子進程的運行。 print("Main End.")
這樣可以有效防止無控制地生成子進程。這樣的寫法可以讓我們在主進程運行結(jié)束后無需額外擔心子進程是否關閉,避免了獨立子進程的運行。
import multiprocessing import time class MyProcess(multiprocessing.Process): def __init__(self, loop): super(MyProcess, self).__init__() self.loop = loop def run(self): for count in range(self.loop): time.sleep(1) print(f"Loop: {self.loop}, Pid: {self.pid}, Loop Count: {count}") def main(): processes = [] for i in range(2, 5): p = MyProcess(i) p.daemon = True p.start() processes.append(p) for p in processes: p.join() if __name__ == "__main__": main() # 主進程沒有做任何事情 直接輸入后結(jié)束 同時也終止了子進程的運行。 print("Main End.")
默認情況下,join是無限期的。也就是說,如果有子進程沒有運行完畢,主進程會一直等待。這種情況下,如果子進程出現(xiàn)問題陷入了死循環(huán),主進程也會無限等待下去。
怎么解決這個問題呢?可以給 join 方法傳遞一個超時參數(shù),代表最長等待秒數(shù)。如果子進程沒有在這個指定秒數(shù)之內(nèi)完成,會被強制返回,主進程不再會等待。
也就是說這個參數(shù)設置了主進程等待該子進程的最長時間。
import multiprocessing import time class MyProcess(multiprocessing.Process): def __init__(self, loop): super(MyProcess, self).__init__() self.loop = loop def run(self): for count in range(self.loop): time.sleep(1) print(f"Loop: {self.loop}, Pid: {self.pid}, Loop Count: {count}") def main(): processes = [] for i in range(2, 5): p = MyProcess(i) p.daemon = True p.start() processes.append(p) for p in processes: # 主進程最多等待改進程 1 s p.join(1) if __name__ == "__main__": main() # 主進程沒有做任何事情 直接輸入后結(jié)束 同時也終止了子進程的運行。 print("Main End.")
當然,終止進程不止有守護進程這一種做法,我們也可以通過 terminate 方法來終止某個子進程,另外我們還可以通過 is_alive 方法判斷進程是否還在運行。
import multiprocessing import time def task(): print("1") time.sleep(5) print("2") if __name__ == "__main__": p = multiprocessing.Process(target=task) # 使用 is_alive 判斷當前進程進程是否在運行 print(f"First: {p}, {p.is_alive()}") p.start() print(f"During: {p}, {p.is_alive()}") p.terminate() # 即使此時已經(jīng)調(diào)用了 terminate 進程的狀態(tài)還是 True, 即運行狀態(tài) # 在調(diào)用 join 之后才變成了終止狀態(tài) print(f"After: {p}, {p.is_alive()}") p.join() print(f"Joined: {p}, {p.is_alive()}")
我們發(fā)現(xiàn),有的輸出結(jié)果沒有換行。這是什么原因造成的呢?這種情況是由多個進程并行執(zhí)行導致的,兩個進程同時進行了輸出,結(jié)果第一個進程的換行沒有來得及輸出,
第二個進程就輸出了結(jié)果,導致最終輸出沒有換行。
那如何來避免這種問題?如果我們能保證,多個進程運行期間的任一時間,只能一個進程輸出,其他進程等待,等剛才那個進程輸出完畢之后,另一個進程再進行輸出,
這樣就不會出現(xiàn)輸出沒有換行的現(xiàn)象了。
這種解決方案實際上就是實現(xiàn)了進程互斥,避免了多個進程同時搶占臨界區(qū)(輸出)資源。我們可以通過multiprocessing中的Lock來實現(xiàn)。Lock,即鎖,
在一個進程輸出時,加鎖,其他進程等待。等此進程執(zhí)行結(jié)束后,釋放鎖,其他進程可以進行輸出。
首先是一個不加鎖的實例:
import multiprocessing import time class MyProcess(multiprocessing.Process): def __init__(self, loop, lock: multiprocessing.Lock): super(MyProcess, self).__init__() self.loop = loop self.lock = lock def run(self): for count in range(self.loop): time.sleep(0.1) # self.lock.acquire() print(f"Pid: {self.pid}, LoopCount: {count}") # self.lock.release() def main(): lock = multiprocessing.Lock() for i in range(10, 15): p = MyProcess(i, lock) p.start() if __name__ == "__main__": main()
然后取消注釋再次運行。
進程互斥鎖可以使同一時刻只有一個進程能訪問共享資源,如上面的例子所展示的那樣,在同一時刻只能有一個進程輸出結(jié)果。
但有時候我們需要允許多個進程來訪問共享資源,同時還需要限制能訪問共享資源的進程的數(shù)量。
這種需求該如何實現(xiàn)呢?可以用信號量,信號量是進程同步過程中一個比較重要的角色。它可以控制臨界資源的數(shù)量,實現(xiàn)多個進程同時訪問共享資源,限制進程的并發(fā)量。
我們可以用 multiprocessing 庫中的 Semaphore 來實現(xiàn)信號量。
那么接下來我們就用一個實例來演示一下進程之間利用 Semaphore 做到多個進程共享資源,同時又限制同時可訪問的進程數(shù)量,代碼如下:
import multiprocessing import time ''' Semaphore管理一個內(nèi)置的計數(shù)器, 每當調(diào)用acquire()時內(nèi)置計數(shù)器-1; 調(diào)用release() 時內(nèi)置計數(shù)器+1; 計數(shù)器不能小于0;當計數(shù)器為0時,acquire()將阻塞線程直到其他線程(進程)調(diào)用release()。 ''' buffer = multiprocessing.Queue(10) empty = multiprocessing.Semaphore(2) # 緩沖區(qū)閑適區(qū)空余數(shù) full = multiprocessing.Semaphore(0) # 緩沖區(qū)占用區(qū)占用數(shù) lock = multiprocessing.Lock() class Consumer(multiprocessing.Process): def run(self): global buffer, empty, full, lock while True: full.acquire() lock.acquire() buffer.get() print("Consumer pop an element.") time.sleep(1) lock.release() empty.release() class Producer(multiprocessing.Process): def __init__(self, name): super(Producer, self).__init__() self.name = name def run(self): global buffer, empty, full, lock # 生產(chǎn)者 Producer 使用 acquire 方法來占用一個緩沖區(qū)位置,緩沖區(qū)空閑區(qū)大小減 1,接下來進行加鎖,對緩沖區(qū)進行操作, # 然后釋放鎖,最后讓代表占用的緩沖區(qū)位置數(shù)量加 1,消費者則相反。 # 通過 Semaphore 我們很好地控制了進程對資源的并發(fā)訪問數(shù)量。 while True: empty.acquire() lock.acquire() buffer.put(1) print(f"{self.name} Producer put an element.") time.sleep(1) lock.release() full.release() def main(): lst = [] for i in range(3): p = Producer(str(i)) p.daemon = True p.start() lst.append(p) c = Consumer() c.daemon = True c.start() c.join() for p in lst: p.join() print("Main End.") if __name__ == "__main__": main()
在上面的例子中我們使用Queue作為進程通信的共享隊列使用。而如果我們把上面程序中的Queue換成普通的list,是完全起不到效果的,因為進程和進程之間的資源是不共享的。
即使在一個進程中改變了這個list,在另一個進程也不能獲取到這個list的狀態(tài),所以聲明全局變量對多進程是沒有用處的。那進程如何共享數(shù)據(jù)呢?可以用Queue,即隊列。
當然這里的隊列指的是 multiprocessing 里面的 Queue。
剛才我們使用Queue實現(xiàn)了進程間的數(shù)據(jù)共享,那么進程之間直接通信,如收發(fā)信息,用什么比較好呢?可以用Pipe,管道。管道,我們可以把它理解為兩個進程之間通信的通道。
管道可以是單向的,即half-duplex:一個進程負責發(fā)消息,另一個進程負責收消息;也可以是雙向的duplex,即互相收發(fā)消息。
默認聲明Pipe對象是雙向管道,如果要創(chuàng)建單向管道,可以在初始化的時候傳入 deplex 參數(shù)為 False。
import multiprocessing class Consumer(multiprocessing.Process): def __init__(self, pipe): super(Consumer, self).__init__() self.pipe = pipe def run(self): self.pipe.send("Consumer Words.") print(f"Consumer Recv: {self.pipe.recv()}") class Producer(multiprocessing.Process): def __init__(self, pipe): super(Producer, self).__init__() self.pipe = pipe def run(self): self.pipe.send("Producer Words.") print(f"Producer Recv: {self.pipe.recv()}") def main(): # 聲明了一個默認為雙向的管道,然后將管道的兩端分別傳給兩個進程。 # 管道 Pipe 就像進程之間搭建的橋梁,利用它我們就可以很方便地實現(xiàn)進程間通信了。 pipe = multiprocessing.Pipe() c = Consumer(pipe[0]) p = Producer(pipe[1]) c.daemon = True p.daemon = True c.start() p.start() c.join() p.join() print("Main Process Ended.") if __name__ == "__main__": main()
我們講了可以使用Process來創(chuàng)建進程,同時也講了如何用Semaphore來控制進程的并發(fā)執(zhí)行數(shù)量。假如現(xiàn)在我們遇到這么一個問題,我有10000個任務,
每個任務需要啟動一個進程來執(zhí)行,并且一個進程運行完畢之后要緊接著啟動下一個進程,同時我還需要控制進程的并發(fā)數(shù)量,不能并發(fā)太高,
不然CPU處理不過來(如果同時運行的進程能維持在一個最高恒定值當然利用率是最高的)。那么我們該如何來實現(xiàn)這個需求呢?
用Process和Semaphore可以實現(xiàn),但是實現(xiàn)起來比較煩瑣。而這種需求在平時又是非常常見的。此時,我們就可以派上進程池了,即 multiprocessing 中的 Pool。
Pool可以提供指定數(shù)量的進程,供用戶調(diào)用,當有新的請求提交到pool中時,如果池還沒有滿,就會創(chuàng)建一個新的進程用來執(zhí)行該請求;
但如果池中的進程數(shù)已經(jīng)達到規(guī)定最大值,那么該請求就會等待,直到池中有進程結(jié)束,才會創(chuàng)建新的進程來執(zhí)行它。
進程池1:
import multiprocessing import time def function(index): print(f"{index} start") time.sleep(3) print(f"{index} end") def main(): # 聲明了一個大小為 3 的進程池,通過 processes 參數(shù)來指定,如果不指定,那么會自動根據(jù)處理器內(nèi)核來分配進程數(shù)。 pool = multiprocessing.Pool(processes=3) for i in range(4): # 使用 apply_async 方法將進程添加進去,args 可以用來傳遞參數(shù)。 pool.apply_async(function, args=(i, )) print("Main start.") # 關閉進程池 使之不再接受新任務 pool.close() pool.join() print("Main end.") if __name__ == "__main__": main()
再介紹進程池一個更好用的map方法,可以將上述寫法簡化很多。map方法是怎么用的呢?第一個參數(shù)就是要啟動的進程對應的執(zhí)行方法,
第2個參數(shù)是一個可迭代對象,其中的每個元素會被傳遞給這個執(zhí)行方法。
舉個例子:現(xiàn)在我們有一個list,里面包含了很多URL,另外我們也定義了一個方法用來抓取每個URL內(nèi)容并解析,
那么我們可以直接在map的第一個參數(shù)傳入方法名,第 2 個參數(shù)傳入 URL 數(shù)組。
進程池2:
import multiprocessing import requests def scrape(url): try: ret = requests.get(url).text print(ret[:10]) print() except: print("Get Error") def main(): pool = multiprocessing.Pool(processes=3) urls = [ 'http://data.eastmoney.com/hsgt/index.html', 'http://data.eastmoney.com/hsgtcg/gzcglist.html', 'https://www.runoob.com/MySQL/mysql-alter.html', 'https://blog.csdn.net/weixin_42329277/article/details/80735009?depth_1-utm_source=distribute.pc_relevant.none-task&utm_source=distribute.pc_relevant.none-task', ] pool.map(scrape, urls) pool.close() # pool.join() print("Main End.") if __name__ == "__main__": main()
名稱欄目:python如何實現(xiàn)多進程
URL標題:http://m.rwnh.cn/article18/psgjdp.html
成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站內(nèi)鏈、網(wǎng)頁設計公司、云服務器、手機網(wǎng)站建設、外貿(mào)建站、企業(yè)網(wǎng)站制作
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)