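Recently Yang Qing, a colleague on our team, ran into a problem caused by misusing a thread pool: the pool used for asynchronous processing ended up hanging the main thread. Reading the code showed that the pool's rejection policy was a poor fit for the job: it was set to CallerRunsPolicy. When the asynchronous worker threads slowed down, the blocking queue filled up and the rejection policy kicked in, so rejected tasks were executed on the submitting (main) thread, which then stalled.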
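To make the failure mode concrete, here is a minimal sketch of how CallerRunsPolicy behaves once a pool is saturated. The class and method names (`CallerRunsDemo`, `threadThatRunsRejectedTask`) and the tiny pool sizes are assumptions made for illustration; this is not the code from the incident.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallerRunsDemo {

    /** Returns the name of the thread that ends up running the rejected task. */
    static String threadThatRunsRejectedTask() {
        // Hypothetical sizes: one worker thread and one queue slot, so saturation is easy to reach.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            // Task 1 occupies the only worker thread until it is released.
            pool.execute(() -> awaitQuietly(release));
            // Task 2 fills the single queue slot.
            pool.execute(() -> { });
            // Task 3 is rejected; CallerRunsPolicy runs it synchronously on the submitting thread.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
            return ranOn.get();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller thread:        " + Thread.currentThread().getName());
        System.out.println("rejected task ran on: " + threadThatRunsRejectedTask());
    }
}
```

Both lines print the same thread name. If the rejected task is slow, the submitting thread is blocked for as long as the task takes, which matches the hang described above.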
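We took two lessons from this incident:
- Choosing a thread pool requires a careful analysis of the business scenario first; where necessary, build a custom pool.
- The runtime state of a thread pool needs to be monitored as well.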
For the monitoring, I followed the approach described in 《Java程式設計的藝術》. Here is my code snippet:
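import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;

// Assumed: `log` is provided by Lombok's @Slf4j, and @PostConstruct is the javax.annotation one.
@Slf4j
public class AsyncThreadExecutor implements AutoCloseable {

    private static final int DEFAULT_QUEUE_SIZE = 1000;
    private static final int DEFAULT_POOL_SIZE = 10;

    // Caveat: the pool below is built in a field initializer, so it always uses the
    // default sizes; calling these setters afterwards does not resize it.
    @Setter
    private int queueSize = DEFAULT_QUEUE_SIZE;
    @Setter
    private int poolSize = DEFAULT_POOL_SIZE;

    /**
     * Periodically monitors the state of the thread pool.
     */
    private final ScheduledExecutorService scheduledExecutorService =
            Executors.newSingleThreadScheduledExecutor(
                    new BasicThreadFactory.Builder().namingPattern("async thread executor monitor").build());

    /**
     * Custom asynchronous thread pool:
     * (1) the task queue is bounded;
     * (2) the rejection policy is custom (log an error and drop the task).
     */
    private final ThreadPoolExecutor threadPoolExecutor =
            new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(queueSize),
                    new BasicThreadFactory.Builder().namingPattern("async-thread-%d").build(),
                    (r, executor) -> log.error("the async executor pool is full!!"));

    private final ExecutorService executorService = threadPoolExecutor;

    @PostConstruct
    public void init() {
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            // Approximate total number of tasks ever scheduled for execution
            long taskCount = threadPoolExecutor.getTaskCount();
            // Approximate number of tasks that have completed execution
            long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
            // Largest number of threads that have ever been in the pool at the same time
            long largestPoolSize = threadPoolExecutor.getLargestPoolSize();
            // Current number of threads in the pool
            long currentPoolSize = threadPoolExecutor.getPoolSize();
            // Approximate number of threads that are actively executing tasks
            long activeCount = threadPoolExecutor.getActiveCount();
            // Note: an earlier version of this call passed completedTaskCount twice, so the
            // largestPoolSize label printed the completed-task count and every later value
            // appeared under the wrong label.
            log.info("async-executor monitor. taskCount:{}, completedTaskCount:{}, largestPoolSize:{}, poolSize:{}, activeCount:{}",
                    taskCount, completedTaskCount, largestPoolSize, currentPoolSize, activeCount);
        }, 0, 10, TimeUnit.MINUTES);
    }

    public void execute(Runnable task) {
        executorService.execute(task);
    }

    @Override
    public void close() throws Exception {
        // Stop the monitor as well, otherwise its thread keeps running after close().
        scheduledExecutorService.shutdown();
        executorService.shutdown();
    }
}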
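The main ideas here are: (1) a fixed-size thread pool with a bounded queue; (2) a rejection policy that drops the task but records an error log; (3) a scheduled executor that monitors the business thread pool.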
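Idea (2) can be taken one step further so that dropped tasks show up in monitoring as a number and not only as log lines. The following is a rough sketch under that assumption; `CountingDiscardPolicy` and `submitToSaturatedPool` are hypothetical names, and `System.err` stands in for whatever logger the project uses.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class CountingDiscardDemo {

    /** Hypothetical handler: drops the task, reports the drop, and counts it. */
    static final class CountingDiscardPolicy implements RejectedExecutionHandler {
        private final AtomicLong rejected = new AtomicLong();

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            long n = rejected.incrementAndGet();
            System.err.println("pool is full, task dropped (total dropped: " + n + ")");
        }

        long rejectedCount() {
            return rejected.get();
        }
    }

    /** Submits blocking tasks to a 1-thread, 1-slot pool and returns how many were dropped. */
    static long submitToSaturatedPool(int tasks) {
        CountingDiscardPolicy policy = new CountingDiscardPolicy();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), policy);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                // Every task waits on the latch, so the worker and the queue slot stay occupied.
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return policy.rejectedCount();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // One task runs, one waits in the queue, the remaining three are dropped.
        System.out.println("dropped: " + submitToSaturatedPool(5));
    }
}
```

Unlike CallerRunsPolicy, this handler returns immediately, so the submitting thread is never blocked; the price is that rejected work is lost, which is why the counter (or at least the error log) matters.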
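While going through the monitoring logs, I came across the output shown in the figure below:
What puzzled me here was the meaning of largestPoolSize. Read literally, it is "the largest pool size". But by the definition of a thread pool, when maximumPoolSize and corePoolSize are equal (both are 10 here), the pool can hold at most 10 threads, so how could largestPoolSize be 39? I went to the source code: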
/**
* Returns the largest number of threads that have ever
* simultaneously been in the pool.
*
* @return the number of threads
*/
public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}
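According to the Javadoc, the method returns the largest number of threads that have ever been in the pool at the same time. Next, look at where the largestPoolSize field is assigned in ThreadPoolExecutor: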
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; // <-- assigned here!
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
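So largestPoolSize is the high-water mark of the size of the worker set. Note that not every worker in that set is necessarily busy: idle workers are counted too, which is why largestPoolSize can be larger than activeCount. It cannot, however, exceed the pool's capacity. addWorker refuses to add a worker once the worker count reaches corePoolSize or maximumPoolSize, so in a pool where both are 10 and never changed, largestPoolSize stays at or below 10. The 39 in the log therefore cannot be a real largestPoolSize. The most likely explanation is the logging call itself: the call that produced this output passed completedTaskCount twice for five placeholders, so the number printed under largestPoolSize was actually the completed-task count.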
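A small experiment is one way to check how largestPoolSize relates to the configured pool size. The sketch below is an illustration only: `LargestPoolSizeDemo` and `run` are hypothetical names, and the figures 10 and 39 are borrowed from the discussion above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LargestPoolSizeDemo {

    /**
     * Runs `tasks` no-op tasks on a fixed pool of `threads` threads and
     * returns { largestPoolSize, completedTaskCount } after the pool terminates.
     */
    static long[] run(int threads, int tasks) {
        // The queue is large enough to hold every task, so nothing is rejected.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(Math.max(1, tasks)));
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new long[] { pool.getLargestPoolSize(), pool.getCompletedTaskCount() };
    }

    public static void main(String[] args) {
        long[] result = run(10, 39);
        System.out.println("largestPoolSize=" + result[0] + " completedTaskCount=" + result[1]);
    }
}
```

With 10 threads and 39 tasks this prints `largestPoolSize=10 completedTaskCount=39`: the completed-task count grows with the workload, while largestPoolSize stops at the configured maximum.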
PS: Yang Qing was the inspiration for this article. He ran a lot of load tests, gave me many ideas, and worked through some of the code with me.