點選上方“芋道原始碼”,選擇“置頂公眾號”
技術文章第一時間送達!
原始碼精品專欄
摘要: 原創出處 https://mp.weixin.qq.com/s/_ghOnuwbLHOkqGKgzWdLVw 「渣渣王子」歡迎轉載,保留摘要,謝謝!
-
1. 概念
-
2. 原始碼分析
-
2.1.1 HikariConfig
-
2.1.2 HouseKeeper
-
2.1.3 小結
-
2.2.1 getConnection
-
2.2.2 leakTaskFactory、ProxyLeakTaskFactory、ProxyLeakTask
-
2.2.3 close
-
3. 測試模擬
-
4. Spark/Scala連線池洩漏問題排查
-
5. 參考資料
-
6. 系列文章
音樂資源載入中…
概念
此屬性控制在記錄訊息之前連線可能離開池的時間量,單位毫秒,預設為0,表明可能存在連線洩漏。
如果大於0且不是單元測試,則進一步判斷:(leakDetectionThreshold < SECONDS.toMillis(2) or (leakDetectionThreshold > maxLifetime && maxLifetime > 0),會被重置為0。即如果要生效則必須>0,而且不能小於2秒,而且當maxLifetime > 0時不能大於maxLifetime(預設值1800000毫秒=30分鐘)。
leakDetectionThreshold
This property controls the amount of time that a connection can be out of the pool before a message is logged indicating a possible connection leak. A value of 0 means leak detection is disabled. Lowest acceptable value for enabling leak detection is 2000 (2 seconds). Default: 0
更多配置大綱詳見文章 【追光者系列】HikariCP預設配置
原始碼解析
我們首先來看一下leakDetectionThreshold用在了哪裡的綱要圖:
Write
還記得上一篇文章【追光者系列】HikariCP原始碼分析之從validationTimeout來講講Hikari 2.7.5版本的那些故事提到:我們可以看到在兩處看到validationTimeout的寫入,一處是PoolBase建構式,另一處是HouseKeeper執行緒。
leakDetectionThreshold的用法可以說是異曲同工,除了建構式之外,也用了HouseKeeper執行緒去處理。
HikariConfig
在com.zaxxer.hikari.HikariConfig中進行了leakDetectionThreshold初始化工作,
@Override
public void setLeakDetectionThreshold(long leakDetectionThresholdMs)
{
this.leakDetectionThreshold = leakDetectionThresholdMs;
}
validateNumerics方法中則是解釋了上文及官方檔案中該值validate的策略
if (leakDetectionThreshold > 0 && !unitTest) {
if (leakDetectionThreshold < SECONDS.toMillis(2) || (leakDetectionThreshold > maxLifetime && maxLifetime > 0)) {
LOGGER.warn("{} - leakDetectionThreshold is less than 2000ms or more than maxLifetime, disabling it.", poolName);
leakDetectionThreshold = 0;
}
}
該方法會被HikariConfig#validate所呼叫,而HikariConfig#validate會在HikariDataSource的specified configuration的建構式使用到
/**
* Construct a HikariDataSource with the specified configuration. The
* {@link HikariConfig} is copied and the pool is started by invoking this
* constructor.
*
* The {@link HikariConfig} can be modified without affecting the HikariDataSource
* and used to initialize another HikariDataSource instance.
*
* @param configuration a HikariConfig instance
*/
public HikariDataSource(HikariConfig configuration)
{
configuration.validate();
configuration.copyStateTo(this);
LOGGER.info("{} - Starting...", configuration.getPoolName());
pool = fastPathPool = new HikariPool(this);
LOGGER.info("{} - Start completed.", configuration.getPoolName());
this.seal();
}
也在每次getConnection的時候用到了,
// ***********************************************************************
// DataSource methods
// ***********************************************************************
/** {@inheritDoc} */
@Override
public Connection getConnection() throws SQLException
{
if (isClosed()) {
throw new SQLException("HikariDataSource " + this + " has been closed.");
}
if (fastPathPool != null) {
return fastPathPool.getConnection();
}
// See http://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java
HikariPool result = pool;
if (result == null) {
synchronized (this) {
result = pool;
if (result == null) {
validate();
LOGGER.info("{} - Starting...", getPoolName());
try {
pool = result = new HikariPool(this);
this.seal();
}
catch (PoolInitializationException pie) {
if (pie.getCause() instanceof SQLException) {
throw (SQLException) pie.getCause();
}
else {
throw pie;
}
}
LOGGER.info("{} - Start completed.", getPoolName());
}
}
}
return result.getConnection();
}
這裡要特別提一下一個很牛逼的Double-checked_locking的實現,大家可以看一下這篇文章 https://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java
// Works with acquire/release semantics for volatile in Java 1.5 and later
// Broken under Java 1.4 and earlier semantics for volatile
class Foo {
private volatile Helper helper;
public Helper getHelper() {
Helper localRef = helper;
if (localRef == null) {
synchronized(this) {
localRef = helper;
if (localRef == null) {
helper = localRef = new Helper();
}
}
}
return localRef;
}
// other functions and members...
}
HouseKeeper
我們再來看一下com.zaxxer.hikari.pool.HikariPool這個程式碼,該執行緒嘗試在池中維護的最小空閑連線數,並不斷掃清的透過MBean調整的connectionTimeout和validationTimeout等值,leakDetectionThreshold這個值也是透過這個HouseKeeper的leakTask.updateLeakDetectionThreshold(config.getLeakDetectionThreshold())去管理的。
/**
* The house keeping task to retire and maintain minimum idle connections.
*/
private final class HouseKeeper implements Runnable
{
private volatile long previous = plusMillis(currentTime(), -HOUSEKEEPING_PERIOD_MS);
@Override
public void run()
{
try {
// refresh timeouts in case they changed via MBean
connectionTimeout = config.getConnectionTimeout();
validationTimeout = config.getValidationTimeout();
leakTask.updateLeakDetectionThreshold(config.getLeakDetectionThreshold());
final long idleTimeout = config.getIdleTimeout();
final long now = currentTime();
// Detect retrograde time, allowing +128ms as per NTP spec.
if (plusMillis(now, 128) < plusMillis(previous, HOUSEKEEPING_PERIOD_MS)) {
LOGGER.warn("{} - Retrograde clock change detected (housekeeper delta={}), soft-evicting connections from pool.",
poolName, elapsedDisplayString(previous, now));
previous = now;
softEvictConnections();
fillPool();
return;
}
else if (now > plusMillis(previous, (3 * HOUSEKEEPING_PERIOD_MS) / 2)) {
// No point evicting for forward clock motion, this merely accelerates connection retirement anyway
LOGGER.warn("{} - Thread starvation or clock leap detected (housekeeper delta={}).", poolName, elapsedDisplayString(previous, now));
}
previous = now;
String afterPrefix = "Pool ";
if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) {
logPoolState("Before cleanup ");
afterPrefix = "After cleanup ";
final List notInUse = connectionBag.values(STATE_NOT_IN_USE);
int removed = 0;
for (PoolEntry entry : notInUse) {
if (elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) {
closeConnection(entry, "(connection has passed idleTimeout)");
if (++removed > config.getMinimumIdle()) {
break;
}
}
}
}
logPoolState(afterPrefix);
fillPool(); // Try to maintain minimum connections
}
catch (Exception e) {
LOGGER.error("Unexpected exception in housekeeping task", e);
}
}
}
這裡補充說一下這個HouseKeeper,它是在com.zaxxer.hikari.pool.HikariPool的建構式中初始化的:this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();
/**
* Create/initialize the Housekeeping service {@link ScheduledExecutorService}. If the user specified an Executor
* to be used in the {@link HikariConfig}, then we use that. If no Executor was specified (typical), then create
* an Executor and configure it.
*
* @return either the user specified {@link ScheduledExecutorService}, or the one we created
*/
private ScheduledExecutorService initializeHouseKeepingExecutorService()
{
if (config.getScheduledExecutor() == null) {
final ThreadFactory threadFactory = Optional.ofNullable(config.getThreadFactory()).orElse(new DefaultThreadFactory(poolName + " housekeeper", true));
final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory, new ThreadPoolExecutor.DiscardPolicy());
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
executor.setRemoveOnCancelPolicy(true);
return executor;
}
else {
return config.getScheduledExecutor();
}
}
這裡簡要說明一下,ScheduledThreadPoolExecutor是ThreadPoolExecutor類的子類,因為繼承了ThreadPoolExecutor類所有的特性。但是,Java推薦僅在開發定時任務程式時採用ScheduledThreadPoolExecutor類。
在呼叫shutdown()方法而仍有待處理的任務需要執行時,可以配置ScheduledThreadPoolExecutor的行為。預設的行為是不論執行器是否結束,待處理的任務仍將被執行。但是,透過呼叫ScheduledThreadPoolExecutor類的setExecuteExistingDelayedTasksAfterShutdownPolicy()方法則可以改變這個行為。傳遞false引數給這個方法,執行shutdown()方法之後,待處理的任務將不會被執行。
取消任務後,判斷是否需要從阻塞佇列中移除任務。其中removeOnCancel引數透過setRemoveOnCancelPolicy()設定。之所以要在取消任務後移除阻塞佇列中任務,是為了防止佇列中積壓大量已被取消的任務。
從這兩個引數配置大家可以瞭解到作者的對於HouseKeeper的配置初衷。
小結
Hikari透過建構式和HouseKeeper對於一些配置引數進行初始化及動態賦值,動態賦值依賴於HikariConfigMXbean以及使用任務排程執行緒池ScheduledThreadPoolExecutor來不斷掃清配置的。
我們僅僅以com.zaxxer.hikari.HikariConfig來做下小結,允許在執行時進行動態修改的主要有:
// Properties changeable at runtime through the HikariConfigMXBean
//
private volatile long connectionTimeout;
private volatile long validationTimeout;
private volatile long idleTimeout;
private volatile long leakDetectionThreshold;
private volatile long maxLifetime;
private volatile int maxPoolSize;
private volatile int minIdle;
private volatile String username;
private volatile String password;
不允許在執行時進行改變的主要有
// Properties NOT changeable at runtime
//
private long initializationFailTimeout;
private String catalog;
private String connectionInitSql;
private String connectionTestQuery;
private String dataSourceClassName;
private String dataSourceJndiName;
private String driverClassName;
private String jdbcUrl;
private String poolName;
private String schema;
private String transactionIsolationName;
private boolean isAutoCommit;
private boolean isReadOnly;
private boolean isIsolateInternalQueries;
private boolean isRegisterMbeans;
private boolean isAllowPoolSuspension;
private DataSource dataSource;
private Properties dataSourceProperties;
private ThreadFactory threadFactory;
private ScheduledExecutorService scheduledExecutor;
private MetricsTrackerFactory metricsTrackerFactory;
private Object metricRegistry;
private Object healthCheckRegistry;
private Properties healthCheckProperties;
Read
getConnection
在com.zaxxer.hikari.pool.HikariPool的核心方法getConnection傳回的時候呼叫了poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now)
註意,建立代理連線的時候關聯了ProxyLeakTask。
連線洩漏檢測的原理就是:連線有借有還,hikari是每借用一個connection則會建立一個延時的定時任務,在歸還或者出異常的或者使用者手動呼叫evictConnection的時候cancel掉這個task
/**
* Get a connection from the pool, or timeout after the specified number of milliseconds.
*
* @param hardTimeout the maximum time to wait for a connection from the pool
* @return a java.sql.Connection instance
* @throws SQLException thrown if a timeout occurs trying to obtain a connection
*/
public Connection getConnection(final long hardTimeout) throws SQLException
{
suspendResumeLock.acquire();
final long startTime = currentTime();
try {
long timeout = hardTimeout;
do {
PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
if (poolEntry == null) {
break; // We timed out... break and throw exception
}
final long now = currentTime();
if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > ALIVE_BYPASS_WINDOW_MS && !isConnectionAlive(poolEntry.connection))) {
closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
timeout = hardTimeout - elapsedMillis(startTime);
}
else {
metricsTracker.recordBorrowStats(poolEntry, startTime);
return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
}
} while (timeout > 0L);
metricsTracker.recordBorrowTimeoutStats(startTime);
throw createTimeoutException(startTime);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
}
finally {
suspendResumeLock.release();
}
}
leakTaskFactory、ProxyLeakTaskFactory、ProxyLeakTask
在HikariPool建構式裡,初始化了leakTaskFactory,以及houseKeepingExecutorService。
this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();
this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService);
this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, HOUSEKEEPING_PERIOD_MS, MILLISECONDS);
com.zaxxer.hikari.pool.ProxyLeakTaskFactory是作者慣用的設計,我們看一下原始碼:
/**
* A factory for {@link ProxyLeakTask} Runnables that are scheduled in the future to report leaks.
*
* @author Brett Wooldridge
* @author Andreas Brenk
*/
class ProxyLeakTaskFactory
{
private ScheduledExecutorService executorService;
private long leakDetectionThreshold;
ProxyLeakTaskFactory(final long leakDetectionThreshold, final ScheduledExecutorService executorService)
{
this.executorService = executorService;
this.leakDetectionThreshold = leakDetectionThreshold;
}
ProxyLeakTask schedule(final PoolEntry poolEntry)
{
return (leakDetectionThreshold == 0) ? ProxyLeakTask.NO_LEAK : scheduleNewTask(poolEntry);
}
void updateLeakDetectionThreshold(final long leakDetectionThreshold)
{
this.leakDetectionThreshold = leakDetectionThreshold;
}
private ProxyLeakTask scheduleNewTask(PoolEntry poolEntry) {
ProxyLeakTask task = new ProxyLeakTask(poolEntry);
task.schedule(executorService, leakDetectionThreshold);
return task;
}
}
如果leakDetectionThreshold=0,即禁用連線洩露檢測,schedule傳回的是ProxyLeakTask.NO_LEAK,否則則新建一個ProxyLeakTask,在leakDetectionThreshold時間後觸發
再看一下com.zaxxer.hikari.pool.ProxyLeakTask的原始碼
/**
* A Runnable that is scheduled in the future to report leaks. The ScheduledFuture is
* cancelled if the connection is closed before the leak time expires.
*
* @author Brett Wooldridge
*/
class ProxyLeakTask implements Runnable
{
private static final Logger LOGGER = LoggerFactory.getLogger(ProxyLeakTask.class);
static final ProxyLeakTask NO_LEAK;
private ScheduledFuture> scheduledFuture;
private String connectionName;
private Exception exception;
private String threadName;
private boolean isLeaked;
static
{
NO_LEAK = new ProxyLeakTask() {
@Override
void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold) {}
@Override
public void run() {}
@Override
public void cancel() {}
};
}
ProxyLeakTask(final PoolEntry poolEntry)
{
this.exception = new Exception("Apparent connection leak detected");
this.threadName = Thread.currentThread().getName();
this.connectionName = poolEntry.connection.toString();
}
private ProxyLeakTask()
{
}
void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold)
{
scheduledFuture = executorService.schedule(this, leakDetectionThreshold, TimeUnit.MILLISECONDS);
}
/** {@inheritDoc} */
@Override
public void run()
{
isLeaked = true;
final StackTraceElement[] stackTrace = exception.getStackTrace();
final StackTraceElement[] trace = new StackTraceElement[stackTrace.length - 5];
System.arraycopy(stackTrace, 5, trace, 0, trace.length);
exception.setStackTrace(trace);
LOGGER.warn("Connection leak detection triggered for {} on thread {}, stack trace follows", connectionName, threadName, exception);
}
void cancel()
{
scheduledFuture.cancel(false);
if (isLeaked) {
LOGGER.info("Previously reported leaked connection {} on thread {} was returned to the pool (unleaked)", connectionName, threadName);
}
}
}
NO_LEAK類裡頭的方法都是空操作
一旦該task被觸發,則丟擲Exception(“Apparent connection leak detected”)
我們想起了什麼,是不是想起了【追光者系列】HikariCP原始碼分析之allowPoolSuspension那篇文章裡有著一摸一樣的設計?
this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;
isAllowPoolSuspension預設值是false的,建構式直接會建立SuspendResumeLock.FAUX_LOCK;只有isAllowPoolSuspension為true時,才會真正建立SuspendResumeLock。
com.zaxxer.hikari.util.SuspendResumeLock內部實現了一虛一實兩個java.util.concurrent.Semaphore
/**
* This class implements a lock that can be used to suspend and resume the pool. It
* also provides a faux implementation that is used when the feature is disabled that
* hopefully gets fully "optimized away" by the JIT.
*
* @author Brett Wooldridge
*/
public class SuspendResumeLock
{
public static final SuspendResumeLock FAUX_LOCK = new SuspendResumeLock(false) {
@Override
public void acquire() {}
@Override
public void release() {}
@Override
public void suspend() {}
@Override
public void resume() {}
};
private static final int MAX_PERMITS = 10000;
private final Semaphore acquisitionSemaphore;
/**
* Default constructor
*/
public SuspendResumeLock()
{
this(true);
}
private SuspendResumeLock(final boolean createSemaphore)
{
acquisitionSemaphore = (createSemaphore ? new Semaphore(MAX_PERMITS, true) : null);
}
public void acquire()
{
acquisitionSemaphore.acquireUninterruptibly();
}
public void release()
{
acquisitionSemaphore.release();
}
public void suspend()
{
acquisitionSemaphore.acquireUninterruptibly(MAX_PERMITS);
}
public void resume()
{
acquisitionSemaphore.release(MAX_PERMITS);
}
}
由於Hikari的isAllowPoolSuspension預設值是false的,FAUX_LOCK只是一個空方法,acquisitionSemaphore物件也是空的;如果isAllowPoolSuspension值調整為true,當收到MBean的suspend呼叫時將會一次性acquisitionSemaphore.acquireUninterruptibly從此訊號量獲取給定數目MAX_PERMITS 10000的許可,在提供這些許可前一直將執行緒阻塞。之後HikariPool的getConnection方法獲取不到連線,阻塞在suspendResumeLock.acquire(),除非resume方法釋放給定數目MAX_PERMITS 10000的許可,將其傳回到訊號量
close
連線有借有還,連線檢測的task也是會關閉的。
我們看一下com.zaxxer.hikari.pool.ProxyConnection原始碼,
// **********************************************************************
// "Overridden" java.sql.Connection Methods
// **********************************************************************
/** {@inheritDoc} */
@Override
public final void close() throws SQLException
{
// Closing statements can cause connection eviction, so this must run before the conditional below
closeStatements();
if (delegate != ClosedConnection.CLOSED_CONNECTION) {
leakTask.cancel();
try {
if (isCommitStateDirty && !isAutoCommit) {
delegate.rollback();
lastAccess = currentTime();
LOGGER.debug("{} - Executed rollback on connection {} due to dirty commit state on close().", poolEntry.getPoolName(), delegate);
}
if (dirtyBits != 0) {
poolEntry.resetConnectionState(this, dirtyBits);
lastAccess = currentTime();
}
delegate.clearWarnings();
}
catch (SQLException e) {
// when connections are aborted, exceptions are often thrown that should not reach the application
if (!poolEntry.isMarkedEvicted()) {
throw checkException(e);
}
}
finally {
delegate = ClosedConnection.CLOSED_CONNECTION;
poolEntry.recycle(lastAccess);
}
}
}
在connection的close的時候,delegate != ClosedConnection.CLOSED_CONNECTION時會呼叫leakTask.cancel();取消檢測連線洩露的task。
在closeStatements中也會關閉:
@SuppressWarnings("EmptyTryBlock")
private synchronized void closeStatements()
{
final int size = openStatements.size();
if (size > 0) {
for (int i = 0; i < size && delegate != ClosedConnection.CLOSED_CONNECTION; i++) {
try (Statement ignored = openStatements.get(i)) {
// automatic resource cleanup
}
catch (SQLException e) {
LOGGER.warn("{} - Connection {} marked as broken because of an exception closing open statements during Connection.close()",
poolEntry.getPoolName(), delegate);
leakTask.cancel();
poolEntry.evict("(exception closing Statements during Connection.close())");
delegate = ClosedConnection.CLOSED_CONNECTION;
}
}
openStatements.clear();
}
}
在checkException中也會關閉
final SQLException checkException(SQLException sqle)
{
SQLException nse = sqle;
for (int depth = 0; delegate != ClosedConnection.CLOSED_CONNECTION && nse != null && depth < 10; depth++) {
final String sqlState = nse.getSQLState();
if (sqlState != null && sqlState.startsWith("08") || ERROR_STATES.contains(sqlState) || ERROR_CODES.contains(nse.getErrorCode())) {
// broken connection
LOGGER.warn("{} - Connection {} marked as broken because of SQLSTATE({}), ErrorCode({})",
poolEntry.getPoolName(), delegate, sqlState, nse.getErrorCode(), nse);
leakTask.cancel();
poolEntry.evict("(connection is broken)");
delegate = ClosedConnection.CLOSED_CONNECTION;
}
else {
nse = nse.getNextException();
}
}
return sqle;
}
在com.zaxxer.hikari.pool.HikariPool的evictConnection中,也會關閉任務
/**
* Evict a Connection from the pool.
*
* @param connection the Connection to evict (actually a {@link ProxyConnection})
*/
public void evictConnection(Connection connection)
{
ProxyConnection proxyConnection = (ProxyConnection) connection;
proxyConnection.cancelLeakTask();
try {
softEvictConnection(proxyConnection.getPoolEntry(), "(connection evicted by user)", !connection.isClosed() /* owner */);
}
catch (SQLException e) {
// unreachable in HikariCP, but we're still forced to catch it
}
}
小結關閉任務如下圖所示:
測試模擬
我們可以根據本文對於leakDetectionThreshold的分析用測試包裡的com.zaxxer.hikari.pool.MiscTest程式碼進行適當引數調整模擬連線洩漏情況,測試程式碼如下:
/**
* @author Brett Wooldridge
*/
public class MiscTest
{
@Test
public void testLogWriter() throws SQLException
{
HikariConfig config = newHikariConfig();
config.setMinimumIdle(0);
config.setMaximumPoolSize(4);
config.setDataSourceClassName("com.zaxxer.hikari.mocks.StubDataSource");
setConfigUnitTest(true);
try (HikariDataSource ds = new HikariDataSource(config)) {
PrintWriter writer = new PrintWriter(System.out);
ds.setLogWriter(writer);
assertSame(writer, ds.getLogWriter());
assertEquals("testLogWriter", config.getPoolName());
}
finally
{
setConfigUnitTest(false);
}
}
@Test
public void testInvalidIsolation()
{
try {
getTransactionIsolation("INVALID");
fail();
}
catch (Exception e) {
assertTrue(e instanceof IllegalArgumentException);
}
}
@Test
public void testCreateInstance()
{
try {
createInstance("invalid", null);
fail();
}
catch (RuntimeException e) {
assertTrue(e.getCause() instanceof ClassNotFoundException);
}
}
@Test
public void testLeakDetection() throws Exception
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (PrintStream ps = new PrintStream(baos, true)) {
setSlf4jTargetStream(Class.forName("com.zaxxer.hikari.pool.ProxyLeakTask"), ps);
setConfigUnitTest(true);
HikariConfig config = newHikariConfig();
config.setMinimumIdle(0);
config.setMaximumPoolSize(4);
config.setThreadFactory(Executors.defaultThreadFactory());
config.setMetricRegistry(null);
config.setLeakDetectionThreshold(TimeUnit.SECONDS.toMillis(4));
config.setDataSourceClassName("com.zaxxer.hikari.mocks.StubDataSource");
try (HikariDataSource ds = new HikariDataSource(config)) {
setSlf4jLogLevel(HikariPool.class, Level.DEBUG);
getPool(ds).logPoolState();
try (Connection connection = ds.getConnection()) {
quietlySleep(SECONDS.toMillis(4));
connection.close();
quietlySleep(SECONDS.toMillis(1));
ps.close();
String s = new String(baos.toByteArray());
assertNotNull("Exception string was null", s);
assertTrue("Expected exception to contain 'Connection leak detection' but contains *" + s + "*", s.contains("Connection leak detection"));
}
}
finally
{
setConfigUnitTest(false);
setSlf4jLogLevel(HikariPool.class, Level.INFO);
}
}
}
}
當程式碼執行到了quietlySleep(SECONDS.toMillis(4));時直接按照預期拋異常Apparent connection leak detected。
緊接著在close的過程中執行到了delegate != ClosedConnection.CLOSED_CONNECTION來進行leakTask.cancel()
完整的測試輸出模擬過程如下所示:
Spark/Scala連線池洩漏問題排查
金融中心大資料決策資料組的同學找到反饋了一個問題:
我們在同一個jvm 需要連線多個資料庫時,發現總體上 從連線池borrow 的 connection 多於 歸還的,一段時間後 連線池就會報出
Caused by: java.sql.SQLTransientConnectionException: HikariPool-0 – Connection is not available, request timed out after 30000ms的異常。
使用者使用的spark的場景有點特殊,單機上開的連結很小,但是有很多機器都會去連。使用者在一個jvm中就只會併發1個連結。
maximumPoolSize: 5
minimumIdle: 2
程式也會出現block的情況,發現是執行mysql時出現的,
mysql show processlist;發現大多停留在query end的情況,程式 thread dump 行程 持有monitor的執行緒。
DBA介入之後發現存在slow sql。
當然,這個問題出了是寫頻繁導致的,一次寫入的量有點大,每一個sql都巨大走的batch,寫入的 records 數在每秒 30-50條,一個record 有70多個欄位。一個解決方式是把 binlog 移到 ssd 盤;還有一個方式是innodb_flush_log_at_trx_commit把這個引數改成0了,估計可能會提高20%~30%。
修複瞭如上一些問題之後,又發現使用者反饋的問題,加了leakDetectionThreshold,得出的結論是存在連線洩漏(從池中借用後連線沒有關閉)。
針對這個問題,我們懷疑的連線池洩漏的點要麼在hikari中,要麼在spark/scala中。採用排除法使用了druid,依然存在這個問題;於是我們就去翻spark這塊的程式碼,仔細分析之後定位到了問題:
因為scala map懶載入,一開始mapPartitions都落在一個stage中,我們調整程式碼toList之後result.iterator就分在獨立的stage中,連線池洩漏問題就不再存在。
根本原因可以參見《Spark : How to use mapPartition and create/close connection per partition
》:
https://stackoverflow.com/questions/36545579/spark-how-to-use-mappartition-and-create-close-connection-per-partition/36545821#36545821
一開始以為這是一個連線池問題,或者是spark問題,但是實際上透過leakDetectionThreshold的定位,我們得知實際上這是一個scala問題 :)
參考資料
https://segmentfault.com/a/1190000013092894
如果你對 Dubbo 感興趣,歡迎加入我的知識星球一起交流。
目前在知識星球(https://t.zsxq.com/2VbiaEu)更新瞭如下 Dubbo 原始碼解析如下:
01. 除錯環境搭建
02. 專案結構一覽
03. 配置 Configuration
04. 核心流程一覽
05. 拓展機制 SPI
06. 執行緒池
07. 服務暴露 Export
08. 服務取用 Refer
09. 註冊中心 Registry
10. 動態編譯 Compile
11. 動態代理 Proxy
12. 服務呼叫 Invoke
13. 呼叫特性
14. 過濾器 Filter
15. NIO 伺服器
16. P2P 伺服器
17. HTTP 伺服器
18. 序列化 Serialization
19. 叢集容錯 Cluster
20. 優雅停機
21. 日誌適配
22. 狀態檢查
23. 監控中心 Monitor
24. 管理中心 Admin
25. 運維命令 QOS
26. 鏈路追蹤 Tracing
…
一共 60 篇++