作者 | Eli Bendersky
譯者 | qhwdw ? ? ? ? ? 共計翻譯:86 篇 貢獻時間:123 天
這是併發網路伺服器系列文章的第四部分。在這一部分中,我們將使用 libuv 再次重寫我們的伺服器,並且也會討論關於使用一個執行緒池在回呼中去處理耗時任務。最終,我們去看一下底層的 libuv,花一點時間去學習如何用非同步 API 對檔案系統阻塞操作進行封裝。
本系列的所有文章:
使用 libuv 抽象出事件驅動迴圈
在 第三節[3] 中,我們看到了基於 select
和 epoll
的伺服器的相似之處,並且,我說過,在它們之間抽象出細微的差別是件很有吸引力的事。許多庫已經做到了這些,所以在這一部分中我將去選一個並使用它。我選的這個庫是 libuv[5],它最初設計用於 Node.js 底層的可移植平臺層,並且,後來發現在其它的專案中也有使用。libuv 是用 C 寫的,因此,它具有很高的可移植性,非常適用嵌入到像 JavaScript 和 Python 這樣的高階語言中。
雖然 libuv 為了抽象出底層平臺細節已經變成了一個相當大的框架,但它仍然是以 事件迴圈 思想為中心的。在我們第三部分的事件驅動伺服器中,事件迴圈是顯式定義在 main
函式中的;當使用 libuv 時,該迴圈通常隱藏在庫自身中,而使用者程式碼僅需要註冊事件控制代碼(作為一個回呼函式)和執行這個迴圈。此外,libuv 會在給定的平臺上使用更快的事件迴圈實現,對於 Linux 它是 epoll
,等等。
libuv loop
libuv 支援多路事件迴圈,因此事件迴圈在庫中是非常重要的;它有一個控制代碼 —— uv_loop_t
,以及建立/殺死/啟動/停止迴圈的函式。也就是說,在這篇文章中,我將僅需要使用 “預設的” 迴圈,libuv 可透過 uv_default_loop()
提供它;多路迴圈大多用於多執行緒事件驅動的伺服器,這是一個更高階別的話題,我將留在這一系列文章的以後部分。
使用 libuv 的併發伺服器
為了對 libuv 有一個更深的印象,讓我們跳轉到我們的可靠協議的伺服器,它透過我們的這個系列已經有了一個強大的重新實現。這個伺服器的結構與第三部分中的基於 select
和 epoll
的伺服器有一些相似之處,因為,它也依賴回呼。完整的 示例程式碼在這裡[6];我們開始設定這個伺服器的套接字系結到一個本地埠:
int portnum = 9090;
if (argc >= 2) {
portnum = atoi(argv[1]);
}
printf("Serving on port %d\n", portnum);
int rc;
uv_tcp_t server_stream;
if ((rc = uv_tcp_init(uv_default_loop(), &server_stream)) < 0) {
die("uv_tcp_init failed: %s", uv_strerror(rc));
}
struct sockaddr_in server_address;
if ((rc = uv_ip4_addr("0.0.0.0", portnum, &server_address)) < 0) {
die("uv_ip4_addr failed: %s", uv_strerror(rc));
}
if ((rc = uv_tcp_bind(&server_stream, (const struct sockaddr*)&server_address, 0)) < 0) {
die("uv_tcp_bind failed: %s", uv_strerror(rc));
}
除了它被封裝進 libuv API 中之外,你看到的是一個相當標準的套接字。在它的傳回中,我們取得了一個可工作於任何 libuv 支援的平臺上的可移植介面。
這些程式碼也展示了很認真負責的錯誤處理;多數的 libuv 函式傳回一個整數狀態,傳回一個負數意味著出現了一個錯誤。在我們的伺服器中,我們把這些錯誤看做致命問題進行處理,但也可以設想一個更優雅的錯誤恢復。
現在,那個套接字已經系結,是時候去監聽它了。這裡我們執行首個回呼註冊:
// Listen on the socket for new peers to connect. When a new peer connects,
// the on_peer_connected callback will be invoked.
if ((rc = uv_listen((uv_stream_t*)&server_stream, N_BACKLOG, on_peer_connected)) < 0) {
die("uv_listen failed: %s", uv_strerror(rc));
}
uv_listen
註冊一個事件回呼,當新的對端連線到這個套接字時將會呼叫事件迴圈。我們的回呼在這裡被稱為 on_peer_connected
,我們一會兒將去檢視它。
最終,main
執行這個 libuv 迴圈,直到它被停止(uv_run
僅在迴圈被停止或者發生錯誤時傳回)。
// Run the libuv event loop.
uv_run(uv_default_loop(), UV_RUN_DEFAULT);
// If uv_run returned, close the default loop before exiting.
return uv_loop_close(uv_default_loop());
註意,在執行事件迴圈之前,只有一個回呼是透過 main
註冊的;我們稍後將看到怎麼去新增更多的回呼。在事件迴圈的整個執行過程中,新增和刪除回呼並不是一個問題 —— 事實上,大多數伺服器就是這麼寫的。
這是一個 on_peer_connected
,它處理到伺服器的新的客戶端連線:
void on_peer_connected(uv_stream_t* server_stream, int status) {
if (status < 0) {
fprintf(stderr, "Peer connection error: %s\n", uv_strerror(status));
return;
}
// client will represent this peer; it's allocated on the heap and only
// released when the client disconnects. The client holds a pointer to
// peer_state_t in its data field; this peer state tracks the protocol state
// with this client throughout interaction.
uv_tcp_t* client = (uv_tcp_t*)xmalloc(sizeof(*client));
int rc;
if ((rc = uv_tcp_init(uv_default_loop(), client)) < 0) {
die("uv_tcp_init failed: %s", uv_strerror(rc));
}
client->data = NULL;
if (uv_accept(server_stream, (uv_stream_t*)client) == 0) {
struct sockaddr_storage peername;
int namelen = sizeof(peername);
if ((rc = uv_tcp_getpeername(client, (struct sockaddr*)&peername,
&namelen)) < 0) {
die("uv_tcp_getpeername failed: %s", uv_strerror(rc));
}
report_peer_connected((const struct sockaddr_in*)&peername, namelen);
// Initialize the peer state for a new client: we start by sending the peer
// the initial '*' ack.
peer_state_t* peerstate = (peer_state_t*)xmalloc(sizeof(*peerstate));
peerstate->state = INITIAL_ACK;
peerstate->sendbuf[0] = '*';
peerstate->sendbuf_end = 1;
peerstate->client = client;
client->data = peerstate;
// Enqueue the write request to send the ack; when it's done,
// on_wrote_init_ack will be called. The peer state is passed to the write
// request via the data pointer; the write request does not own this peer
// state - it's owned by the client handle.
uv_buf_t writebuf = uv_buf_init(peerstate->sendbuf, peerstate->sendbuf_end);
uv_write_t* req = (uv_write_t*)xmalloc(sizeof(*req));
req->data = peerstate;
if ((rc = uv_write(req, (uv_stream_t*)client, &writebuf, 1,
on_wrote_init_ack)) < 0) {
die("uv_write failed: %s", uv_strerror(rc));
}
} else {
uv_close((uv_handle_t*)client, on_client_closed);
}
}
這些程式碼都有很好的註釋,但是,這裡有一些重要的 libuv 語法我想去強調一下:
void* data
欄位;這些欄位可以被用於傳遞使用者資料。例如,註意 client->data
是如何指向到一個 peer_state_t
結構上,以便於 uv_write
和 uv_read_start
註冊的回呼可以知道它們正在處理的是哪個客戶端的資料。main
,其它的都執行在棧上),並且,為了避免洩漏,許多情況下都要求這些資料去安全釋放(free()
)。這些都是些需要實踐的內容 註1 。這個伺服器上對端的狀態如下:
typedef struct {
ProcessingState state;
char sendbuf[SENDBUF_SIZE];
int sendbuf_end;
uv_tcp_t* client;
} peer_state_t;
它與第三部分中的狀態非常類似;我們不再需要 sendptr
,因為,在呼叫 “done writing” 回呼之前,uv_write
將確保傳送它提供的整個緩衝。我們也為其它的回呼使用保持了一個到客戶端的指標。這裡是 on_wrote_init_ack
:
void on_wrote_init_ack(uv_write_t* req, int status) {
if (status) {
die("Write error: %s\n", uv_strerror(status));
}
peer_state_t* peerstate = (peer_state_t*)req->data;
// Flip the peer state to WAIT_FOR_MSG, and start listening for incoming data
// from this peer.
peerstate->state = WAIT_FOR_MSG;
peerstate->sendbuf_end = 0;
int rc;
if ((rc = uv_read_start((uv_stream_t*)peerstate->client, on_alloc_buffer,
on_peer_read)) < 0) {
die("uv_read_start failed: %s", uv_strerror(rc));
}
// Note: the write request doesn't own the peer state, hence we only free the
// request itself, not the state.
free(req);
}
然後,我們確信知道了這個初始的 '*'
已經被髮送到對端,我們透過呼叫 uv_read_start
去監聽從這個對端來的入站資料,它註冊一個將被事件迴圈呼叫的回呼(on_peer_read
),不論什麼時候,事件迴圈都在套接字上接收來自客戶端的呼叫:
void on_peer_read(uv_stream_t* client, ssize_t nread, const uv_buf_t* buf) {
if (nread < 0) {
if (nread != uv_eof) {
fprintf(stderr, "read error: %s\n", uv_strerror(nread));
}
uv_close((uv_handle_t*)client, on_client_closed);
} else if (nread == 0) {
// from the documentation of uv_read_cb: nread might be 0, which does not
// indicate an error or eof. this is equivalent to eagain or ewouldblock
// under read(2).
} else {
// nread > 0
assert(buf->len >= nread);
peer_state_t* peerstate = (peer_state_t*)client->data;
if (peerstate->state == initial_ack) {
// if the initial ack hasn't been sent for some reason, ignore whatever
// the client sends in.
free(buf->base);
return;
}
// run the protocol state machine.
for (int i = 0; i < nread; ++i) {
switch (peerstate->state) {
case initial_ack:
assert(0 && "can't reach here");
break;
case wait_for_msg:
if (buf->base[i] == '^') {
peerstate->state = in_msg;
}
break;
case in_msg:
if (buf->base[i] == '$') {
peerstate->state = wait_for_msg;
} else {
assert(peerstate->sendbuf_end < sendbuf_size);
peerstate->sendbuf[peerstate->sendbuf_end++] = buf->base[i] + 1;
}
break;
}
}
if (peerstate->sendbuf_end > 0) {
// we have data to send. the write buffer will point to the buffer stored
// in the peer state for this client.
uv_buf_t writebuf =
uv_buf_init(peerstate->sendbuf, peerstate->sendbuf_end);
uv_write_t* writereq = (uv_write_t*)xmalloc(sizeof(*writereq));
writereq->data = peerstate;
int rc;
if ((rc = uv_write(writereq, (uv_stream_t*)client, &writebuf, 1,
on_wrote_buf)) < 0) {
die("uv_write failed: %s", uv_strerror(rc));
}
}
}
free(buf->base);
}
這個伺服器的執行時行為非常類似於第三部分的事件驅動伺服器:所有的客戶端都在一個單個的執行緒中併發處理。並且類似的,一些特定的行為必須在伺服器程式碼中維護:伺服器的邏輯實現為一個整合的回呼,並且長週期執行是禁止的,因為它會阻塞事件迴圈。這一點也很類似。讓我們進一步探索這個問題。
在事件驅動迴圈中的長週期執行的操作
單執行緒的事件驅動程式碼使它先天就容易受到一些常見問題的影響:長週期執行的程式碼會阻塞整個迴圈。參見如下的程式:
void on_timer(uv_timer_t* timer) {
uint64_t timestamp = uv_hrtime();
printf("on_timer [%" PRIu64 " ms]\n", (timestamp / 1000000) % 100000);
// "Work"
if (random() % 5 == 0) {
printf("Sleeping...\n");
sleep(3);
}
}
int main(int argc, const char** argv) {
uv_timer_t timer;
uv_timer_init(uv_default_loop(), &timer);
uv_timer_start(&timer, on_timer, 0, 1000);
return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}
它用一個單個註冊的回呼執行一個 libuv 事件迴圈:on_timer
,它被每秒鐘迴圈呼叫一次。回呼報告一個時間戳,並且,偶爾透過睡眠 3 秒去模擬一個長週期執行。這是執行示例:
$ ./uv-timer-sleep-demo
on_timer [4840 ms]
on_timer [5842 ms]
on_timer [6843 ms]
on_timer [7844 ms]
Sleeping...
on_timer [11845 ms]
on_timer [12846 ms]
Sleeping...
on_timer [16847 ms]
on_timer [17849 ms]
on_timer [18850 ms]
...
on_timer
忠實地每秒執行一次,直到隨機出現的睡眠為止。在那個時間點,on_timer
不再被呼叫,直到睡眠時間結束;事實上,沒有其它的回呼 會在這個時間幀中被呼叫。這個睡眠呼叫阻塞了當前執行緒,它正是被呼叫的執行緒,並且也是事件迴圈使用的執行緒。當這個執行緒被阻塞後,事件迴圈也被阻塞。
這個示例演示了在事件驅動的呼叫中為什麼回呼不能被阻塞是多少的重要。並且,同樣適用於 Node.js 伺服器、客戶端側的 Javascript、大多數的 GUI 程式設計框架、以及許多其它的非同步程式設計模型。
但是,有時候執行耗時的任務是不可避免的。並不是所有任務都有一個非同步 API;例如,我們可能使用一些僅有同步 API 的庫去處理,或者,正在執行一個可能的長週期計算。我們如何用事件驅動程式設計去結合這些程式碼?執行緒可以幫到你!
“轉換” 阻塞呼叫為非同步呼叫的執行緒
一個執行緒池可以用於轉換阻塞呼叫為非同步呼叫,透過與事件迴圈並行執行,並且當任務完成時去由它去公佈事件。以阻塞函式 do_work()
為例,這裡介紹了它是怎麼執行的:
do_work()
,而是將它打包進一個 “任務”,讓執行緒池去執行這個任務。當任務完成時,我們也為迴圈去呼叫它註冊一個回呼;我們稱它為 on_work_done()
。on_work_done()
。讓我們看一下,使用 libuv 的工作排程 API,是怎麼去解決我們前面的計時器/睡眠示例中展示的問題的:
void on_after_work(uv_work_t* req, int status) {
free(req);
}
void on_work(uv_work_t* req) {
// "Work"
if (random() % 5 == 0) {
printf("Sleeping...\n");
sleep(3);
}
}
void on_timer(uv_timer_t* timer) {
uint64_t timestamp = uv_hrtime();
printf("on_timer [%" PRIu64 " ms]\n", (timestamp / 1000000) % 100000);
uv_work_t* work_req = (uv_work_t*)malloc(sizeof(*work_req));
uv_queue_work(uv_default_loop(), work_req, on_work, on_after_work);
}
int main(int argc, const char** argv) {
uv_timer_t timer;
uv_timer_init(uv_default_loop(), &timer);
uv_timer_start(&timer, on_timer, 0, 1000);
return uv_run(uv_default_loop(), UV_RUN_DEFAULT);
}
透過一個 work_req
註2 型別的控制代碼,我們進入一個任務佇列,代替在 on_timer
上直接呼叫 sleep,這個函式在任務中(on_work
)執行,並且,一旦任務完成(on_after_work
),這個函式被呼叫一次。on_work
是指 “work”(阻塞中的/耗時的操作)進行的地方。註意在這兩個回呼傳遞到 uv_queue_work
時的一個關鍵區別:on_work
執行在執行緒池中,而 on_after_work
執行在事件迴圈中的主執行緒上 —— 就好像是其它的回呼一樣。
讓我們看一下這種方式的執行:
$ ./uv-timer-work-demo
on_timer [89571 ms]
on_timer [90572 ms]
on_timer [91573 ms]
on_timer [92575 ms]
Sleeping...
on_timer [93576 ms]
on_timer [94577 ms]
Sleeping...
on_timer [95577 ms]
on_timer [96578 ms]
on_timer [97578 ms]
...
即便在 sleep 函式被呼叫時,定時器也每秒鐘滴答一下,睡眠現在執行在一個單獨的執行緒中,並且不會阻塞事件迴圈。
一個用於練習的素數測試伺服器
因為透過睡眠去模擬工作並不是件讓人興奮的事,我有一個事先準備好的更綜合的一個示例 —— 一個基於套接字接受來自客戶端的數字的伺服器,檢查這個數字是否是素數,然後去傳回一個 “prime" 或者 “composite”。完整的 伺服器程式碼在這裡[7] —— 我不在這裡貼上了,因為它太長了,更希望讀者在一些自己的練習中去體會它。
這個伺服器使用了一個原生的素數測試演演算法,因此,對於大的素數可能花很長時間才傳回一個回答。在我的機器中,對於 2305843009213693951,它花了 ~5 秒鐘去計算,但是,你的方法可能不同。
練習 1:伺服器有一個設定(透過一個名為 MODE
的環境變數)要麼在套接字回呼(意味著在主執行緒上)中執行素數測試,要麼在 libuv 工作佇列中。當多個客戶端同時連線時,使用這個設定來觀察伺服器的行為。當它計算一個大的任務時,在阻塞樣式中,伺服器將不回覆其它客戶端,而在非阻塞樣式中,它會回覆。
練習 2:libuv 有一個預設大小的執行緒池,並且執行緒池的大小可以透過環境變數配置。你可以透過使用多個客戶端去實驗找出它的預設值是多少?找到執行緒池預設值後,使用不同的設定去看一下,在重負載下怎麼去影響伺服器的響應能力。
在非阻塞檔案系統中使用工作佇列
對於只是獃板的演示和 CPU 密集型的計算來說,將可能的阻塞操作委託給一個執行緒池並不是明智的;libuv 在它的檔案系統 API 中本身就大量使用了這種能力。透過這種方式,libuv 使用一個非同步 API,以一個輕便的方式顯示出它強大的檔案系統的處理能力。
讓我們使用 uv_fs_read()
,例如,這個函式從一個檔案中(表示為一個 uv_fs_t
控制代碼)讀取一個檔案到一個緩衝中 註3,並且當讀取完成後呼叫一個回呼。換句話說,uv_fs_read()
總是立即傳回,即使是檔案在一個類似 NFS 的系統上,而資料到達緩衝區可能需要一些時間。換句話說,這個 API 與這種方式中其它的 libuv API 是非同步的。這是怎麼工作的呢?
在這一點上,我們看一下 libuv 的底層;內部實際上非常簡單,並且它是一個很好的練習。作為一個可移植的庫,libuv 對於 Windows 和 Unix 系統在它的許多函式上有不同的實現。我們去看一下在 libuv 源樹中的 src/unix/fs.c
。
這是 uv_fs_read
的程式碼:
int uv_fs_read(uv_loop_t* loop, uv_fs_t* req,
uv_file file,
const uv_buf_t bufs[],
unsigned int nbufs,
int64_t off,
uv_fs_cb cb) {
if (bufs == NULL || nbufs == 0)
return -EINVAL;
INIT(READ);
req->file = file;
req->nbufs = nbufs;
req->bufs = req->bufsml;
if (nbufs > ARRAY_SIZE(req->bufsml))
req->bufs = uv__malloc(nbufs * sizeof(*bufs));
if (req->bufs == NULL) {
if (cb != NULL)
uv__req_unregister(loop, req);
return -ENOMEM;
}
memcpy(req->bufs, bufs, nbufs * sizeof(*bufs));
req->off = off;
POST;
}
第一次看可能覺得很困難,因為它延緩真實的工作到 INIT
和 POST
宏中,以及為 POST
設定了一些本地變數。這樣做可以避免了檔案中的許多重覆程式碼。
這是 INIT
宏:
#define INIT(subtype) \
do { \
req->type = UV_FS; \
if (cb != NULL) \
uv__req_init(loop, req, UV_FS); \
req->fs_type = UV_FS_ ## subtype; \
req->result = 0; \
req->ptr = NULL; \
req->loop = loop; \
req->path = NULL; \
req->new_path = NULL; \
req->cb = cb; \
} \
while (0)
它設定了請求,並且更重要的是,設定 req->fs_type
域為真實的 FS 請求型別。因為 uv_fs_read
呼叫 INIT(READ)
,它意味著 req->fs_type
被分配一個常數 UV_FS_READ
。
這是 POST
宏:
#define POST \
do { \
if (cb != NULL) { \
uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done); \
return 0; \
} \
else { \
uv__fs_work(&req->work_req); \
return req->result; \
} \
} \
while (0)
它做什麼取決於回呼是否為 NULL
。在 libuv 檔案系統 API 中,一個 NULL
回呼意味著我們真實地希望去執行一個 同步 操作。在這種情況下,POST
直接呼叫 uv__fs_work
(我們需要瞭解一下這個函式的功能),而對於一個非 NULL
回呼,它把 uv__fs_work
作為一個工作項提交到工作佇列(指的是執行緒池),然後,註冊 uv__fs_done
作為回呼;該函式執行一些登記並呼叫使用者提供的回呼。
如果我們去看 uv__fs_work
的程式碼,我們將看到它使用很多宏按照需求將工作分發到實際的檔案系統呼叫。在我們的案例中,對於 UV_FS_READ
這個呼叫將被 uv__fs_read
生成,它(最終)使用普通的 POSIX API 去讀取。這個函式可以在一個 阻塞 方式中很安全地實現。因為,它透過非同步 API 呼叫時被置於一個執行緒池中。
在 Node.js 中,fs.readFile
函式是對映到 uv_fs_read
上。因此,可以在一個非阻塞樣式中讀取檔案,甚至是當底層檔案系統 API 是阻塞方式時。
theon_wrote_buf
控制代碼中。work_req
;討論的素數測試伺服器接下來將展示怎麼被用於去傳遞背景關係資訊到回呼中。uv_fs_read()
提供了一個類似於 preadv
Linux 系統呼叫的通用 API:它使用多緩衝區用於排序,並且支援一個到檔案中的偏移。基於我們討論的目的可以忽略這些特性。via: https://eli.thegreenplace.net/2017/concurrent-servers-part-4-libuv/
作者:Eli Bendersky[9] 譯者:qhwdw 校對:wxy
本文由 LCTT 原創編譯,Linux中國 榮譽推出