註
此筆記適用 Python 3.4,部份已不相容後續版本的套件與 API
以 futures 撰寫並行
- futures —— 非同步執行某項操作的物件
- 網路 I/O 涉及高度延遲,須要用到並行來有效處理
- 在公用 Web 測試並行 HTTP 用戶端,你可能會無意中啟動一個阻斷服務攻擊(DOS),或被懷疑在做這件事…要測試複雜的用戶端,你應該設定自己的測試伺服器(作法說明)
requests
已經被視為 Python 式 API 的典範,它比標準函式庫urllib.request
還要強大concurrent.futures
主要功能是能讓你分別提交(submit)在不同執行緒/程序執行的可呼叫物(callables),實作於ThreadPoolExecutor
、ProcessPoolExecutor
的介面(分別在內部管理一個 worker thread pool 及 process pool)
多執行緒並行起手勢
1 2 3 4 5 6 7 8 9 10 11 12 |
# 以 worker 數量來初始化 with futures.ThreadPoolExecuter(MAX_WORKERS) as executor: # func 函式同時被多個執行緒呼叫,回傳一個產生器 res = executor.map(func, tasks) # executor.__exit__ 方法呼叫 executor.shutdown(wait=True) # 這個動作會被**阻塞**直到所有執行緒都完成 # 迭代取出每一個呼叫的回傳值 # 隱式的 next() 會呼叫每一個 future.result() # 如果有任何執行緒在呼叫時出錯,會在這邊發出例外 result = list(res) |
使用單純迴圈來取代 ThreadPoolExecuter.map
:
1 2 3 4 5 |
with futures.ThreadPoolExecuter(MAX_WORKERS) as a executor: future_stack = [] for task in tasks: future_stack.append(executor.submit(func, task)) |
使用手動呼叫 result 取代隱式的 next 呼叫(阻塞式):
1 2 3 4 |
result = [] for future in future_stack: result.append(future.result()) |
使用 as_completed
取代阻塞式的結果查詢(完成後才產出),必須放在 Context Manager 區塊內,因為預設的 __exit__
會阻塞:
1 2 3 4 5 6 |
with futures.ThreadPoolExecuter(MAX_WORKERS) as a executor: ... result = [] for future in futures.as_completed(future_stack): result.append(future.result()) |
concurrent.futures.Future
、asyncio.Future
有相同目的,這兩個類別的實例,都代表被延遲的演算,可能被完成,也可能不會,這些被擱置的動作,可以被放入佇列、被查詢狀態、被取出結果(或例外)- 你不該建立 future,它們是要被並行框架實例化的,同理,你也不該更改 future 狀態
- Future 代表某個將會發生的事物,要知道它會不會發生,就要安排它的執行時間—— Future 只在 Executor 類別中被實例化 —— Executor 接收一個可呼叫物,安排執行它,並回傳 future 實體
- 你可以查詢 future 是否完成,透過非阻塞(nonblocking)的
.done()
方法 - 使用者端通常不會主動查詢,而是希望 future 完成時主動通知:透過
.add_done_callback()
,當 future 完成時,呼叫某個 callable - 當 Future 還沒完成——呼叫
.result()
會阻塞直到完成,concurrent.futures.Future
的這個方法支援 timeout 引述來指定執行時間(超時發起 TimeoutError),而asyncio.Future
則不支援 timeout - 要取得
asyncio.Future
的結果的最佳作法是透過yield from
以下解釋為什麼即使 Python 執行緒受限於 GIL,但 GIL 幾乎不會讓 I/O 密集型處理產生不良後果:
CPython 解譯器內部並不是 thread-safe,所以它有一個全域解譯器鎖(GIL),每次只允許一個執行緒執行 Python bytecode —— 這就是單一 Python 程序無法同時使用多 CPU 核心的原因(這是 CPython 解譯器的限制,不是語言本身的限制,Jython 與 IronPython 不會受此限制,但最快速的 Python 解譯器 Pypy 也有 GIL)
事實上,內建函式 / 以 C 寫成的擴充程式可以管理 GIL(例如,在執行一項耗時的工作時,解開 GIL),發起自己的 OS 執行緒,並利用所有可用的 CPU 核心 —— 這會讓程式變得非常複雜,所以大多數函式庫不會這麼做
但是,所有標準函式庫(例如
time.sleep()
)在執行 blocking I/O、等待 OS 的結果時,都會解開 GIL,讓其他執行緒可以執行,這代表 I/O 密集的程式可以在 Python 層級使用執行緒並獲益
- 在 I/O 密集的工作中使用
ProcessPoolExecutor
沒有什麼好處,它適用於 CPU 密集的工作 - CPU 密集的合理 worker 數量預設是
os.cpu_count()
,超過此數量的要求沒有意義 - 至於I/O 密集的工作要使用多少執行緒,最佳的數量取決於「做什麼事」及「可用的記憶體」,須謹慎測試來計算
- 如果你要用 Python 來做 CPU 密集型的工作,應該嘗試一下 Pypy,它的工人速度會比 CPython 的工人快上好幾倍
- Your mileage may vary(YMMV):在執行緒中,你永遠不會知道同一時間發生的事件之間的確切順序
Executor.map
有一個特殊功能,即「按照呼叫順序來回傳結果」,而不是「先產生的結果優先回傳」,要作到後者,需要透過Executor.summit
搭配futures.completed
- 要顯示文字模式的進度條,可以使用
tqdm
套件,它會接收任何可迭代物,並預測所有迭代剩餘的時間(為了計算時間,它需要知道項目數量) - 一個實用的作法:建立一個 dict 來將 future 對應到其完成時用到的其他資料,這可以讓我們輕鬆地處理 future 的結果,無論產生結果的順序為何
- 如果
futures.ThoreadPoolExecutor
彈性不夠,可以考慮使用更低階的threading
函式庫,或者使用queue
函式庫提供的 tread-safe 佇列來管理執行緒 - 相對
futures.ProcessPoolExecutor
,更低階的方案是multiprocessing
函式庫,它提供功能讓你可以在 CPU 密集型工作中輕鬆應付 GIL,它會在多個程序間模擬threading
API,並支援軟體鎖、佇列、通道、共用記憶體等架構 - CPU 密集型的平行處理,可參考基於
multiprocessing
的lelo
及python-parallelize
- 對於 CPU 與資料密集的工作,新的選項是 Apache Spark
- 「尷尬的並行」意指較高階的並行函式庫(如
concurrent.futures
)通常只能處理較簡單的工作
延伸閱讀
- Can’t we get rid of the Global Interpreter Lock?
- It isn’t Easy to Remove the GIL
- Python Threads and the GIL
以 asyncio 撰寫並行
- 「並行提供一種架構/方案,來解決一個不一定只能平行處理的問題」
- 並行(concurrency)跟平行(parallelism)有不一樣的定義,在實務上,大多數處理都是並行的,但不是平行的,為了達到真正的平行,你必須有多核心
asyncio
專案的代號是鬱金香(Tulip)asyncio
大量使用yield from
,所以不能與舊版的 Python 相容
使用 threading 來做並行的範例:在主運算執行的同時,顯示轉動動畫
並沒有 API 可以終止 Python 的執行緒(因為執行緒可能會在任何時間點終止,這會讓系統進入無效狀態),你必須傳遞訊息給它,例如此範例的 Signal.stop
使用 asyncio 的版本:
注意此範例適用 Python3.4,現在版本已有以下更動:
@asyncio.coroutine
修飾器改為async def
關鍵字asyncio.async
改為asyncio.create_task
- 協同程序的
yield from
改為await
關鍵字
- 除非你想阻塞主執行緒,否則絕對不要在 asyncio 協同程序中使用
time.sleep
,這會凍結事件迴圈 - 你不一定要用
@asyncio.coroutine
,但高度建議使用,它讓協同程序的地位比一般函式高,當協同程序被回收而沒有被yield from
時(代表某些操作沒有完成,可能是 bug),可以發出錯誤訊息來協助除錯 asyncio.Task
大致上可以類比threading.Thread
,不同在於:取得 Task 的時候代表它已經被執行了,不用像 Thread 實體還要呼叫.start()
,另一個明顯差異是 Task 有cancel()
API 可以傳送終止例外- 可以把協同程序視為「綠色執行緒」,在預設情況下,一切事情都會受到保護,不會像執行緒一樣有被中斷的風險
asyncio.Task
是asyncio.Future
的子類別,取得結果的最佳作法是yield from asyncio.Future
——自動等它完成,而不阻塞事件迴圈。與concurrent.Future
相比,不再需要.add_done_callback()
,callback 要做的事,只要寫在yield from
下一行即可,也不需要.result()
因為yield from
運算式的值就是結果
有些 asyncio 函式可以直接接收協同程序,在內部做好 Task 封裝,如果你想以同步方式測試協同程序,可以透過以下方式:
1 2 3 4 |
def run_sync(coro_or_future): loop = asyncio.get_event_loop() return loop.run_until_compelete(coro_or_future) |
- asyncio 只直接支援 TCP、UDP,對於其他更高層的協定,要使用第三方套件,例如
aiohttp
之於 HTTP - 使用者端程式不會直接建立事件迴圈,而是藉由呼叫
get_event_loop
來取得參考,有時我們的程式並未「擁有」事件迴圈,所以不能隨意關閉它 asyncio.wait
不是一個阻塞函式,它可以將多個協同程序包在同一個 Task 中,當所有協同程序都完成時,它才會完成- 使用
yield from
的巢狀協同程序,最內層的一定是個簡單的副產生器,只yield
或可迭代物件,同理,yield from aiohttp.request()
,最裡面的副產生器會是一個實際執行 I/O 的函式,而不是我們自己寫的東西 - 要避免阻塞的呼叫拖累整個應用程式,有兩種方式:
- 在不同執行緒執行每一個會阻塞的作業(但是每一個 Python 使用的 OS 執行緒會造成百萬個位元組的記憶體負擔)
- 將會造成阻塞的作業轉換成不會阻塞的非同步呼叫
asyncio
並未提供非同步的檔案系統 API,但你可以使用loop.run_in_executor
函式,在執行緒池中執行檔案系統操作asyncio.Semaphore
是一個同步的設備,它用來限制並行請求的數量,其中一個用法為情境管理器with (yield from semaphore):
,當退出 with 時,semaphore 計數器會被遞減,將其他等候中的協同程序阻塞狀態解除- 在回呼式 API,非同步呼叫會註冊兩個回呼:一個用來處理成功操作的結果、一個用來處理錯誤。在同呼地獄(callback hell)中,工作狀況會在錯誤處理牽涉其中時迅速惡化
- 協同程序比回呼地獄還要好很多,但它需要復出代價——要編寫 yield from、函式無法直接呼叫,必須透過事件迴圈來驅動
TCP server using asyncio streams
HTTP server using aiohttp
asyncio.start_server
、loop.create_server
都是回傳 asyncio.Server
物件的協同程序,為了要啟動一個伺服器及回傳其參考,這些協同程序都必須被「驅動」
- WebSocket 受到 asyncio 架構很好的支持,至少有兩個函式庫已經在 asyncio 上層實作它:
autobahn|python
、websockets
- “A real-time Web” 的趨勢是需要 Node.js 的關鍵因素,也是對 Python 生態系而言,支援 asyncio 如此重要的原因
- 具備 asyncio 的 Python 與 Node.js 相較之下,最大的優勢是 Python 具備協同程序與
yield from
來讓非同步程式碼比原生 JS 回呼還要容易維護;最大的缺點是函式庫的支援:Node.js 豐富的函式庫生態系統,完全是圍繞著非同步呼叫來建構