Python聖誕學習狂歡夜
距離開始還有3天
.
.
.
詳情↓↓↓
.
.
.
生成器和協程的介紹
生成器(Generator)的本質和特點
生成器 是 可以生成一定序列的 函式。 函式可以呼叫next()方法。
生成器的例子:
-
例子1: follow.py 可以使用生成器完成 tail -f 的功能,也就是跟蹤輸出的功能。
-
例子2: 生成器用作程式管道(類似unix pipe)
標註:unix管道
一個uinx管道是由標準流連結在一起的一系列流程.
pipeline.py
理解pipeline.py
在pipeline中,follow函式和grep函式相當於程式鏈,這樣就能鏈式處理程式。
Yield作為表達【我們開始說協程了~】
grep.py
yield最重要的問題在於yield的值是多少。
yield的值需要使用coroutine協程這個概念 相對於僅僅生成值,函式可以動態處理傳送進去的值,而最後值透過yield傳回。
協程的執行
協程的執行和生成器的執行很相似。
當你初始化一個協程,不會傳回任何東西。
協程只能響應run和send函式。協程的執行依賴run和send函式。
協程啟動
所有的協程都需要呼叫.next( )函式。
呼叫的next( )函式將要執行到第一個yield運算式的位置。
在yield運算式的位置上,很容易去執行就可以。協程使用next()啟動。
使用協程的修飾器
由【協程啟動】中我們知道,啟動一個協程需要記得呼叫next( )來開始協程,而這個啟動器容易忘記使用。
使用修飾器包一層,來讓我們啟動協程。
【以後所有的協程器都會先有@coroutine
關閉一個協程:
使用close()來關閉。
使用except捕獲協程的關閉close():
grepclose.py
使用GeneratorExit這個異常型別
丟擲一個異常:
在一個協程中,可以丟擲一個異常
異常起源於yield運算式
可以用常規方法去抓取
一些小tips
* 儘管有點相似,但是生成器和協程是*兩個完全不同的概念*。
* 生成器用來產生序列。
* 協程用來處理序列。
* 很容易產生一些誤解。因為協程有的時候用來對行程裡面的用來產生迭代物件的生成器作微調。
生成器不能夠同時生成值和接受值
* 不能往generator裡面send東西。
* 協程和迭代器的概念沒有關係
* 雖然有一種用法,確實是在一個協程裡面生成一些值,但是並不和迭代器有關係。
協程,管道,資料流
行程管道:如下圖所示,一連序列程串起來像管道一樣。
協程可以用來作為行程管道。 你僅僅需要把協程連線在一起,然後透過send()操作傳遞資料。 整個行程管道由三部分組成:
第一部分,管道源/協程源:
行程管道需要一個初始的源(一個生產者)。 這個初始的源驅動整個管道。 管道源不是協程。
第二部分,管道終止/協程終止:
管道必須有個終止點。 管道終止/協程終止是行程管道的終止點。
例子:以實現tail -f 功能為例子
分析:第一個follow函式是協程源,第二個printer函式是協程終止。協程源不是一個協程,但是需要傳入一個已經初始化完畢的協程。在協程源當中,呼叫send()。
第三部分,管道過濾器:
叫過濾器其實並不貼切,應該叫中間人Intermediate:其兩端都是send()函式。
(協程的中間層) 典型的中間層如下:
分析可知,中間層需要接受上一個coroutine,也需要往下一個coroutine裡面傳遞值。
一個管道過濾器的例子 從文章中找出具有“python”關鍵字的句子列印。 grep.py:
grep 從中間傳入follow,然後printer傳入grep。
協程和生成器的對比
不同處:生成器使用了迭代器拉取資料,協程使用send()壓入資料。
變得多分支:(上一個協程傳送資料去多個下一段協程)
圖示:
使用協程,你可以傳送資料 給 多個 協程過濾器/協程終了。但是請註意,協程源只是用來傳遞資料的,過多的在協程源中傳遞資料是令人困惑並且複雜的。
一個例子
從文章中分別打印出含有’python‘ ’ply‘ ’swig‘ 關鍵字的句子。使用了一個協程佇列向所有printer協程 送出 接收到的資料。 圖示:
或者這樣Hook them up:
圖示
為什麼我們用協程
-
協程相較於迭代器,存在更加強大的資料路由(就像上圖的資料流向)的可能。
-
協程可以將一系列簡單的資料處理元件,整合到管道,分支,合併等複雜的佈置當中。
-
但有些限制…【後文會說】相對於物件的優勢
-
從概念上簡單一點:協程就是一個函式,物件要構建整個物件。
-
從程式碼執行角度上來說,協程相對要快一些。
第三部分:協程,事件分發
事件處理
協程可以用在寫各種各樣處理事件流的元件。
介紹一個例子【這個例子會貫穿這個第三部分始終】要求做一個實時的公交車GPS位置監控。
編寫程式的主要目的是處理一份檔案。傳統上,使用SAX進行處理。
【SAX處理可以減少記憶體空間的使用,但SAX事件驅動的特性會讓它笨重和低效】
把SAX和協程組合在一起
我們可以使用協程分發SAX事件,比如:
解析:整個事件的處理如圖所示
【最終的組合】
比如,把xml改成json最後從中篩選的出固定資訊. buses.py
協程的一個有趣的事情是,您可以將初始資料源推送到低階別的語言,而不需要重寫所有處理階段。比如,PPT 中69-73頁介紹的,可以透過協程和低階別的語言進行聯動,從而達成非常好的最佳化效果。如Expat模組或者cxmlparse模組。
ps: ElementTree具有快速的遞增xml句法分析
第四部分:從資料處理到併發程式設計
複習一下上面學的特點:
協程有以下特點。
-
協程和生成器非常像。
-
我們可以用協程,去組合各種簡單的小元件。
-
我們可以使用建立行程管道,資料流圖的方法去處理資料。
-
你可以使用伴有複雜資料處理程式碼的協程。
一個相似的主題:
我們往協程內傳送資料,向執行緒內傳送資料,也向行程內傳送資料。那麼,協程自然很容易和執行緒和分散式系統聯絡起來。
基礎的併發:
我們可以透過新增一個額外的層,從而封裝協程進入執行緒或者子行程。這描繪了幾個基本的概念。
標的!協程+執行緒【沒有蛀牙】
下麵看一個執行緒的例子。 cothread.py
例子解析:第一部分:先新建一個佇列。然後定義一個永久迴圈的執行緒;這個執行緒可以將其中的元素拉出訊息佇列,然後傳送到標的裡面。第二部分:接受上面送來的元素,並透過佇列,將他們傳送進執行緒裡面。其中用到了GeneratorExit ,使得執行緒可以正確的關閉。
Hook up:cothread.py
但是:新增執行緒讓這個例子慢了50%
標的!協程+子行程
我們知道,行程之間是不共享系統資源的,所以要進行兩個子行程之間的通訊,我們需要透過一個檔案橋接兩個協程。
程式透過sendto()和recvfrom()傳遞檔案。
和環境結合的協程:
使用協程,我們可以從一個任務的執行環境中剝離出他的實現。並且,協程就是那個實現。執行環境是你選擇的執行緒,子行程,網路等。
需要註意的警告:
-
建立大量的協同程式,執行緒和行程可能是建立 不可維護 應用程式的一個好方法,並且會減慢你程式的速度。需要學習哪些是良好的使用協程的習慣。
-
在協程裡send()方法需要被適當的同步。
-
如果你對已經正在執行了的協程使用send()方法,那麼你的程式會發生崩潰。如:多個執行緒傳送資料進入同一個協程。
-
同樣的不能創造迴圈的協程:
-
堆疊傳送正在構建一種呼叫堆疊(send()函式不傳回,直到標的產生)。
-
如果呼叫一個正在傳送行程的協程,將會丟擲一個錯誤。
-
send() 函式不會掛起任何一個協程的執行。
第五部分:任務一樣的協程
Task的概念
在併發程式設計中,通常將問題細分為“任務”。
“任務”有下麵幾個經典的特點:
* 擁有獨立的控制流。
* 擁有內在的狀態。
* 可以被安排規劃/掛起/恢復。
* 可與其他的任務通訊。
協程也是任務的一種。
協程是任務的一種:
-
下麵的部分 來告訴你協程有他自己的控制流,這裡 if 的控制就是控制流。
-
協程是一個類似任何其他Python函式的陳述句序列。
-
協程有他們內在的自己的狀態,比如一些變數:其中的pattern和line就算是自己的狀態。
-
本地的生存時間和協程的生存時間相同。
-
很多協程構建了一個可執行的環境。
-
協程可以互相通訊,比如:yield就是用來接受傳遞的資訊,而上一個協程的send( )就是用來向下一個協程。
7.協程可以被掛起,重啟,關閉。
-
yield可以掛起執行行程。
-
send() 用來 重啟執行行程。
-
close()用來終止/關閉行程。
總之,一個協程滿足以上所有任務(task)的特點,所以協程非常像任務。但是協程不用與任何一個執行緒或者子行程系結。
第六部分:作業系統的中斷事件。(微嵌課程學的好的同學可以直接跳到這部分的“啟示”✌️)
作業系統的執行(複習微嵌知識)
當計算機執行時,電腦沒有同時執行好幾條指令的打算。而無論是處理器,應用程式都不懂多工處理。所以,作業系統需要去完成多工的排程。作業系統透過在多個任務中快速切換來實現多工。
需要解決的問題(還在複習微嵌知識)
CPU執行的是應用程式,而不是你的作業系統,那沒有被CPU執行的作業系統是怎麼控制正在執行的應用程式中斷的呢。
中斷(interrupts)和陷阱(Traps)
作業系統只能透過兩個機制去獲得對應用程式的控制:中斷和陷阱。
* 中斷:和硬體有關的balabala。
* 陷阱:一個軟體發出的訊號。
在兩種狀況下,CPU都會掛起正在做的,然後執行OS的程式碼(這個時候,OS的程式碼成功插入了應用程式的執行),此時,OS來切換了程式。
中斷的底層實現(略…碼字員微嵌只有70分?♀️)
中斷的高階表現:
* 中斷(Traps)使得OS的程式碼可以實現。
* 在程式執行遇到中斷(Traps)時,OS強制在CPU上停止你的程式。
* 程式掛起,然後OS執行。
表現如下圖:
每次中斷(Traps)程式都會執行另一個不同的任務。
任務排程(非常簡單):
為了執行很多工,新增一簇任務佇列。
啟示(很重要):
BB了這麼多微嵌的內容,得到的是什麼結論呢。類比任務排程,協程中yield宣告可以理解為中斷(Traps)。當一個生成器函式碰到了yield宣告,那函式將立即掛起。而執行被傳給生成器函式執行的任何程式碼。如果你把yield宣告看成了一個中斷,那麼你就可以元件一個多工執行的作業系統了。
第七部分:讓我們建一個作業系統。
標的:滿足以下條件建成一個作業系統。
1. 用純python陳述句。
2. 不用執行緒。
3. 不用子行程。
4. 使用生成器和協程器。
我們用python去構建作業系統的一些動機:
* 尤其在存在執行緒鎖(GIL)的條件下,在執行緒間切換會變得非常重要。我要高併發!
* 不阻塞和非同步I/O。我要高併發!
* 在實戰中可能會遇到:伺服器要同時處理上千條客戶端的連線。我要高併發!
* 大量的工作 致力於實現 事件驅動 或者說 響應式模型。我要元件化!
* 綜上,python構建作業系統,有利於瞭解現在高併發,元件化的趨勢。
第一步:定義任務
定義一個任務類:任務像一個協程的殼,協程函式傳入target;任務類僅僅有一個run()函式。
pyos1.py
任務類的執行:
在foo中,yield就像中斷(Traps)一樣,每次執行run(),任務就會執行到下一個yield(一個中斷)。
第二步:構建排程者
下麵是排程者類,兩個屬性分別是Task佇列和task_id與Task類對應的map。schedule()向佇列裡面新增Task。new()用來初始化標的函式(協程函式),將標的函式包裝在Task,進而裝入Scheduler。最後mainloop會從佇列裡面拉出task然後執行到task的target函式的yield為止,執行完以後再把task放回佇列。這樣下一次會從下一個yield開始執行。
pyos2.py
下麵是一個執行的例子:
執行結果,可以發現兩個task之間任務是交替的,並且以yield作為中斷點。每當執行撞到yield(中斷點)之後,Scheduler對Tasks做重新的規劃。下圖是兩個迴圈。
上述執行的結果:
第三步:確定任務的停止條件
如果,target函式裡面不是死迴圈,那麼上面的程式碼就會出錯。所以我們對Scheduler做改進。新增一個從任務佇列中刪除的操作,和對於StopIteration的驗證。
【對scheduler做改進的原因是任務的性質:可以被安排規劃/掛起/恢復。】
第四步:新增系統呼叫基類。
在OS中,中斷是應用程式請求系統服務的方式。在我們的程式碼中,OS是排程者(scheduler),而中斷是yield。為了請求排程者服務,任務需要帶值使用yield宣告。 pyos4.py
程式碼解析:
-
如果taskmap裡面存在task,就從ready佇列裡面拿任務出來,如果沒有就結束mainloop。
-
【就是傳說中的系統調運部分】ready佇列裡面的task被拿出來以後,執行task,傳回一個result物件,並初始化這個result物件。如果佇列裡面的task要停止迭代了(終止yield這個過程)就從佇列裡刪除這個任務。
-
最後再透過schedule函式把執行後的task放回佇列裡面。
-
系統呼叫基類,之後所有的系統呼叫都要從這個基類繼承。
第4.5步:新增第一個系統呼叫
這個系統呼叫想傳回任務的id。 Task的sendval屬性就像一個系統呼叫的傳回值。當task重新執行的是後,sendval將會傳入這個系統呼叫。 pyos4.py
進行最後的呼叫:
理解這段程式碼的前提:(非常重要)
-
send()函式有傳回值的,傳回值是yield運算式右邊的值。在本段程式碼中,result的傳回值是yield GetTid()的GetTid的實體或者是yield後面的None。
-
.執行send(sendval)以後,sendval被傳入了yield運算式。並賦給了mytid,傳回GetTid()給ruselt。
執行順序:先建立一個排程者(Scheduler),然後在排程者裡面新增兩個協程函式:foo(), bar(),最後觸發mainloop進行協程的排程執行。
系統呼叫原理:系統呼叫是基於系統呼叫類實現的,如GetTid類,其目的是傳出自己的tid。傳出自己的tid之後,再將task放回佇列。
第五步:任務管理
上面我們搞定了一個GetTid系統呼叫。我們現在搞定更多的系統呼叫:
* 建立一個新的任務。
* 殺掉一個已經存在的任務。
* 等待一個任務結束。
這些細小的相同的操作會與執行緒,行程配合。
1. *建立一個新的系統呼叫*:透過系統呼叫加入一個task。
2. *殺掉一個系統呼叫*:透過系統呼叫殺掉一個task。
3. 行程等待:需要大幅度改進Scheduler。
exit_waiting 是用來暫時存放要退出task的地方。
設計討論:
在任務中取用另一個任務的唯一辦法 是 使用scheduler分配給它的任務ID。 上述準則是一個安全的封裝策略。
這個準則讓任務保持獨立,不與核心混淆在一起。
這個準則能讓所有的任務都被scheduler管理的好好的。
網路伺服器的搭建:
現在已經完成了:
* 多工。
* 開啟新的行程。
* 進行新任務的管理。
這些特點都非常符合一個web伺服器的各種特點。下麵做一個Echo Server的嘗試。
但問題是這個網路伺服器是I / O阻塞的。整個python的直譯器需要掛起,一直到I/O操作結束。
非阻塞的I/O
先額外介紹一個叫Select的模組。select模組可以用來監視一組socket連結的活躍狀態。
用法如下:
下麵實現一個非阻塞I/O的網路伺服器,所用的思想就是之前所實現的Task waiting 思想。
原始碼解析:
__init__裡面的是兩個字典。用來儲存阻塞的IO的任務。waitforread()和waitforwrite()將需要等待寫入和等待讀取的task放在dict裡面。這裡的iopoll():使用select()去決定使用哪個檔案描述器,並且能夠不阻塞任意一個和I/O才做有關係的任務。poll這個東西也可以放在mainloop裡面,但是這樣會帶來線性的開銷增長。
新增新的系統呼叫:
更多請見開頭那個連線裡面的程式碼:pyos8.py
這樣我們就完成了一個多工處理的OS。這個OS可以併發執行,可以建立、銷毀、等待任務。任務可以進行I/O操作。並且最後我們實現了併發伺服器。
第八部分:協程棧的一些問題的研究。
我們可能在使用yield的時候會遇到一些問題:
先來看一段示例程式碼:
這種情況下,server()函式裡面的有呼叫Accept(),但是accept函式裡面的yield不起作用。這是因為yield只能在函式棧的最頂層掛起一個協程。你也不能夠把yield寫進庫函式裡面。
【這個限制是Stackless Python要解決的問題之一。
解決這個只能在函式棧頂掛起協程的解決方法。
* 有且只有一種方法,能夠建立可掛起的子協程和函式。
* 但是,建立可掛起的子協程和函式需要透過我們之前所說的Task Scheduler本身。
* 我們必須嚴格遵守yield宣告。
* 我們需要使用一種 -奇淫巧技- 叫做Trampolining(蹦床)。
讓我們來看看這個叫蹦床的奇淫巧技。
程式碼:trampoline.py
整個控制流如下:
我們看到,上圖中左側為統一的scheduler,如果我們想呼叫一個子執行緒,我們都用透過上面的scheduler進行排程。
控制流:
控制過程:scheduler -> subroutine_1 -> scheduler -> subroutine_2 -> scheduler -> subroutine_1
就像蹦床(trampolining)一樣,所有的子行程排程都要先傳回scheduler,再進行下一步。【有點像汽車換擋。
而不是:-scheduler -> subroutine_1 -> subroutine_2 -> subroutine_1-這種直接棧式的子協程排程是不被允許的。
第九部分:最後的一些話。
更加深遠的一些話題。
有很多更加深遠的話題值得我們去討論。其實在上面的套路裡面都說了一些。
* 在task之間的通訊。
* 處理阻塞的一些操作:比如和資料庫的一些連結。
* 多行程的協程和多執行緒的協程。
* 異常處理。
讓我們對yield一點小尊重:
Python 的生成器比很多人想象的有用的多。生成器可以:
* 定製可迭代物件。 * 處理程式管道和資料流。【第二部分提到】 * 事物處理。【第三部分提到的和SAX結合的事務處理】 * 合作的多工處理【第四部分提到的Task,子行程子執行緒合作】
在下列三種蛀牙的情況下我們可以想起來,使用yield。
* 迭代器:要產生資料。 * 接受資料/訊息:消費資料。 * 一個中斷:在合作性的多工裡面。
千萬不要一個函式裡麵包含兩個或多個以上的功能,比如函式是generator就是generator,是一個coroutine就是一個coroutin。
最後
感謝大家閱讀作者:xrtdkr
源自:https://juejin.im/post/5a37d5f66fb9a045146425e3
宣告: 文章著作權歸作者所有,如有侵權,請聯絡小編刪除