(點選上方公眾號,可快速關註)
來源:Sam哥哥 ,
blog.csdn.net/linsongbin1/article/details/78275283
概述
在JAVA的世界裡,如果想並行的執行一些任務,可以使用ThreadPoolExecutor。
大部分情況下直接使用ThreadPoolExecutor就可以滿足要求了,但是在某些場景下,比如瞬時大流量的,為了提高響應和吞吐量,最好還是擴充套件一下ThreadPoolExecutor。
全宇宙的JAVA IT人士應該都知道ThreadPoolExecutor的執行流程:
-
core執行緒還能應付的,則不斷的建立新的執行緒;
-
core執行緒無法應付,則將任務扔到佇列裡面;
-
佇列滿了(意味著插入任務失敗),則開始建立MAX執行緒,執行緒數達到MAX後,佇列還一直是滿的,則丟擲RejectedExecutionException.
這個執行流程有個小問題,就是當core執行緒無法應付請求的時候,會立刻將任務新增到佇列中,如果佇列非常長,而任務又非常多,那麼將會有頻繁的任務入佇列和任務出佇列的操作。
根據實際的壓測發現,這種操作也是有一定消耗的。其實JAVA提供的SynchronousQueue佇列是一個零長度的佇列,任務都是直接由生產者遞交給消費者,中間沒有入佇列的過程,可見JAVA API的設計者也是有考慮過入佇列這種操作的開銷。
另外,任務一多,立刻扔到佇列裡,而MAX執行緒又不幹活,如果佇列裡面太多工了,只有可憐的core執行緒在忙,也是會影響效能的。
當core執行緒無法應付請求的時候,能不能延後入佇列這個操作呢? 讓MAX執行緒儘快啟動起來,幫忙處理任務。
也即是說,當core執行緒無法應付請求的時候,如果當前執行緒池中的執行緒數量還小於MAX執行緒數的時候,繼續建立新的執行緒處理任務,一直到執行緒數量到達MAX後,才將任務插入到佇列裡。
我們透過改寫佇列的offer方法來實現這個標的。
@Override
public boolean offer(Runnable o) {
int currentPoolThreadSize = executor.getPoolSize();
//如果執行緒池裡的執行緒數量已經到達最大,將任務新增到佇列中
if (currentPoolThreadSize == executor.getMaximumPoolSize()) {
return super.offer(o);
}
//說明有空閑的執行緒,這個時候無需建立core執行緒之外的執行緒,而是把任務直接丟到佇列裡即可
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(o);
}
//如果執行緒池裡的執行緒數量還沒有到達最大,直接建立執行緒,而不是把任務丟到佇列裡面
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
return super.offer(o);
}
註意其中的
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(o);
}
是表示core執行緒仍然能處理的來,同時又有空閑執行緒的情況,將任務插入到佇列中。 如何判斷執行緒池中有空閑執行緒呢? 可以使用一個計數器來實現,每當execute方法被執行的時候,計算器加1,當afterExecute被執行後,計數器減1.
@Override
public void execute(Runnable command) {
submittedTaskCount.incrementAndGet();
//程式碼未完整,待補充。。。。。
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedTaskCount.decrementAndGet();
}
這樣,當
executor.getSubmittedTaskCount() < currentPoolThreadSize
的時候,說明有空閑執行緒。
完整程式碼
EnhancedThreadPoolExecutor類
package executer;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class EnhancedThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
/**
* 計數器,用於表示已經提交到佇列裡面的task的數量,這裡task特指還未完成的task。
* 當task執行完後,submittedTaskCount會減1的。
*/
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
public EnhancedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, TaskQueue workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new ThreadPoolExecutor.AbortPolicy());
workQueue.setExecutor(this);
}
/**
* 改寫父類的afterExecute方法,當task執行完成後,將計數器減1
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedTaskCount.decrementAndGet();
}
public int getSubmittedTaskCount() {
return submittedTaskCount.get();
}
/**
* 改寫父類的execute方法,在任務開始執行之前,計數器加1。
*/
@Override
public void execute(Runnable command) {
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
//當發生RejectedExecutionException,嘗試再次將task丟到佇列裡面,如果還是發生RejectedExecutionException,則直接丟擲異常。
BlockingQueue
taskQueue = super.getQueue(); if (taskQueue instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)taskQueue;
if (!queue.forceTaskIntoQueue(command)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(“佇列已滿”);
}
} else {
submittedTaskCount.decrementAndGet();
throw rx;
}
}
}
}
TaskQueue
package executer;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
public class TaskQueue extends LinkedBlockingQueue
{ private EnhancedThreadPoolExecutor executor;
public TaskQueue(int capacity) {
super(capacity);
}
public void setExecutor(EnhancedThreadPoolExecutor exec) {
executor = exec;
}
public boolean forceTaskIntoQueue(Runnable o) {
if (executor.isShutdown()) {
throw new RejectedExecutionException(“Executor已經關閉了,不能將task新增到佇列裡面”);
}
return super.offer(o);
}
@Override
public boolean offer(Runnable o) {
int currentPoolThreadSize = executor.getPoolSize();
//如果執行緒池裡的執行緒數量已經到達最大,將任務新增到佇列中
if (currentPoolThreadSize == executor.getMaximumPoolSize()) {
return super.offer(o);
}
//說明有空閑的執行緒,這個時候無需建立core執行緒之外的執行緒,而是把任務直接丟到佇列裡即可
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(o);
}
//如果執行緒池裡的執行緒數量還沒有到達最大,直接建立執行緒,而不是把任務丟到佇列裡面
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
return super.offer(o);
}
}
TestExecuter
package executer;
import java.util.concurrent.TimeUnit;
public class TestExecuter {
private static final int CORE_SIZE = 5;
private static final int MAX_SIZE = 10;
private static final long KEEP_ALIVE_TIME = 30;
private static final int QUEUE_SIZE = 5;
static EnhancedThreadPoolExecutor executor = new EnhancedThreadPoolExecutor(CORE_SIZE,MAX_SIZE,KEEP_ALIVE_TIME, TimeUnit.SECONDS , new TaskQueue(QUEUE_SIZE));
public static void main(String[] args){
for (int i = 0; i < 15; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
Thread.currentThread().sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
System.out.println(“執行緒池中現在的執行緒數目是:”+executor.getPoolSize()+”, 佇列中正在等待執行的任務數量為:”+ executor.getQueue().size());
}
}
}
先執行一下程式碼,看看是否如何預期。直接執行TestExecuter類中的main方法,執行結果如下:
執行緒池中現在的執行緒數目是:1, 佇列中正在等待執行的任務數量為:0
執行緒池中現在的執行緒數目是:2, 佇列中正在等待執行的任務數量為:0
執行緒池中現在的執行緒數目是:3, 佇列中正在等待執行的任務數量為:0
執行緒池中現在的執行緒數目是:4, 佇列中正在等待執行的任務數量為:0
執行緒池中現在的執行緒數目是:5, 佇列中正在等待執行的任務數量為:0
執行緒池中現在的執行緒數目是:6, 佇列中正在等待執行的任務數量為:0
執行緒池中現在的執行緒數目是:7, 佇列中正在等待執行的任務數量為:0
執行緒池中現在的執行緒數目是:8, 佇列中正在等待執行的任務數量為:0
執行緒池中現在的執行緒數目是:9, 佇列中正在等待執行的任務數量為:0
執行緒池中現在的執行緒數目是:10, 佇列中正在等待執行的任務數量為:0
執行緒池中現在的執行緒數目是:10, 佇列中正在等待執行的任務數量為:1
執行緒池中現在的執行緒數目是:10, 佇列中正在等待執行的任務數量為:2
執行緒池中現在的執行緒數目是:10, 佇列中正在等待執行的任務數量為:3
執行緒池中現在的執行緒數目是:10, 佇列中正在等待執行的任務數量為:4
執行緒池中現在的執行緒數目是:10, 佇列中正在等待執行的任務數量為:5
可以看到當執行緒數增加到core數量的時候,佇列中是沒有任務的。一直到執行緒數量增加到MAX數量,也即是10的時候,佇列中才開始有任務。符合我們的預期。
如果我們註釋掉TaskQueue類中的offer方法,也即是不改寫佇列的offer方法,那麼執行結果如下:
執行緒池中現在的執行緒數目是:1, 佇列中正在等待執行的任務數量為:0
執行緒池中現在的執行緒數目是:2, 佇列中正在等待執行的任務數量為:0
執行緒池中現在的執行緒數目是:3, 佇列中正在等待執行的任務數量為:0
執行緒池中現在的執行緒數目是:4, 佇列中正在等待執行的任務數量為:0
執行緒池中現在的執行緒數目是:5, 佇列中正在等待執行的任務數量為:0
執行緒池中現在的執行緒數目是:5, 佇列中正在等待執行的任務數量為:1
執行緒池中現在的執行緒數目是:5, 佇列中正在等待執行的任務數量為:2
執行緒池中現在的執行緒數目是:5, 佇列中正在等待執行的任務數量為:3
執行緒池中現在的執行緒數目是:5, 佇列中正在等待執行的任務數量為:4
執行緒池中現在的執行緒數目是:5, 佇列中正在等待執行的任務數量為:5
執行緒池中現在的執行緒數目是:6, 佇列中正在等待執行的任務數量為:5
執行緒池中現在的執行緒數目是:7, 佇列中正在等待執行的任務數量為:5
執行緒池中現在的執行緒數目是:8, 佇列中正在等待執行的任務數量為:5
執行緒池中現在的執行緒數目是:9, 佇列中正在等待執行的任務數量為:5
執行緒池中現在的執行緒數目是:10, 佇列中正在等待執行的任務數量為:5
可以看到當執行緒數增加到core數量的時候,佇列中已經有任務了。
進一步思考
在使用ThreadPoolExecutor的時候,如果發生了RejectedExecutionException,該如何處理?本文中的程式碼是採用了重新將任務嘗試插入到佇列中,如果還是失敗則直接將reject異常丟擲去。
@Override
public void execute(Runnable command) {
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
//當發生RejectedExecutionException,嘗試再次將task丟到佇列裡面,如果還是發生RejectedExecutionException,則直接丟擲異常。
BlockingQueue
taskQueue = super.getQueue(); if (taskQueue instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)taskQueue;
if (!queue.forceTaskIntoQueue(command)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(“佇列已滿”);
}
} else {
submittedTaskCount.decrementAndGet();
throw rx;
}
}
}
TaskQueue類提供了forceTaskIntoQueue方法,將任務插入到佇列中。
還有另一種解決方案,就是使用另外一個執行緒池來執行任務,當第一個執行緒池丟擲Reject異常時,catch住它,並使用第二個執行緒池處理任務。
看完本文有收穫?請轉發分享給更多人
關註「ImportNew」,提升Java技能