作者:範傳輝
如需轉載請聯絡大資料(ID:hzdashuju)
分散式行程在Python中依然要用到multiprocessing模組。multiprocessing模組不但支援多行程,其中managers子模組還支援把多行程分佈到多臺機器上。可以寫一個服務行程作為排程者,將任務分佈到其他多個行程中,依靠網路通訊進行管理。
舉個例子:在做爬蟲程式時,常常會遇到這樣的場景,我們想抓取某個網站的所有圖片,如果使用多行程的話,一般是一個行程負責抓取圖片的連結地址,將連結地址存放到Queue中,另外的行程負責從Queue中讀取連結地址進行下載和儲存到本地。
現在把這個過程做成分散式,一臺機器上的行程負責抓取連結,其他機器上的行程負責下載儲存。那麼遇到的主要問題是將Queue暴露到網路中,讓其他機器行程都可以訪問,分散式行程就是將這一個過程進行了封裝,我們可以將這個過程稱為本地佇列的網路化。整體過程如圖1-24所示。
▲圖1-24 分散式行程
要實現上面例子的功能,建立分散式行程需要分為六個步驟:
-
建立佇列Queue,用來進行行程間的通訊。服務行程建立任務佇列task_queue,用來作為傳遞任務給任務行程的通道;服務行程建立結果佇列result_queue,作為任務行程完成任務後回覆服務行程的通道。在分散式多行程環境下,必須透過由Queuemanager獲得的Queue介面來新增任務。
-
把第一步中建立的佇列在網路上註冊,暴露給其他行程(主機),註冊後獲得網路佇列,相當於本地佇列的映像。
-
建立一個物件(Queuemanager(BaseManager))實體manager,系結埠和驗證口令。
-
啟動第三步中建立的實體,即啟動管理manager,監管資訊通道。
-
透過管理實體的方法獲得透過網路訪問的Queue物件,即再把網路佇列物體化成可以使用的本地佇列。
-
建立任務到“本地”佇列中,自動上傳任務到網路佇列中,分配給任務行程進行處理。
接下來透過程式實現上面的例子(Linux版),首先編寫的是服務行程(taskManager.py),程式碼如下:
import random,time,Queue
from multiprocessing.managers import BaseManager
# 第一步:建立task_queue和result_queue,用來存放任務和結果
task_queue=Queue.Queue()
result_queue=Queue.Queue()
class Queuemanager(BaseManager):
pass
# 第二步:把建立的兩個佇列註冊在網路上,利用register方法,callable引數關聯了Queue物件,
# 將Queue物件在網路中暴露
Queuemanager.register('get_task_queue',callable=lambda:task_queue)
Queuemanager.register('get_result_queue',callable=lambda:result_queue)
# 第三步:系結埠8001,設定驗證口令‘qiye’。這個相當於物件的初始化
manager=Queuemanager(address=('',8001),authkey='qiye')
# 第四步:啟動管理,監聽資訊通道
manager.start()
# 第五步:透過管理實體的方法獲得透過網路訪問的Queue物件
task=manager.get_task_queue()
result=manager.get_result_queue()
# 第六步:新增任務
for url in ["ImageUrl_"+i for i in range(10)]:
print 'put task %s ...' %url
task.put(url)
# 獲取傳回結果
print 'try get result...'
for i in range(10):
print 'result is %s' %result.get(timeout=10)
# 關閉管理
manager.shutdown()
任務行程已經編寫完成,接下來編寫任務行程(taskWorker.py),建立任務行程的步驟相對較少,需要四個步驟:
-
使用QueueManager註冊用於獲取Queue的方法名稱,任務行程只能透過名稱來在網路上獲取Queue。
-
連線伺服器,埠和驗證口令註意保持與服務行程中完全一致。
-
從網路上獲取Queue,進行本地化。
-
從task佇列獲取任務,並把結果寫入result佇列。
程式taskWorker.py程式碼(win/linux版)如下:
# coding:utf-8
import time
from multiprocessing.managers import BaseManager
# 建立類似的QueueManager:
class QueueManager(BaseManager):
pass
# 第一步:使用QueueManager註冊用於獲取Queue的方法名稱
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 第二步:連線到伺服器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 埠和驗證口令註意保持與服務行程完全一致:
m = QueueManager(address=(server_addr, 8001), authkey='qiye')
# 從網路連線:
m.connect()
# 第三步:獲取Queue的物件:
task = m.get_task_queue()
result = m.get_result_queue()
# 第四步:從task佇列獲取任務,並把結果寫入result佇列:
while(not task.empty()):
image_url = task.get(True,timeout=5)
print('run task download %s...' % image_url)
time.sleep(1)
result.put('%s--->success'%image_url)
# 處理結束:
print('worker exit.')
最後開始執行程式,先啟動服務行程taskManager.py,執行結果如下:
put task ImageUrl_0 ...
put task ImageUrl_1 ...
put task ImageUrl_2 ...
put task ImageUrl_3 ...
put task ImageUrl_4 ...
put task ImageUrl_5 ...
put task ImageUrl_6 ...
put task ImageUrl_7 ...
put task ImageUrl_8 ...
put task ImageUrl_9 ...
try get result...
接著再啟動任務行程taskWorker.py,執行結果如下:
Connect to server 127.0.0.1...
run task download ImageUrl_0...
run task download ImageUrl_1...
run task download ImageUrl_2...
run task download ImageUrl_3...
run task download ImageUrl_4...
run task download ImageUrl_5...
run task download ImageUrl_6...
run task download ImageUrl_7...
run task download ImageUrl_8...
run task download ImageUrl_9...
worker exit.
當任務行程執行結束後,服務行程執行結果如下:
result is ImageUrl_0--->success
result is ImageUrl_1--->success
result is ImageUrl_2--->success
result is ImageUrl_3--->success
result is ImageUrl_4--->success
result is ImageUrl_5--->success
result is ImageUrl_6--->success
result is ImageUrl_7--->success
result is ImageUrl_8--->success
result is ImageUrl_9--->success
其實這就是一個簡單但真正的分散式計算,把程式碼稍加改造,啟動多個worker,就可以把任務分佈到幾臺甚至幾十臺機器上,實現大規模的分散式爬蟲。
註:由於平臺的特性,建立服務行程的程式碼在Linux和Windows上有一些不同,建立工作行程的程式碼是一致的。
taskManager.py程式在Windows版下的程式碼如下:
# coding:utf-8
# taskManager.py for windows
import Queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support
# 任務個數
task_number = 10
# 定義收發佇列
task_queue = Queue.Queue(task_number);
result_queue = Queue.Queue(task_number);
def get_task():
return task_queue
def get_result():
return result_queue
# 建立類似的QueueManager:
class QueueManager(BaseManager):
pass
def win_run():
# Windows下系結呼叫介面不能使用lambda,所以只能先定義函式再系結
QueueManager.register('get_task_queue',callable = get_task)
QueueManager.register('get_result_queue',callable = get_result)
# 系結埠並設定驗證口令,Windows下需要填寫IP地址,Linux下不填預設為本地
manager = QueueManager(address = ('127.0.0.1',8001),authkey = 'qiye')
# 啟動
manager.start()
try:
# 透過網路獲取任務佇列和結果佇列
task = manager.get_task_queue()
result = manager.get_result_queue()
# 新增任務
for url in ["ImageUrl_"+str(i) for i in range(10)]:
print 'put task %s ...' %url
task.put(url)
print 'try get result...'
for i in range(10):
print 'result is %s' %result.get(timeout=10)
except:
print('Manager error')
finally:
# 一定要關閉,否則會報管道未關閉的錯誤
manager.shutdown()
if __name__ == '__main__':
# Windows下多行程可能會有問題,新增這句可以緩解
freeze_support()
win_run()
關於作者:範傳輝,資深網蟲,Python開發者,參與開發了多項網路應用,在實際開發中積累了豐富的實戰經驗,並善於總結,貢獻了多篇技術文章廣受好評。研究興趣是網路安全、爬蟲技術、資料分析、驅動開發等技術。
本文摘編自《Python爬蟲開發與專案實戰》,經出版方授權釋出。
延伸閱讀《Python爬蟲開發與專案實戰》
點選上圖瞭解及購買
轉載請聯絡微信:DoctorData
推薦語:零基礎學習爬蟲技術,從Python和Web前端基礎開始講起,由淺入深,包含大量案例,實用性強。
朋友會在“發現-看一看”看到你“在看”的內容