作者 | Peterbe
譯者 | leemeans ? ? 共計翻譯:5 篇 貢獻時間:22 天
假設現在的背景關係(LCTT 譯註:context,計算機術語,此處意為業務情景)是這樣的:一個 zip 檔案被上傳到一個Web 服務[1]中,然後 Python 需要解壓這個 zip 檔案然後分析和處理其中的每個檔案。這個特殊的應用檢視每個檔案各自的名稱和大小,並和已經上傳到 AWS S3 上的檔案進行比較,如果檔案(和 AWS S3 上的相比)有所不同或者檔案本身更新,那麼就將它上傳到 AWS S3。
Uploads today
挑戰在於這些 zip 檔案太大了。它們的平均大小是 560MB 但是其中一些大於 1GB。這些檔案中大多數是文字檔案,但是其中同樣也有一些巨大的二進位制檔案。不同尋常的是,每個 zip 檔案包含 100 個檔案但是其中 1-3 個檔案卻佔據了多達 95% 的 zip 檔案大小。
最開始我嘗試在記憶體中解壓檔案,並且每次只處理一個檔案。在各種記憶體爆炸和 EC2 耗盡記憶體的情況下,這個方法壯烈失敗了。我覺得這個原因是這樣的。最開始你有 1GB 檔案在記憶體中,然後你現在解壓每個檔案,在記憶體中大約就要佔用 2-3GB。所以,在很多次測試之後,解決方案是將這些 zip 檔案複製到磁碟上(在臨時目錄 /tmp
中),然後遍歷這些檔案。這次情況好多了但是我仍然註意到了整個解壓過程花費了巨量的時間。是否可能有方法最佳化呢?
原始函式
首先是下麵這些模擬對 zip 檔案中檔案實際操作的普通函式:
def _count_file(fn):
with open(fn, 'rb') as f:
return _count_file_object(f)
def _count_file_object(f):
# Note that this iterates on 'f'.
# You *could* do 'return len(f.read())'
# which would be faster but potentially memory
# inefficient and unrealistic in terms of this
# benchmark experiment.
total = 0
for line in f:
total += len(line)
return total
這裡是可能最簡單的另一個函式:
def f1(fn, dest):
with open(fn, 'rb') as f:
zf = zipfile.ZipFile(f)
zf.extractall(dest)
total = 0
for root, dirs, files in os.walk(dest):
for file_ in files:
fn = os.path.join(root, file_)
total += _count_file(fn)
return total
如果我更仔細地分析一下,我將會發現這個函式花費時間 40% 執行 extractall
,60% 的時間在遍歷各個檔案並讀取其長度。
第一步嘗試
我的第一步嘗試是使用執行緒。先建立一個 zipfile.ZipFile
的實體,展開其中的每個檔案名,然後為每一個檔案開始一個執行緒。每個執行緒都給它一個函式來做“實質工作”(在這個基準測試中,就是遍歷每個檔案然後獲取它的名稱)。實際業務中的函式進行的工作是複雜的 S3、Redis 和 PostgreSQL 操作,但是在我的基準測試中我只需要製作一個可以找出檔案長度的函式就好了。執行緒池函式:
def f2(fn, dest):
def unzip_member(zf, member, dest):
zf.extract(member, dest)
fn = os.path.join(dest, member.filename)
return _count_file(fn)
with open(fn, 'rb') as f:
zf = zipfile.ZipFile(f)
futures = []
with concurrent.futures.ThreadPoolExecutor() as executor:
for member in zf.infolist():
futures.append(
executor.submit(
unzip_member,
zf,
member,
dest,
)
)
total = 0
for future in concurrent.futures.as_completed(futures):
total += future.result()
return total
結果:加速 ~10%
第二步嘗試
所以可能是 GIL(LCTT 譯註:Global Interpreter Lock,一種全域性鎖,CPython 中的一個概念)阻礙了我。最自然的想法是嘗試使用多執行緒在多個 CPU 上分配工作。但是這樣做有缺點,那就是你不能傳遞一個非可 pickle 序列化的物件(LCTT 譯註:意為只有可 pickle 序列化的物件可以被傳遞),所以你只能傳送檔案名到之後的函式中:
def unzip_member_f3(zip_filepath, filename, dest):
with open(zip_filepath, 'rb') as f:
zf = zipfile.ZipFile(f)
zf.extract(filename, dest)
fn = os.path.join(dest, filename)
return _count_file(fn)
def f3(fn, dest):
with open(fn, 'rb') as f:
zf = zipfile.ZipFile(f)
futures = []
with concurrent.futures.ProcessPoolExecutor() as executor:
for member in zf.infolist():
futures.append(
executor.submit(
unzip_member_f3,
fn,
member.filename,
dest,
)
)
total = 0
for future in concurrent.futures.as_completed(futures):
total += future.result()
return total
結果: 加速 ~300%
這是作弊
使用處理器池的問題是這樣需要儲存在磁碟上的原始 .zip
檔案。所以為了在我的 web 伺服器上使用這個解決方案,我首先得要將記憶體中的 zip 檔案儲存到磁碟,然後呼叫這個函式。這樣做的代價我不是很清楚但是應該不低。
好吧,再翻翻看又沒有損失。可能,解壓過程加速到足以彌補這樣做的損失了吧。
但是一定記住!這個最佳化取決於使用所有可用的 CPU。如果一些其它的 CPU 需要執行在 gunicorn
中的其它事務呢?這時,這些其它行程必須等待,直到有 CPU 可用。由於在這個伺服器上有其他的事務正在進行,我不是很確定我想要在行程中接管所有其他 CPU。
結論
一步一步地做這個任務的這個過程感覺挺好的。你被限制在一個 CPU 上但是表現仍然特別好。同樣地,一定要看看在f1
和 f2
兩段程式碼之間的不同之處!利用 concurrent.futures
池類你可以獲取到允許使用的 CPU 的個數,但是這樣做同樣給人感覺不是很好。如果你在虛擬環境中獲取的個數是錯的呢?或者可用的個數太低以致無法從負載分配獲取好處並且現在你僅僅是為了移動負載而支付營運開支呢?
我將會繼續使用 zipfile.ZipFile(file_buffer).extractall(temp_dir)
。這個工作這樣做已經足夠好了。
想試試手嗎?
我使用一個 c5.4xlarge
EC2 伺服器來進行我的基準測試。檔案可以從此處下載:
wget https://www.peterbe.com/unzip-in-parallel/hack.unzip-in-parallel.py
wget https://www.peterbe.com/unzip-in-parallel/symbols-2017-11-27T14_15_30.zip
這裡的 .zip
檔案有 34MB。和在伺服器上的相比已經小了很多。
hack.unzip-in-parallel.py
檔案裡是一團糟。它包含了大量可怕的修正和醜陋的程式碼,但是這隻是一個開始。
via: https://www.peterbe.com/plog/fastest-way-to-unzip-a-zip-file-in-python
作者:Peterbe[3] 譯者:Leemeans 校對:wxy
本文由 LCTT 原創編譯,Linux中國 榮譽推出