歡迎光臨
每天分享高質量文章

JDK原始碼閱讀:InterruptibleChannel 與可中斷 IO

(點選上方公眾號,可快速關註)


來源:木杉的部落格 ,

imushan.com/2018/08/01/java/language/JDK原始碼閱讀-InterruptibleChannel與可中斷IO/

Java傳統IO是不支援中斷的,所以如果程式碼在read/write等操作阻塞的話,是無法被中斷的。這就無法和Thead的interrupt模型配合使用了。JavaNIO眾多的升級點中就包含了IO操作對中斷的支援。InterruptiableChannel表示支援中斷的Channel。我們常用的FileChannel,SocketChannel,DatagramChannel都實現了這個介面。

InterruptibleChannel介面

public interface InterruptibleChannel extends Channel

{

 

    /**

     * 關閉當前Channel

     *     

     * 任何當前阻塞在當前channel執行的IO操作上的執行緒,都會收到一個AsynchronousCloseException異常

     */

    public void close() throws IOException;

}

InterruptibleChannel介面沒有定義任何方法,其中的close方法是父介面就有的,這裡只是添加了額外的註釋。

AbstractInterruptibleChannel實現了InterruptibleChannel介面,並提供了實現可中斷IO機制的重要的方法,比如begin(),end()。

在解讀這些方法的程式碼前,先瞭解一下NIO中,支援中斷的Channel程式碼是如何編寫的。

第一個要求是要正確使用begin()和end()方法:

boolean completed = false;

try {

    begin();

    completed = …;    // 執行阻塞IO操作

    return …;         // 傳回結果

} finally {

    end(completed);

}

NIO規定了,在阻塞IO的陳述句前後,需要呼叫begin()和end()方法,為了保證end()方法一定被呼叫,要求放在finally陳述句塊中。

第二個要求是Channel需要實現java.nio.channels.spi.AbstractInterruptibleChannel#implCloseChannel這個方法。AbstractInterruptibleChannel在處理中斷時,會呼叫這個方法,使用Channel的具體實現來關閉Channel。

接下來我們具體看一下begin()和end()方法是如何實現的。

begin方法

// 儲存中斷處理物件實體

private Interruptible interruptor;

// 儲存被中斷執行緒實體

private volatile Thread interrupted;

 

protected final void begin() {

    // 初始化中斷處理物件,中斷處理物件提供了中斷處理回呼

    // 中斷處理回呼登記被中斷的執行緒,然後呼叫implCloseChannel方法,關閉Channel

    if (interruptor == null) {

        interruptor = new Interruptible() {

            public void interrupt(Thread target) {

                synchronized (closeLock) {

                    // 如果當前Channel已經關閉,則直接傳回

                    if (!open)

                        return;

 

                    // 設定標誌位,同時登記被中斷的執行緒

                    open = false;

                    interrupted = target;

                    try {

                        // 呼叫具體的Channel實現關閉Channel

                        AbstractInterruptibleChannel.this.implCloseChannel();

                    } catch (IOException x) { }

                }

            }};

    }

    // 登記中斷處理物件到當前執行緒

    blockedOn(interruptor);

 

    // 判斷當前執行緒是否已經被中斷,如果已經被中斷,可能登記的中斷處理物件沒有被執行,這裡手動執行一下

    Thread me = Thread.currentThread();

    if (me.isInterrupted())

        interruptor.interrupt(me);

}

從begin()方法中,我們可以看出NIO實現可中斷IO操作的思路,是在Thread的中斷邏輯中,掛載自定義的中斷處理物件,這樣Thread物件在被中斷時,會執行中斷處理物件中的回呼,這個回呼中,執行關閉Channel的操作。這樣就實現了Channel對執行緒中斷的響應了。

接下來重點就是研究“Thread新增中斷處理邏輯”這個機制是如何實現的了,是透過blockedOn方法實現的:

static void blockedOn(Interruptible intr) {         // package-private

    sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(),intr);

}

blockedOn方法使用的是JavaLangAccess的blockedOn方法。

SharedSecrets是一個神奇而糟糕的類,為啥說是糟糕呢,因為這個方法的存在,就是為了訪問JDK類庫中一些因為類作用域限制而外部無法訪問的類或者方法。JDK很多類與方法是私有或者包級別私有的,外部是無法訪問的,但是JDK在本身實現的時候又存在互相依賴的情況,所以為了外部可以不依賴反射訪問這些類或者方法,在sun包下,存在這麼一個類,提供了各種超越限制的方法。

SharedSecrets.getJavaLangAccess()方法傳回JavaLangAccess物件。JavaLangAccess物件就和名稱所說的一樣,提供了java.lang包下一些非公開的方法的訪問。這個類在System初始化時被構造:

// java.lang.System#setJavaLangAccess

private static void setJavaLangAccess() {

    sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){

        public void blockedOn(Thread t, Interruptible b) {

            t.blockedOn(b);

        }

        //…

    });

}

可以看出,sun.misc.JavaLangAccess#blockedOn保證的就是java.lang.Thread#blockedOn這個包級別私有的方法:

/* The object in which this thread is blocked in an interruptible I/O

 * operation, if any.  The blocker’s interrupt method should be invoked

 * after setting this thread’s interrupt status.

 */

private volatile Interruptible blocker;

private final Object blockerLock = new Object();

 

/* Set the blocker field; invoked via sun.misc.SharedSecrets from java.nio code

 */

void blockedOn(Interruptible b) {

    // 序列化blocker相關操作

    synchronized (blockerLock) {

        blocker = b;

    }

}

而這個方法也非常簡單,就是設定java.lang.Thread#blocker變數為之前提到的中斷處理物件。而且從註釋中可以看出,這個方法就是專門為NIO設計的,註釋都非常直白的提到了,NIO的程式碼會透過sun.misc.SharedSecrets呼叫到這個方法。。

接下來就是重頭戲了,看一下Thread在中斷時,如何呼叫NIO註冊的中斷處理器:

public void interrupt() {

    if (this != Thread.currentThread())

        checkAccess();

 

    synchronized (blockerLock) {

        Interruptible b = blocker;

 

        // 如果NIO設定了中斷處理器,則只需Thread本身的中斷邏輯後,呼叫中斷處理器的回呼函式

        if (b != null) {

            interrupt0();           // 這一步會設定interrupt標誌位

            b.interrupt(this);

            return;

        }

    }

 

    // 如果沒有的話,就走普通流程

    interrupt0();

}

end方法

begin()方法負責新增Channel的中斷處理器到當前執行緒。end()是在IO操作執行完/中斷完後的操作,負責判斷中斷是否發生,如果發生判斷是當前執行緒發生還是別的執行緒中斷把當前操作的Channel給關閉了,對於不同的情況,丟擲不同的異常。

protected final void end(boolean completed) throws AsynchronousCloseException

{

    // 清空執行緒的中斷處理器取用,避免執行緒一直存活導致中斷處理器無法被回收

    blockedOn(null);

    Thread interrupted = this.interrupted;

 

    if (interrupted != null && interrupted == Thread.currentThread()) {

        interrupted = null;

        throw new ClosedByInterruptException();

    }

    // 如果這次沒有讀取到資料,並且Channel被另外一個執行緒關閉了,則排除Channel被非同步關閉的異常

    // 但是如果這次讀取到了資料,就不能丟擲異常,因為這次讀取的資料是有效的,需要傳回給使用者的(重要邏輯)

    if (!completed && !open)

        throw new AsynchronousCloseException();

}

透過程式碼可以看出,如果是當前執行緒被中斷,則丟擲ClosedByInterruptException異常,表示Channel因為執行緒中斷而被關閉了,IO操作也隨之中斷了。

如果是當前執行緒發現Channel被關閉了,並且是讀取還未執行完畢的情況,則丟擲AsynchronousCloseException異常,表示Channel被非同步關閉了。

end()邏輯的活動圖如下:

場景分析

併發的場景分析起來就是複雜,上面的程式碼不多,但是場景很多,我們以sun.nio.ch.FileChannelImpl#read(java.nio.ByteBuffer)為例分析一下可能的場景:

  1. A執行緒read,B執行緒中斷A執行緒:A執行緒丟擲ClosedByInterruptException異常

  2. A,B執行緒read,C執行緒中斷A執行緒

  • A被中斷時,B剛剛進入read方法:A執行緒丟擲ClosedByInterruptException異常,B執行緒ensureOpen方法丟擲ClosedChannelException異常

  • A被中斷時,B阻塞在底層read方法中:A執行緒丟擲ClosedByInterruptException異常,B執行緒底層方法丟擲異常傳回,end方法中丟擲AsynchronousCloseException異常

  • A被中斷時,B已經讀取到資料:A執行緒丟擲ClosedByInterruptException異常,B執行緒正常傳回

sun.nio.ch.FileChannelImpl#read(java.nio.ByteBuffer)程式碼如下:

public int read(ByteBuffer dst) throws IOException {

    ensureOpen();  // 1

    if (!readable) // 2

        throw new NonReadableChannelException();

    synchronized (positionLock) {

        int n = 0;

        int ti = -1;

        try {            

            begin();

            ti = threads.add();

            if (!isOpen())

                return 0; // 3

            do {

                n = IOUtil.read(fd, dst, -1, nd); // 4

            } while ((n == IOStatus.INTERRUPTED) && isOpen());

            return IOStatus.normalize(n);

        } finally {

            threads.remove(ti);

            end(n > 0);

            assert IOStatus.check(n);

        }

    }

}

總結

在JavaIO時期,人們為了中斷IO操作想了不少方法,核心操作就是關閉流,促使IO操作丟擲異常,達到中斷IO的效果。NIO中,將這個操作植入了java.lang.Thread#interrupt方法,免去使用者自己編碼特定程式碼的麻煩。使IO操作可以像其他可中斷方法一樣,在中斷時丟擲ClosedByInterruptException異常,業務程式捕獲該異常即可對IO中斷做出響應。

參考資料

  • java – What does JavaLangAccess.blockedOn(Thread t, Interruptible b) do? – Stack Overflow 

    https://stackoverflow.com/questions/8544891/what-does-javalangaccess-blockedonthread-t-interruptible-b-do

  • Java NIO 那些躲在角落的細節

    https://www.oschina.net/question/138146_26027

【關於投稿】


如果大家有原創好文投稿,請直接給公號傳送留言。


① 留言格式:
【投稿】+《 文章標題》+ 文章連結

② 示例:
【投稿】《不要自稱是程式員,我十多年的 IT 職場總結》:http://blog.jobbole.com/94148/

③ 最後請附上您的個人簡介哈~



看完本文有收穫?請轉發分享給更多人

關註「ImportNew」,提升Java技能

贊(0)

分享創造快樂