點選上方“芋道原始碼”,選擇“置頂公眾號”
技術文章第一時間送達!
原始碼精品專欄
上一章節中,我們分析了Netty服務的啟動過程,本章節分析Netty的NioEventLoop是如工作的。
NioEventLoop中維護了一個執行緒,執行緒啟動時會呼叫NioEventLoop的run方法,執行I/O任務和非I/O任務:
I/O任務
即selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys方法觸發。
非IO任務
新增到taskQueue中的任務,如register0、bind0等任務,由runAllTasks方法觸發。
兩種任務的執行時間比由變數ioRatio控制,預設為50,則表示允許非IO任務執行的時間與IO任務的執行時間相等。
NioEventLoop.run 方法實現
protected void run() {
for (;;) {
boolean oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) {
selectNow();
} else {
select(oldWakenUp);
if (wakenUp.get()) {
selector.wakeup();
}
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
}
hasTasks()方法判斷當前taskQueue是否有元素。
1、 如果taskQueue中有元素,執行 selectNow() 方法,最終執行selector.selectNow(),該方法會立即傳回。
void selectNow() throws IOException {
try {
selector.selectNow();
} finally {
// restore wakup state if needed
if (wakenUp.get()) {
selector.wakeup();
}
}
}
2、 如果taskQueue沒有元素,執行 select(oldWakenUp) 方法,程式碼如下:
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding selector.",
selectCnt);
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
}
// Harmless exception - log anyway
}
}
這個方法解決了Nio中臭名昭著的bug:selector的select方法導致cpu100%。
1、delayNanos(currentTimeNanos):計算延遲任務佇列中第一個任務的到期執行時間(即最晚還能延遲多長時間執行),預設傳回1s。每個SingleThreadEventExecutor都持有一個延遲執行任務的優先佇列PriorityQueue,啟動執行緒時,往佇列中加入一個任務。
protected long delayNanos(long currentTimeNanos) {
ScheduledFutureTask> delayedTask = delayedTaskQueue.peek();
if (delayedTask == null) {
return SCHEDULE_PURGE_INTERVAL;
}
return delayedTask.delayNanos(currentTimeNanos);
}
//ScheduledFutureTask
public long delayNanos(long currentTimeNanos) {
return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
}
public long deadlineNanos() {
return deadlineNanos;
}
2、如果延遲任務佇列中第一個任務的最晚還能延遲執行的時間小於500000納秒,且selectCnt == 0(selectCnt 用來記錄selector.select方法的執行次數和標識是否執行過selector.selectNow()),則執行selector.selectNow()方法並立即傳回。
3、否則執行selector.select(timeoutMillis),這個方法已經在深入淺出NIO Socket分析過。
4、如果已經存在ready的selectionKey,或者selector被喚醒,或者taskQueue不為空,或則scheduledTaskQueue不為空,則退出迴圈。
5、如果 selectCnt 沒達到閾值SELECTOR_AUTO_REBUILD_THRESHOLD(預設512),則繼續進行for迴圈。其中 currentTimeNanos 在select操作之後會重新賦值當前時間,如果selector.select(timeoutMillis)行為真的阻塞了timeoutMillis,第二次的timeoutMillis肯定等於0,此時selectCnt 為1,所以會直接退出for迴圈。
6、如果觸發了epool cpu100%的bug,會發生什麼?
selector.select(timeoutMillis)操作會立即傳回,不會阻塞timeoutMillis,導致 currentTimeNanos 幾乎不變,這種情況下,會反覆執行selector.select(timeoutMillis),變數selectCnt 會逐漸變大,當selectCnt 達到閾值,則執行rebuildSelector方法,進行selector重建,解決cpu佔用100%的bug。
public void rebuildSelector() {
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector();
}
});
return;
}
final Selector oldSelector = selector;
final Selector newSelector;
if (oldSelector == null) {
return;
}
try {
newSelector = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (;;) {
try {
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
if (key.channel().keyFor(newSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
key.channel().register(newSelector, interestOps, a);
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask task = (NioTask) a;
invokeChannelUnregistered(task, key, e);
}
}
}
} catch (ConcurrentModificationException e) {
// Probably due to concurrent modification of the key set.
continue;
}
break;
}
selector = newSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
rebuildSelector過程:
1、透過方法openSelector建立一個新的selector。
2、將old selector的selectionKey執行cancel。
3、將old selector的channel重新註冊到新的selector中。
對selector進行rebuild後,需要重新執行方法selectNow,檢查是否有已ready的selectionKey。
方法selectNow()或select(oldWakenUp)傳回後,執行方法processSelectedKeys和runAllTasks。
1、processSelectedKeys 用來處理有事件發生的selectkey,這裡對最佳化過的方法processSelectedKeysOptimized進行分析:
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask task = (NioTask) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
for (;;) {
i++;
if (selectedKeys[i] == null) {
break;
}
selectedKeys[i] = null;
}
selectAgain();
// Need to flip the optimized selectedKeys to get the right reference to the array
// and reset the index to -1 which will then set to 0 on the for loop
// to start over again.
//
// See https://github.com/netty/netty/issues/1523
selectedKeys = this.selectedKeys.flip();
i = -1;
}
}
}
在最佳化過的方法中,有事件發生的selectkey存放在陣列selectedKeys中,透過遍歷selectedKeys,處理每一個selectkey,具體處理過程,會在後續進行分析。
2、runAllTasks 處理非I/O任務。
如果 ioRatio 不為100時,方法runAllTasks的執行時間只能為ioTime * (100 – ioRatio) / ioRatio,其中ioTime 是方法processSelectedKeys的執行時間。
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception.", t);
}
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
this.lastExecutionTime = lastExecutionTime;
return true;
}
方法fetchFromScheduledTaskQueue把scheduledTaskQueue中已經超過延遲執行時間的任務移到taskQueue中等待被執行。
private void fetchFromScheduledTaskQueue() {
if (hasScheduledTasks()) {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
for (;;) {
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
break;
}
taskQueue.add(scheduledTask);
}
}
}
依次從taskQueue任務task執行,每執行64個任務,進行耗時檢查,如果已執行時間超過預先設定的執行時間,則停止執行非IO任務,避免非IO任務太多,影響IO任務的執