Skip to content

Fluent Python 讀書筆記(六)

此筆記適用 Python 3.4,部份已不相容後續版本的套件與 API

以 futures 撰寫並行

  • futures —— 非同步執行某項操作的物件
  • 網路 I/O 涉及高度延遲,須要用到並行來有效處理
  • 在公用 Web 測試並行 HTTP 用戶端,你可能會無意中啟動一個阻斷服務攻擊(DOS),或被懷疑在做這件事…要測試複雜的用戶端,你應該設定自己的測試伺服器(作法說明
  • requests 已經被視為 Python 式 API 的典範,它比標準函式庫 urllib.request 還要強大
  • concurrent.futures 主要功能是能讓你分別提交(submit)在不同執行緒/程序執行的可呼叫物(callables),實作於 ThreadPoolExecutorProcessPoolExecutor 的介面(分別在內部管理一個 worker thread pool 及 process pool)

多執行緒並行起手勢

使用單純迴圈來取代 ThreadPoolExecuter.map

使用手動呼叫 result 取代隱式的 next 呼叫(阻塞式):

使用 as_completed 取代阻塞式的結果查詢(完成後才產出),必須放在 Context Manager 區塊內,因為預設的 __exit__ 會阻塞:


  • concurrent.futures.Futureasyncio.Future 有相同目的,這兩個類別的實例,都代表被延遲的演算,可能被完成,也可能不會,這些被擱置的動作,可以被放入佇列、被查詢狀態、被取出結果(或例外)
  • 你不該建立 future,它們是要被並行框架實例化的,同理,你也不該更改 future 狀態
  • Future 代表某個將會發生的事物,要知道它會不會發生,就要安排它的執行時間—— Future 只在 Executor 類別中被實例化 —— Executor 接收一個可呼叫物,安排執行它,並回傳 future 實體
  • 你可以查詢 future 是否完成,透過非阻塞(nonblocking)的 .done() 方法
  • 使用者端通常不會主動查詢,而是希望 future 完成時主動通知:透過 .add_done_callback(),當 future 完成時,呼叫某個 callable
  • 當 Future 還沒完成——呼叫 .result() 會阻塞直到完成,concurrent.futures.Future 的這個方法支援 timeout 引述來指定執行時間(超時發起 TimeoutError),而 asyncio.Future 則不支援 timeout
  • 要取得 asyncio.Future 的結果的最佳作法是透過 yield from

以下解釋為什麼即使 Python 執行緒受限於 GIL,但 GIL 幾乎不會讓 I/O 密集型處理產生不良後果:

CPython 解譯器內部並不是 thread-safe,所以它有一個全域解譯器鎖(GIL),每次只允許一個執行緒執行 Python bytecode —— 這就是單一 Python 程序無法同時使用多 CPU 核心的原因(這是 CPython 解譯器的限制,不是語言本身的限制,Jython 與 IronPython 不會受此限制,但最快速的 Python 解譯器 Pypy 也有 GIL)

事實上,內建函式 / 以 C 寫成的擴充程式可以管理 GIL(例如,在執行一項耗時的工作時,解開 GIL),發起自己的 OS 執行緒,並利用所有可用的 CPU 核心 —— 這會讓程式變得非常複雜,所以大多數函式庫不會這麼做

但是,所有標準函式庫(例如 time.sleep())在執行 blocking I/O、等待 OS 的結果時,都會解開 GIL,讓其他執行緒可以執行,這代表 I/O 密集的程式可以在 Python 層級使用執行緒並獲益


  • 在 I/O 密集的工作中使用 ProcessPoolExecutor 沒有什麼好處,它適用於 CPU 密集的工作
  • CPU 密集的合理 worker 數量預設是 os.cpu_count(),超過此數量的要求沒有意義
  • 至於I/O 密集的工作要使用多少執行緒,最佳的數量取決於「做什麼事」及「可用的記憶體」,須謹慎測試來計算
  • 如果你要用 Python 來做 CPU 密集型的工作,應該嘗試一下 Pypy,它的工人速度會比 CPython 的工人快上好幾倍
  • Your mileage may vary(YMMV):在執行緒中,你永遠不會知道同一時間發生的事件之間的確切順序
  • Executor.map 有一個特殊功能,即「按照呼叫順序來回傳結果」,而不是「先產生的結果優先回傳」,要作到後者,需要透過 Executor.summit 搭配 futures.completed
  • 要顯示文字模式的進度條,可以使用 tqdm 套件,它會接收任何可迭代物,並預測所有迭代剩餘的時間(為了計算時間,它需要知道項目數量)
  • 一個實用的作法:建立一個 dict 來將 future 對應到其完成時用到的其他資料,這可以讓我們輕鬆地處理 future 的結果,無論產生結果的順序為何
  • 如果 futures.ThoreadPoolExecutor 彈性不夠,可以考慮使用更低階的 threading 函式庫,或者使用 queue 函式庫提供的 tread-safe 佇列來管理執行緒
  • 相對 futures.ProcessPoolExecutor,更低階的方案是 multiprocessing 函式庫,它提供功能讓你可以在 CPU 密集型工作中輕鬆應付 GIL,它會在多個程序間模擬 threading API,並支援軟體鎖、佇列、通道、共用記憶體等架構
  • CPU 密集型的平行處理,可參考基於 multiprocessinglelopython-parallelize
  • 對於 CPU 與資料密集的工作,新的選項是 Apache Spark
  • 「尷尬的並行」意指較高階的並行函式庫(如 concurrent.futures)通常只能處理較簡單的工作

延伸閱讀

以 asyncio 撰寫並行

  • 「並行提供一種架構/方案,來解決一個不一定只能平行處理的問題」
  • 並行(concurrency)跟平行(parallelism)有不一樣的定義,在實務上,大多數處理都是並行的,但不是平行的,為了達到真正的平行,你必須有多核心
  • asyncio 專案的代號是鬱金香(Tulip)
  • asyncio 大量使用 yield from,所以不能與舊版的 Python 相容

使用 threading 來做並行的範例:在主運算執行的同時,顯示轉動動畫

並沒有 API 可以終止 Python 的執行緒(因為執行緒可能會在任何時間點終止,這會讓系統進入無效狀態),你必須傳遞訊息給它,例如此範例的 Signal.stop

使用 asyncio 的版本:

注意此範例適用 Python3.4,現在版本已有以下更動:

  • @asyncio.coroutine 修飾器改為 async def 關鍵字
  • asyncio.async 改為 asyncio.create_task
  • 協同程序的 yield from 改為 await 關鍵字

  • 除非你想阻塞主執行緒,否則絕對不要在 asyncio 協同程序中使用 time.sleep,這會凍結事件迴圈
  • 你不一定要用 @asyncio.coroutine,但高度建議使用,它讓協同程序的地位比一般函式高,當協同程序被回收而沒有被 yield from 時(代表某些操作沒有完成,可能是 bug),可以發出錯誤訊息來協助除錯
  • asyncio.Task 大致上可以類比 threading.Thread,不同在於:取得 Task 的時候代表它已經被執行了,不用像 Thread 實體還要呼叫 .start(),另一個明顯差異是 Task 有cancel() API 可以傳送終止例外
  • 可以把協同程序視為「綠色執行緒」,在預設情況下,一切事情都會受到保護,不會像執行緒一樣有被中斷的風險
  • asyncio.Taskasyncio.Future 的子類別,取得結果的最佳作法是 yield from asyncio.Future——自動等它完成,而不阻塞事件迴圈。與concurrent.Future 相比,不再需要 .add_done_callback(),callback 要做的事,只要寫在 yield from 下一行即可,也不需要 .result() 因為 yield from 運算式的值就是結果

有些 asyncio 函式可以直接接收協同程序,在內部做好 Task 封裝,如果你想以同步方式測試協同程序,可以透過以下方式:


  • asyncio 只直接支援 TCP、UDP,對於其他更高層的協定,要使用第三方套件,例如 aiohttp 之於 HTTP
  • 使用者端程式不會直接建立事件迴圈,而是藉由呼叫 get_event_loop 來取得參考,有時我們的程式並未「擁有」事件迴圈,所以不能隨意關閉它
  • asyncio.wait 不是一個阻塞函式,它可以將多個協同程序包在同一個 Task 中,當所有協同程序都完成時,它才會完成
  • 使用 yield from 的巢狀協同程序,最內層的一定是個簡單的副產生器,只 yield 或可迭代物件,同理, yield from aiohttp.request(),最裡面的副產生器會是一個實際執行 I/O 的函式,而不是我們自己寫的東西
  • 要避免阻塞的呼叫拖累整個應用程式,有兩種方式:
    • 在不同執行緒執行每一個會阻塞的作業(但是每一個 Python 使用的 OS 執行緒會造成百萬個位元組的記憶體負擔)
    • 將會造成阻塞的作業轉換成不會阻塞的非同步呼叫
  • asyncio 並未提供非同步的檔案系統 API,但你可以使用 loop.run_in_executor 函式,在執行緒池中執行檔案系統操作
  • asyncio.Semaphore 是一個同步的設備,它用來限制並行請求的數量,其中一個用法為情境管理器 with (yield from semaphore):,當退出 with 時,semaphore 計數器會被遞減,將其他等候中的協同程序阻塞狀態解除
  • 在回呼式 API,非同步呼叫會註冊兩個回呼:一個用來處理成功操作的結果、一個用來處理錯誤。在同呼地獄(callback hell)中,工作狀況會在錯誤處理牽涉其中時迅速惡化
  • 協同程序比回呼地獄還要好很多,但它需要復出代價——要編寫 yield from、函式無法直接呼叫,必須透過事件迴圈來驅動

TCP server using asyncio streams

HTTP server using aiohttp

asyncio.start_serverloop.create_server 都是回傳 asyncio.Server 物件的協同程序,為了要啟動一個伺服器及回傳其參考,這些協同程序都必須被「驅動」


  • WebSocket 受到 asyncio 架構很好的支持,至少有兩個函式庫已經在 asyncio 上層實作它:autobahn|pythonwebsockets
  • “A real-time Web” 的趨勢是需要 Node.js 的關鍵因素,也是對 Python 生態系而言,支援 asyncio 如此重要的原因
  • 具備 asyncio 的 Python 與 Node.js 相較之下,最大的優勢是 Python 具備協同程序與 yield from 來讓非同步程式碼比原生 JS 回呼還要容易維護;最大的缺點是函式庫的支援:Node.js 豐富的函式庫生態系統,完全是圍繞著非同步呼叫來建構

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。