網(wǎng)站發(fā)布信息技巧世界疫情最新數(shù)據(jù)
深入理解Python多進(jìn)程編程 multiprocessing
flyfish
Python 的 multiprocessing
模塊允許創(chuàng)建多個進(jìn)程,從而可以利用多核處理器的能力來并行執(zhí)行任務(wù)。這意味著程序的不同部分可以在不同的CPU核心上同時運(yùn)行,極大地提高了處理效率,特別是在執(zhí)行計算密集型任務(wù)時。
與多線程相比,multiprocessing
使用的是系統(tǒng)級的進(jìn)程而不是線程。每個進(jìn)程都有獨立的內(nèi)存空間和系統(tǒng)資源,而線程則共享同一個進(jìn)程的內(nèi)存空間。因此,在Python中(特別是由于全局解釋器鎖GIL的存在),對于CPU密集型任務(wù),使用multiprocessing
比多線程能更有效地利用多核CPU的優(yōu)勢。
進(jìn)程的概念
在計算機(jī)操作系統(tǒng)中,進(jìn)程是操作系統(tǒng)進(jìn)行資源分配和調(diào)度的基本單位。一個進(jìn)程可以包含多個線程。當(dāng)使用multiprocessing
模塊時,可以創(chuàng)建新的進(jìn)程,這些新進(jìn)程將與主程序并行運(yùn)行,并且它們各自擁有獨立的內(nèi)存空間。
示例代碼1:單個進(jìn)程打印數(shù)字
下面是一個簡單的示例,演示如何使用multiprocessing
模塊創(chuàng)建一個進(jìn)程來打印從1到5的數(shù)字:
import multiprocessing
import timedef print_numbers():"""打印從1到5的數(shù)字"""for i in range(1, 6):print("數(shù)字:", i)time.sleep(1) # 模擬耗時操作if __name__ == "__main__":# 創(chuàng)建一個新的進(jìn)程process = multiprocessing.Process(target=print_numbers)# 啟動進(jìn)程process.start()# 等待進(jìn)程完成process.join()
數(shù)字: 1
數(shù)字: 2
數(shù)字: 3
數(shù)字: 4
數(shù)字: 5
multiprocessing.Process()
:創(chuàng)建一個新的進(jìn)程對象。target=print_numbers
:指定該進(jìn)程的目標(biāo)函數(shù)為print_numbers
。process.start()
:啟動進(jìn)程。process.join()
:等待進(jìn)程結(jié)束。
示例代碼2:兩個進(jìn)程分別打印不同字符串
下面是另一個示例,演示如何同時啟動兩個進(jìn)程,每個進(jìn)程打印不同的字符串:
import multiprocessingdef print_message(message):"""打印傳入的消息"""print(f"消息: {message}")if __name__ == "__main__":# 創(chuàng)建兩個進(jìn)程process1 = multiprocessing.Process(target=print_message, args=("Hello from Process 1",))process2 = multiprocessing.Process(target=print_message, args=("Hello from Process 2",))# 啟動兩個進(jìn)程process1.start()process2.start()# 等待兩個進(jìn)程都完成process1.join()process2.join()
消息: Hello from Process 1
消息: Hello from Process 2
在這個例子中,定義了一個print_message
函數(shù),它接受一個字符串參數(shù)并打印出來。然后,創(chuàng)建了兩個進(jìn)程,每個進(jìn)程都調(diào)用這個函數(shù),但傳遞了不同的字符串參數(shù)。通過args
參數(shù),可以向目標(biāo)函數(shù)傳遞額外的參數(shù)。最后,啟動這兩個進(jìn)程,并等待它們完成各自的執(zhí)行。這樣,就可以看到兩個進(jìn)程幾乎同時開始工作,并打印出各自的消息。
示例3:使用 multiprocessing.Value
在多個進(jìn)程中共享一個計數(shù)器
multiprocessing.Value
Value
允許多個進(jìn)程共享一個值。它適用于需要在多個進(jìn)程中共享簡單數(shù)據(jù)類型(如整數(shù)或浮點數(shù))的情況。
import multiprocessingdef increment(counter, lock):"""增加計數(shù)器的值"""for _ in range(1000):with lock:counter.value += 1if __name__ == "__main__":# 創(chuàng)建一個共享的整數(shù)值和鎖counter = multiprocessing.Value('i', 0) # 'i' 表示整數(shù)類型lock = multiprocessing.Lock()# 創(chuàng)建多個進(jìn)程來增加計數(shù)器processes = [multiprocessing.Process(target=increment, args=(counter, lock)) for _ in range(10)]# 啟動所有進(jìn)程for p in processes:p.start()# 等待所有進(jìn)程完成for p in processes:p.join()print("最終計數(shù)器值:", counter.value)
最終計數(shù)器值: 10000
multiprocessing.Value(typecode_or_type, *args, lock=True)
:創(chuàng)建一個新的共享值對象。typecode_or_type
指定了要共享的數(shù)據(jù)類型(例如'i'
表示整數(shù))。value.value
:訪問共享值的實際內(nèi)容。lock
:確保對共享資源的安全訪問,防止競態(tài)條件。
進(jìn)程(Process)和線程(Thread)在Python中的區(qū)別
特性 | 進(jìn)程(Process) | 線程(Thread) |
---|---|---|
內(nèi)存空間 | 每個進(jìn)程有獨立的內(nèi)存空間 | 所有線程共享同一進(jìn)程的內(nèi)存空間 |
資源消耗 | 開銷較大,需要更多系統(tǒng)資源 | 輕量級,開銷小,資源共享 |
通信難度 | 進(jìn)程間通信復(fù)雜(IPC),如管道、套接字等 | 線程間通信簡單,直接訪問相同變量和數(shù)據(jù)結(jié)構(gòu) |
全局解釋器鎖(GIL) | 不受GIL限制,適合計算密集型任務(wù) | 受GIL限制,對于計算密集型任務(wù)效率提升有限 |
適用場景 | 計算密集型任務(wù),穩(wěn)定性要求高的應(yīng)用 | I/O密集型任務(wù),快速響應(yīng)用戶界面的應(yīng)用 |
崩潰影響 | 一個進(jìn)程崩潰不影響其他進(jìn)程 | 一個線程出錯可能導(dǎo)致整個進(jìn)程崩潰 |
Python中多線程(Thread)和多進(jìn)程(Process)的區(qū)別
特性 | 多線程(Thread) | 多進(jìn)程(Process) |
---|---|---|
內(nèi)存空間 | 所有線程共享同一進(jìn)程的內(nèi)存空間 | 每個進(jìn)程有獨立的內(nèi)存空間 |
資源消耗 | 輕量級,開銷小,資源共享 | 開銷較大,需要更多系統(tǒng)資源 |
通信難度 | 線程間通信簡單,直接訪問相同變量和數(shù)據(jù)結(jié)構(gòu) | 進(jìn)程間通信復(fù)雜(IPC),如管道、套接字等 |
全局解釋器鎖 (GIL) | 受GIL限制,對于計算密集型任務(wù)效率提升有限 | 不受GIL限制,適合計算密集型任務(wù) |
適用場景 | I/O密集型任務(wù),快速響應(yīng)用戶界面的應(yīng)用 | 計算密集型任務(wù),穩(wěn)定性要求高的應(yīng)用 |
崩潰影響 | 一個線程出錯可能導(dǎo)致整個進(jìn)程崩潰 | 一個進(jìn)程崩潰不影響其他進(jìn)程 |
創(chuàng)建與銷毀開銷 | 創(chuàng)建和銷毀開銷較小 | 創(chuàng)建和銷毀開銷較大 |
并發(fā)性能 | 對于I/O密集型任務(wù)性能較好,但對于CPU密集型任務(wù)受限 | 對于CPU密集型任務(wù)性能較好 |
示例用途 | 網(wǎng)絡(luò)請求、文件讀寫、GUI應(yīng)用等 | 數(shù)據(jù)分析、圖像處理、科學(xué)計算等 |
進(jìn)程間通信
在Python的multiprocessing
模塊中,提供了幾種常用的進(jìn)程間通信(IPC)方式,包括隊列(Queue)、管道(Pipe)等。這些工具允許不同的進(jìn)程之間安全地傳遞數(shù)據(jù)。
使用 multiprocessing.Queue
實現(xiàn)進(jìn)程間通信
Queue
是一個線程和進(jìn)程安全的 FIFO 隊列,非常適合用于進(jìn)程間的簡單數(shù)據(jù)交換。
示例代碼:
import multiprocessingdef producer(queue):"""生產(chǎn)者函數(shù),向隊列中添加數(shù)據(jù)"""for i in range(5):queue.put(f"數(shù)據(jù) {i}")print(f"生產(chǎn)者放入: 數(shù)據(jù) {i}")def consumer(queue):"""消費者函數(shù),從隊列中取出數(shù)據(jù)"""while not queue.empty():data = queue.get()print(f"消費者獲取: {data}")if __name__ == "__main__":# 創(chuàng)建一個隊列對象queue = multiprocessing.Queue()# 創(chuàng)建生產(chǎn)者和消費者進(jìn)程p1 = multiprocessing.Process(target=producer, args=(queue,))p2 = multiprocessing.Process(target=consumer, args=(queue,))# 啟動進(jìn)程p1.start()p2.start()# 等待兩個進(jìn)程完成p1.join()p2.join()
生產(chǎn)者放入: 數(shù)據(jù) 0
生產(chǎn)者放入: 數(shù)據(jù) 1
生產(chǎn)者放入: 數(shù)據(jù) 2
生產(chǎn)者放入: 數(shù)據(jù) 3
生產(chǎn)者放入: 數(shù)據(jù) 4
消費者獲取: 數(shù)據(jù) 0
消費者獲取: 數(shù)據(jù) 1
消費者獲取: 數(shù)據(jù) 2
消費者獲取: 數(shù)據(jù) 3
消費者獲取: 數(shù)據(jù) 4
- 隊列的使用:
queue.put()
用于向隊列中添加數(shù)據(jù),queue.get()
用于從隊列中取出數(shù)據(jù)。 - 數(shù)據(jù)傳遞原理:生產(chǎn)者進(jìn)程通過調(diào)用
put
方法將數(shù)據(jù)放入隊列,而消費者進(jìn)程通過調(diào)用get
方法從隊列中取出數(shù)據(jù)。Queue
對象是進(jìn)程安全的,因此多個進(jìn)程可以同時訪問它而不發(fā)生沖突。
使用 multiprocessing.Pipe
實現(xiàn)進(jìn)程間通信
Pipe
提供了一個雙向通道,適用于兩個進(jìn)程之間的直接通信。
示例代碼:
import multiprocessingdef sender(conn, messages):"""發(fā)送者函數(shù),通過管道發(fā)送消息"""for msg in messages:conn.send(msg)print(f"發(fā)送者發(fā)送: {msg}")conn.close()def receiver(conn):"""接收者函數(shù),通過管道接收消息"""while True:msg = conn.recv()if msg == "END":breakprint(f"接收者接收: {msg}")if __name__ == "__main__":# 創(chuàng)建一個管道對象parent_conn, child_conn = multiprocessing.Pipe()# 準(zhǔn)備要發(fā)送的消息messages = ["Hello", "from", "sender", "END"]# 創(chuàng)建發(fā)送者和接收者進(jìn)程p1 = multiprocessing.Process(target=sender, args=(child_conn, messages))p2 = multiprocessing.Process(target=receiver, args=(parent_conn,))# 啟動進(jìn)程p1.start()p2.start()# 等待兩個進(jìn)程完成p1.join()p2.join()
發(fā)送者發(fā)送: Hello
發(fā)送者發(fā)送: from
發(fā)送者發(fā)送: sender
發(fā)送者發(fā)送: END
接收者接收: Hello
接收者接收: from
接收者接收: sender
進(jìn)程池的使用
multiprocessing.Pool
是一個用于管理一組工作進(jìn)程的類,它可以簡化并行任務(wù)的分配和結(jié)果收集。
示例代碼:使用 Pool
并行計算數(shù)字的平方
import multiprocessingdef square(n):"""計算一個數(shù)的平方"""return n * nif __name__ == "__main__":# 定義要處理的數(shù)字列表numbers = [1, 2, 3, 4, 5]# 創(chuàng)建一個包含4個進(jìn)程的進(jìn)程池with multiprocessing.Pool(processes=4) as pool:# 使用map方法將square函數(shù)應(yīng)用于每個數(shù)字results = pool.map(square, numbers)print("結(jié)果:", results)
結(jié)果: [1, 4, 9, 16, 25]
- 進(jìn)程池的概念和作用:
Pool
允許你指定一定數(shù)量的工作進(jìn)程,并且可以通過map
、apply
等方法輕松地將任務(wù)分配給這些進(jìn)程。這樣可以有效地利用多核CPU來加速計算密集型任務(wù)。 - 設(shè)置進(jìn)程池大小:通過
processes
參數(shù)指定進(jìn)程池中的工作進(jìn)程數(shù)量,默認(rèn)情況下,它會根據(jù)系統(tǒng)CPU核心數(shù)自動調(diào)整。 - 處理任務(wù)的方式:
pool.map()
方法類似于內(nèi)置的map()
函數(shù),但它會在多個進(jìn)程中并行執(zhí)行。在這個例子中,我們將square
函數(shù)應(yīng)用到numbers
列表中的每個元素,并返回計算結(jié)果。
Semaphore(信號量)
信號量是一種更高級的同步機(jī)制,可以用來控制同時訪問某一資源的進(jìn)程數(shù)量。
示例:使用 Semaphore
控制并發(fā)訪問
import multiprocessing
import timedef worker(semaphore, name):with semaphore:print(f"{name} 獲得信號量")time.sleep(1)if __name__ == "__main__":semaphore = multiprocessing.Semaphore(3) # 最多允許3個進(jìn)程同時訪問processes = [multiprocessing.Process(target=worker, args=(semaphore, f"進(jìn)程 {i}")) for i in range(6)]for p in processes:p.start()for p in processes:p.join()
Event(事件)
事件是一種簡單的線程間通信機(jī)制,可以讓一個或多個進(jìn)程等待某個特定事件的發(fā)生。
示例:使用 Event
實現(xiàn)進(jìn)程間的同步
import multiprocessing
import timedef wait_for_event(event):print("等待事件觸發(fā)...")event.wait() # 阻塞直到事件被設(shè)置print("事件已觸發(fā)!")def set_event(event):time.sleep(3)event.set() # 觸發(fā)事件if __name__ == "__main__":event = multiprocessing.Event()p1 = multiprocessing.Process(target=wait_for_event, args=(event,))p2 = multiprocessing.Process(target=set_event, args=(event,))p1.start()p2.start()p1.join()p2.join()
Manager(管理器)
Manager
提供了更高層次的接口,可以創(chuàng)建可以在不同進(jìn)程之間共享的數(shù)據(jù)結(jié)構(gòu),如列表、字典等。
示例:使用 Manager
創(chuàng)建共享數(shù)據(jù)結(jié)構(gòu)
import multiprocessingdef append_to_list(shared_list, item):shared_list.append(item)print(f"添加到共享列表: {item}")if __name__ == "__main__":with multiprocessing.Manager() as manager:shared_list = manager.list() # 創(chuàng)建一個可共享的列表processes = [multiprocessing.Process(target=append_to_list, args=(shared_list, i)) for i in range(5)]for p in processes:p.start()for p in processes:p.join()print("最終共享列表:", list(shared_list))
文中processes = [multiprocessing.Process(target=append_to_list, args=(shared_list, i)) for i in range(5)]
這一句 等于下面的代碼
processes = []
for i in range(5):p = multiprocessing.Process(target=append_to_list, args=(shared_list, i))processes.append(p)
共享內(nèi)存
multiprocessing
還支持通過共享內(nèi)存的方式在進(jìn)程之間共享數(shù)據(jù),這對于大規(guī)模數(shù)據(jù)共享特別有用。
示例:使用 Array
共享數(shù)組
import multiprocessingdef modify_array(shared_array, index, value):shared_array[index] = valueif __name__ == "__main__":array = multiprocessing.Array('i', [1, 2, 3, 4, 5]) # 創(chuàng)建共享數(shù)組processes = [multiprocessing.Process(target=modify_array, args=(array, i, i*10)) for i in range(len(array))]for p in processes:p.start()for p in processes:p.join()print("修改后的數(shù)組:", list(array))
修改后的數(shù)組: [0, 10, 20, 30, 40]