歡迎光臨
每天分享高質量文章

RCU synchronize原理分析

RCU(Read-Copy Update)是Linux核心比較成熟的新型讀寫鎖,具有較高的讀寫併發效能,常常用在需要互斥的效能關鍵路徑。在kernel中,rcu有tiny rcu和tree rcu兩種實現,tiny rcu更加簡潔,通常用在小型嵌入式系統中,tree rcu則被廣泛使用在了server, desktop以及android系統中。本文將以tree rcu為分析物件。

如何度過寬限期

RCU的核心理念是讀者訪問的同時,寫者可以更新訪問物件的副本,但寫者需要等待所有讀者完成訪問之後,才能刪除老物件。這個過程實現的關鍵和難點就在於如何判斷所有的讀者已經完成訪問。通常把寫者開始更新,到所有讀者完成訪問這段時間叫做寬限期(Grace Period)。核心中實現寬限期等待的函式是synchronize_rcu。

1.1 讀者鎖的標記

在普通的TREE RCU實現中,rcu_read_lock和rcu_read_unlock的實現非常簡單,分別是關閉搶佔和開啟搶佔:

  1. staticinlinevoid __rcu_read_lock(void)
  2. {
  3. preempt_disable();
  4. }
  5.  
  6. staticinlinevoid __rcu_read_unlock(void)
  7. {
  8. preempt_enable();
  9. }

這時是否度過寬限期的判斷就比較簡單:每個CPU都經過一次搶佔。因為發生搶佔,就說明不在rcu_read_lock和rcu_read_unlock之間,必然已經完成訪問或者還未開始訪問。

1.2 每個CPU度過quiescnet state

接下來我們看每個CPU上報完成搶佔的過程。kernel把這個完成搶佔的狀態稱為quiescent state。每個CPU在時鐘中斷的處理函式中,都會判斷當前CPU是否度過quiescent state。

  1. void update_process_times(int user_tick)
  2. {
  3. ……
  4. rcu_check_callbacks(cpu, user_tick);
  5. ……
  6. }
  7.  
  8. void rcu_check_callbacks(int cpu,int user)
  9. {
  10. ……
  11. if(user || rcu_is_cpu_rrupt_from_idle()){
  12. /*在使用者態背景關係,或者idle背景關係,說明已經發生過搶佔*/
  13. rcu_sched_qs(cpu);
  14. rcu_bh_qs(cpu);
  15. }elseif(!in_softirq()){
  16. /*僅僅針對使用rcu_read_lock_bh型別的rcu,不在softirq,
  17. *說明已經不在read_lock關鍵區域*/
  18. rcu_bh_qs(cpu);
  19. }
  20. rcu_preempt_check_callbacks(cpu);
  21. if(rcu_pending(cpu))
  22. invoke_rcu_core();
  23. ……
  24. }

這裡補充一個細節說明,Tree RCU有多個型別的RCU State,用於不同的RCU場景,包括rcu_sched_state、rcu_bh_state和rcu_preempt_state。不同的場景使用不同的RCU API,度過寬限期的方式就有所區別。例如上面程式碼中的rcu_sched_qs和rcu_bh_qs,就是為了標記不同的state度過quiescent state。普通的RCU例如核心執行緒、系統呼叫等場景,使用rcu_read_lock或者rcu_read_lock_sched,他們的實現是一樣的;軟中斷背景關係則可以使用rcu_read_lock_bh,使得寬限期更快度過。

細分這些場景是為了提高RCU的效率。rcu_preempt_state將在下文進行說明。

1.3 彙報寬限期度過

每個CPU度過quiescent state之後,需要向上彙報直至所有CPU完成quiescent state,從而標識寬限期的完成,這個彙報過程在軟中斷RCU_SOFTIRQ中完成。軟中斷的喚醒則是在上述的時鐘中斷中進行。

update_process_times

    -> rcu_check_callbacks

        -> invoke_rcu_core

RCU_SOFTIRQ軟中斷處理的彙報流程如下:

rcu_process_callbacks

    -> __rcu_process_callbacks

        -> rcu_check_quiescent_state

            -> rcu_report_qs_rdp

                -> rcu_report_qs_rnp

其中rcu_report_qs_rnp是從葉子節點向根節點的遍歷過程,同一個節點的子節點都透過quiescent state後,該節點也設定為透過。

這個樹狀的彙報過程,也就是“Tree RCU”這個名字得來的緣由。

樹結構每層的節點數量和葉子節點數量由一系列的宏定義來決定:

  1. #define MAX_RCU_LVLS 4
  2. #define RCU_FANOUT_1 (CONFIG_RCU_FANOUT_LEAF)
  3. #define RCU_FANOUT_2 (RCU_FANOUT_1 * CONFIG_RCU_FANOUT)
  4. #define RCU_FANOUT_3 (RCU_FANOUT_2 * CONFIG_RCU_FANOUT)
  5. #define RCU_FANOUT_4 (RCU_FANOUT_3 * CONFIG_RCU_FANOUT)
  6.  
  7. #if NR_CPUS <= RCU_FANOUT_1
  8. # define RCU_NUM_LVLS 1
  9. # define NUM_RCU_LVL_0 1
  10. # define NUM_RCU_LVL_1 (NR_CPUS)
  11. # define NUM_RCU_LVL_2 0
  12. # define NUM_RCU_LVL_3 0
  13. # define NUM_RCU_LVL_4 0
  14. #elif NR_CPUS <= RCU_FANOUT_2
  15. # define RCU_NUM_LVLS 2
  16. # define NUM_RCU_LVL_0 1
  17. # define NUM_RCU_LVL_1 DIV_ROUND_UP(NR_CPUS, RCU_FANOUT_1)
  18. # define NUM_RCU_LVL_2 (NR_CPUS)
  19. # define NUM_RCU_LVL_3 0
  20. # define NUM_RCU_LVL_4 0
  21. #elif NR_CPUS <= RCU_FANOUT_3
  22. # define RCU_NUM_LVLS 3
  23. # define NUM_RCU_LVL_0 1
  24. # define NUM_RCU_LVL_1 DIV_ROUND_UP(NR_CPUS, RCU_FANOUT_2)
  25. # define NUM_RCU_LVL_2 DIV_ROUND_UP(NR_CPUS, RCU_FANOUT_1)
  26. # define NUM_RCU_LVL_3 (NR_CPUS)
  27. # define NUM_RCU_LVL_4 0
  28. #elif NR_CPUS <= RCU_FANOUT_4
  29. # define RCU_NUM_LVLS 4
  30. # define NUM_RCU_LVL_0 1
  31. # define NUM_RCU_LVL_1 DIV_ROUND_UP(NR_CPUS, RCU_FANOUT_3)
  32. # define NUM_RCU_LVL_2 DIV_ROUND_UP(NR_CPUS, RCU_FANOUT_2)
  33. # define NUM_RCU_LVL_3 DIV_ROUND_UP(NR_CPUS, RCU_FANOUT_1)
  34. # define NUM_RCU_LVL_4 (NR_CPUS)

1.4 寬限期的發起與完成

所有寬限期的發起和完成都是由同一個核心執行緒rcu_gp_kthread來完成。透過判斷rsp->gp_flags & RCU_GP_FLAG_INIT來決定是否發起一個gp;透過判斷! (rnp->qsmask) && !rcu_preempt_blocked_readers_cgp(rnp))來決定是否結束一個gp

發起一個GP時,rsp->gpnum++;結束一個GP時,rsp->completed = rsp->gpnum。

1.5 rcu callbacks處理

rcu的callback通常是在sychronize_rcu中新增的wakeme_after_rcu,也就是喚醒synchronize_rcu的行程,它正在等待GP的結束。

         callbacks的處理同樣在軟中斷RCU_SOFTIRQ中完成

rcu_process_callbacks

    -> __rcu_process_callbacks

        -> invoke_rcu_callbacks

            -> rcu_do_batch

                -> __rcu_reclaim

這裡RCU的callbacks連結串列採用了一種分段連結串列的方式,整個callback連結串列,根據具體GP結束的時間,分成若干段:nxtlist — *nxttail[RCU_DONE_TAIL] — *nxttail[RCU_WAIT_TAIL] — *nxttail[RCU_NEXT_READY_TAIL] — *nxttail[RCU_NEXT_TAIL]。

rcu_do_batch只處理nxtlist — *nxttail[RCU_DONE_TAIL]之間的callbacks。每個GP結束都會重新調整callback所處的段位,每個新的callback將會新增在末尾,也就是*nxttail[RCU_NEXT_TAIL]。

可搶佔的RCU

如果config檔案定義了CONFIG_TREE_PREEMPT_RCU=y,那麼sychronize_rcu將預設使用rcu_preempt_state。這類rcu的特點就在於read_lock期間是允許其它行程搶佔的,因此它判斷寬限期度過的方法就不太一樣。

從rcu_read_lock和rcu_read_unlock的定義就可以知道,TREE_PREEMPT_RCU並不是以簡單的經過搶佔為CPU渡過GP的標準,而是有個rcu_read_lock_nesting計數

  1. void __rcu_read_lock(void)
  2. {
  3. current->rcu_read_lock_nesting++;
  4. barrier();/* critical section after entry code. */
  5. }
  6.  
  7. void __rcu_read_unlock(void)
  8. {
  9. struct task_struct *t = current;
  10.  
  11. if(t->rcu_read_lock_nesting !=1){
  12. t->rcu_read_lock_nesting;
  13. }else{
  14. barrier();/* critical section before exit code. */
  15. t->rcu_read_lock_nesting = INT_MIN;
  16. barrier();/* assign before ->rcu_read_unlock_special load */
  17. if(unlikely(ACCESS_ONCE(t->rcu_read_unlock_special)))
  18. rcu_read_unlock_special(t);
  19. barrier();/* ->rcu_read_unlock_special load before assign */
  20. t->rcu_read_lock_nesting =0;
  21. }
  22. }

當搶佔發生時,__schedule函式會呼叫rcu_note_context_switch來通知RCU更新狀態,如果當前CPU處於rcu_read_lock狀態,當前行程將會放入rnp->blkd_tasks阻塞佇列,並呈現在rnp->gp_tasks連結串列中。

從上文1.3節寬限期的結束處理過程我們可以知道,rcu_gp_kthread會判斷! (rnp->qsmask) && !rcu_preempt_blocked_readers_cgp(rnp))兩個條件來決定GP是否完成,其中!rnp->qsmask代表每個CPU都經過一次quiescent state,quiescent state的定義與傳統RCU一致;!rcu_preempt_blocked_readers_cgp(rnp)這個條件就代表了rcu是否還有阻塞的行程。

已同步到看一看
贊(0)

分享創造快樂