最近在看 UNIX 網路程式設計並研究了一下 Redis 的實現,感覺 Redis 的原始碼十分適合閱讀和分析,其中 I/O 多路復用(mutiplexing)部分的實現非常乾凈和優雅,在這裡想對這部分的內容進行簡單的整理。
幾種 I/O 模型
為什麼 Redis 中要使用 I/O 多路復用這種技術呢?
首先,Redis 是跑在單執行緒中的,所有的操作都是按照順序線性執行的,但是由於讀寫操作等待使用者輸入或輸出都是阻塞的,所以 I/O 操作在一般情況下往往不能直接傳回,這會導致某一檔案的 I/O 阻塞導致整個行程無法對其它客戶提供服務,而 I/O 多路復用 就是為瞭解決這個問題而出現的。
Blocking I/O
先來看一下傳統的阻塞 I/O 模型到底是如何工作的:當使用 read
或者 write
對某一個 檔案描述符(File Descriptor 以下簡稱 FD) 進行讀寫時,如果當前 FD 不可讀或不可寫,整個 Redis 服務就不會對其它的操作作出響應,導致整個服務不可用。
這也就是傳統意義上的,也就是我們在程式設計中使用最多的阻塞模型:
阻塞模型雖然開發中非常常見也非常易於理解,但是由於它會影響其他 FD 對應的服務,所以在需要處理多個客戶端任務的時候,往往都不會使用阻塞模型。
I/O 多路復用
雖然還有很多其它的 I/O 模型,但是在這裡都不會具體介紹。
阻塞式的 I/O 模型並不能滿足這裡的需求,我們需要一種效率更高的 I/O 模型來支撐 Redis 的多個客戶(redis-cli),這裡涉及的就是 I/O 多路復用模型了:
在 I/O 多路復用模型中,最重要的函式呼叫就是 select
,該方法的能夠同時監控多個檔案描述符的可讀可寫情況,當其中的某些檔案描述符可讀或者可寫時, select
方法就會傳回可讀以及可寫的檔案描述符個數。
關於
select
的具體使用方法,在網路上資料很多,這裡就不過多展開介紹了;與此同時也有其它的 I/O 多路復用函式
epoll/kqueue/evport
,它們相比select
效能更優秀,同時也能支撐更多的服務。
Reactor 設計樣式
Redis 服務採用 Reactor 的方式來實現檔案事件處理器(每一個網路連線其實都對應一個檔案描述符)
檔案事件處理器使用 I/O 多路復用模組同時監聽多個 FD,當 accept
、 read
、 write
和 close
檔案事件產生時,檔案事件處理器就會回呼 FD 系結的事件處理器。
雖然整個檔案事件處理器是在單執行緒上執行的,但是透過 I/O 多路復用模組的引入,實現了同時對多個 FD 讀寫的監控,提高了網路通訊模型的效能,同時也可以保證整個 Redis 服務實現的簡單。
I/O 多路復用模組
I/O 多路復用模組封裝了底層的 select
、 epoll
、 avport
以及 kqueue
這些 I/O 多路復用函式,為上層提供了相同的介面。
在這裡我們簡單介紹 Redis 是如何包裝 select
和 epoll
的,簡要瞭解該模組的功能,整個 I/O 多路復用模組抹平了不同平臺上 I/O 多路復用函式的差異性,提供了相同的介面:
staticintaeApiCreate(aeEventLoop*eventLoop)
staticintaeApiResize(aeEventLoop*eventLoop,intsetsize)
staticvoidaeApiFree(aeEventLoop*eventLoop)
staticintaeApiAddEvent(aeEventLoop*eventLoop,intfd,intmask)
staticvoidaeApiDelEvent(aeEventLoop*eventLoop,intfd,intmask)
staticintaeApiPoll(aeEventLoop*eventLoop,structtimeval*tvp)
同時,因為各個函式所需要的引數不同,我們在每一個子模組內部透過一個 aeApiState
來儲存需要的背景關係資訊:
// select
typedef struct aeApiState {
fd_set rfds, wfds;
fd_set _rfds, _wfds;
} aeApiState;
// epoll
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
這些背景關係資訊會儲存在 eventLoop
的 void*state
中,不會暴露到上層,只在當前子模組中使用。
封裝 select 函式
select
可以監控 FD 的可讀、可寫以及出現錯誤的情況。
在介紹 I/O 多路復用模組如何對 select
函式封裝之前,先來看一下 select
函式使用的大致流程:
int fd = /* file descriptor */
fd_set rfds;
FD_ZERO(&rfds;);
FD_SET(fd, &rfds;)
for ( ; ; ) {
select(fd+1, &rfds;, NULL, NULL, NULL);
if (FD_ISSET(fd, &rfds;)) {
/* file descriptor `fd` becomes readable */
}
}
- 初始化一個可讀的
fd_set
集合,儲存需要監控可讀性的 FD; - 使用
FD_SET
將fd
加入rfds
; - 呼叫
select
方法監控rfds
中的 FD 是否可讀; - 當
select
傳回時,檢查 FD 的狀態並完成對應的操作。
而在 Redis 的 ae_select
檔案中程式碼的組織順序也是差不多的,首先在 aeApiCreate
函式中初始化 rfds
和 wfds
:
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
FD_ZERO(&state-;>rfds);
FD_ZERO(&state-;>wfds);
eventLoop->apidata = state;
return 0;
}
而 aeApiAddEvent
和 aeApiDelEvent
會透過 FD_SET
和 FD_CLR
修改 fd_set
中對應 FD 的標誌位:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
if (mask & AE_READABLE) FD_SET(fd,&state-;>rfds);
if (mask & AE_WRITABLE) FD_SET(fd,&state-;>wfds);
return 0;
}
整個 ae_select
子模組中最重要的函式就是 aeApiPoll
,它是實際呼叫 select
函式的部分,其作用就是在 I/O 多路復用函式傳回時,將對應的 FD 加入 aeEventLoop
的 fired
陣列中,並傳回事件的個數:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, j, numevents = 0;
memcpy(&state-;>_rfds,&state-;>rfds,sizeof(fd_set));
memcpy(&state-;>_wfds,&state-;>wfds,sizeof(fd_set));
retval = select(eventLoop->maxfd+1,
&state-;>_rfds,&state-;>_wfds,NULL,tvp);
if (retval > 0) {
for (j = 0; j <= eventLoop->maxfd; j++) {
int mask = 0;
aeFileEvent *fe = &eventLoop-;>events[j];
if (fe->mask == AE_NONE) continue;
if (fe->mask & AE_READABLE && FD_ISSET(j,&state-;>_rfds))
mask |= AE_READABLE;
if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state-;>_wfds))
mask |= AE_WRITABLE;
eventLoop->fired[numevents].fd = j;
eventLoop->fired[numevents].mask = mask;
numevents++;
}
}
return numevents;
}
封裝 epoll 函式
Redis 對 epoll
的封裝其實也是類似的,使用 epoll_create
建立 epoll
中使用的 epfd
:
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
eventLoop->apidata = state;
return 0;
}
在 aeApiAddEvent
中使用 epoll_ctl
向 epfd
中新增需要監控的 FD 以及監聽的事件:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,ⅇ) == -1) return -1;
return 0;
}
由於 epoll
相比 select
機制略有不同,在 epoll_wait
函式傳回時並不需要遍歷所有的 FD 檢視讀寫情況;在 epoll_wait
函式傳回時會提供一個 epoll_event
陣列:
typedef union epoll_data {
void *ptr;
int fd; /* 檔案描述符 */
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll 事件 */
epoll_data_t data;
};
其中儲存了發生的
epoll
事件(EPOLLIN
、EPOLLOUT
、EPOLLERR
和EPOLLHUP
)以及發生該事件的 FD。
aeApiPoll
函式只需要將 epoll_event
陣列中儲存的資訊加入 eventLoop
的 fired
陣列中,將資訊傳遞給上層模組:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
子模組的選擇
因為 Redis 需要在多個平臺上執行,同時為了最大化執行的效率與效能,所以會根據編譯平臺的不同選擇不同的 I/O 多路復用函式作為子模組,提供給上層統一的介面;在 Redis 中,我們透過宏定義的使用,合理的選擇不同的子模組:
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
因為 select
函式是作為 POSIX 標準中的系統呼叫,在不同版本的作業系統上都會實現,所以將其作為保底方案:
Redis 會優先選擇時間複雜度為 $O(1)$ 的 I/O 多路復用函式作為底層實現,包括 Solaries 10 中的 evport
、Linux 中的 epoll
和 macOS/FreeBSD 中的 kqueue
,上述的這些函式都使用了核心內部的結構,並且能夠服務幾十萬的檔案描述符。
但是如果當前編譯環境沒有上述函式,就會選擇 select
作為備選方案,由於其在使用時會掃描全部監聽的描述符,所以其時間複雜度較差 $O(n)$,並且只能同時服務 1024 個檔案描述符,所以一般並不會以 select
作為第一方案使用。
總結
Redis 對於 I/O 多路復用模組的設計非常簡潔,透過宏保證了 I/O 多路復用模組在不同平臺上都有著優異的效能,將不同的 I/O 多路復用函式封裝成相同的 API 提供給上層使用。
整個模組使 Redis 能以單行程執行的同時服務成千上萬個檔案描述符,避免了由於多行程應用的引入導致程式碼實現複雜度的提升,減少了出錯的可能性。
朋友會在“發現-看一看”看到你“在看”的內容