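Excerpted from 【工匠小豬豬的技術世界】
This is part of a series; follow along if you are interested.
If you run into problems using HikariCP, leave me a message and we can discuss them together.
I would also welcome real-world cases from readers, and I am happy to help with tuning.
Concept
This property controls how long a connection may be out of the pool before a message is logged indicating a possible connection leak. The unit is milliseconds and the default is 0, which disables leak detection. If the value is greater than 0 and the pool is not running in a unit test, a further check applies: if (leakDetectionThreshold < SECONDS.toMillis(2) || (leakDetectionThreshold > maxLifetime && maxLifetime > 0)), the value is reset to 0. In other words, to take effect the value must be greater than 0, must not be less than 2 seconds, and, when maxLifetime > 0, must not exceed maxLifetime (default 1800000 ms = 30 minutes).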
leakDetectionThreshold This property controls the amount of time that a connection can be out of the pool before a message is logged indicating a possible connection leak. A value of 0 means leak detection is disabled. Lowest acceptable value for enabling leak detection is 2000 (2 seconds). Default: 0
For an overview of the other settings, see the article 【追光者系列】HikariCP預設配置.
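The validation rule described in this section can be written as a small stand-alone function. This is a minimal sketch, not HikariCP code: the class LeakThresholdRule and the method effectiveThreshold are hypothetical names, and the unit-test exemption is left out.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of HikariCP): mirrors the rule described above,
// ignoring the unit-test exemption.
final class LeakThresholdRule {
    private LeakThresholdRule() {}

    /** Returns the threshold that stays in effect, or 0 if the rule would disable it. */
    static long effectiveThreshold(long leakDetectionThresholdMs, long maxLifetimeMs) {
        if (leakDetectionThresholdMs > 0) {
            boolean tooSmall = leakDetectionThresholdMs < TimeUnit.SECONDS.toMillis(2);
            boolean beyondLifetime = maxLifetimeMs > 0 && leakDetectionThresholdMs > maxLifetimeMs;
            if (tooSmall || beyondLifetime) {
                return 0; // reset: leak detection is disabled
            }
        }
        return leakDetectionThresholdMs;
    }

    public static void main(String[] args) {
        long thirtyMinutes = TimeUnit.MINUTES.toMillis(30); // the default maxLifetime
        System.out.println(effectiveThreshold(1_000, thirtyMinutes));     // below 2 seconds
        System.out.println(effectiveThreshold(60_000, thirtyMinutes));    // within bounds
        System.out.println(effectiveThreshold(3_600_000, thirtyMinutes)); // above maxLifetime
    }
}
```

A value of 60000 survives the check, while 1000 and 3600000 are both reset to 0 against the default maxLifetime.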
Source code analysis
Let's first get an overview of where leakDetectionThreshold is written and where it is read.
Write
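Recall that the previous article, 【追光者系列】HikariCP原始碼分析之從validationTimeout來講講Hikari 2.7.5版本的那些故事, noted that validationTimeout is written in two places: the PoolBase constructor and the HouseKeeper thread. leakDetectionThreshold is handled in much the same way: besides the constructor, the HouseKeeper thread also updates it.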
HikariConfig
The value is set in com.zaxxer.hikari.HikariConfig:
@Override
public void setLeakDetectionThreshold(long leakDetectionThresholdMs) {
this.leakDetectionThreshold = leakDetectionThresholdMs;
}
The validateNumerics method implements the validation strategy described above and in the official documentation:
if (leakDetectionThreshold > 0 && !unitTest) {
   if (leakDetectionThreshold < SECONDS.toMillis(2) || (leakDetectionThreshold > maxLifetime && maxLifetime > 0)) {
      LOGGER.warn("{} - leakDetectionThreshold is less than 2000ms or more than maxLifetime, disabling it.", poolName);
      leakDetectionThreshold = 0;
   }
}
This method is called by HikariConfig#validate, which in turn is called by the HikariDataSource constructor that takes a configuration:
/**
* Construct a HikariDataSource with the specified configuration. The
* {@link HikariConfig} is copied and the pool is started by invoking this
* constructor.
*
* The {@link HikariConfig} can be modified without affecting the HikariDataSource
* and used to initialize another HikariDataSource instance.
*
* @param configuration a HikariConfig instance
*/
public HikariDataSource(HikariConfig configuration)
{
configuration.validate();
configuration.copyStateTo(this);
LOGGER.info("{} - Starting...", configuration.getPoolName());
pool = fastPathPool = new HikariPool(this);
LOGGER.info("{} - Start completed.", configuration.getPoolName());
this.seal();
}
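validate() is also reached from getConnection, but only on the lazy-initialization path, when the pool has not been created yet: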
// ***********************************************************************
// DataSource methods
// ***********************************************************************
/** {@inheritDoc} */
@Override
public Connection getConnection() throws SQLException {
if (isClosed()) {
throw new SQLException("HikariDataSource " + this + " has been closed.");
}
if (fastPathPool != null) {
return fastPathPool.getConnection();
}
// See http://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java
HikariPool result = pool;
if (result == null) {
synchronized (this) {
result = pool;
if (result == null) {
validate();
LOGGER.info("{} - Starting...", getPoolName());
try {
pool = result = new HikariPool(this);
this.seal();
}
catch (PoolInitializationException pie) {
if (pie.getCause() instanceof SQLException) {
throw (SQLException) pie.getCause();
}
else {
throw pie;
}
}
LOGGER.info("{} - Start completed.", getPoolName());
}
}
}
return result.getConnection();
}
Worth highlighting here is the neat double-checked locking implementation; for details see https://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java
// Works with acquire/release semantics for volatile in Java 1.5 and later
// Broken under Java 1.4 and earlier semantics for volatile
class Foo {
private volatile Helper helper;
public Helper getHelper() {
Helper localRef = helper;
if (localRef == null) {
synchronized(this) {
localRef = helper;
if (localRef == null) {
helper = localRef = new Helper();
}
}
}
return localRef;
}
// other functions and members...
}
HouseKeeper
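Next, let's look at the HouseKeeper in com.zaxxer.hikari.pool.HikariPool. This task tries to maintain the minimum number of idle connections in the pool and keeps refreshing values such as connectionTimeout and validationTimeout that may have been changed through the MBean. leakDetectionThreshold is managed the same way, through the HouseKeeper's call to leakTask.updateLeakDetectionThreshold(config.getLeakDetectionThreshold()).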
/**
* The house keeping task to retire and maintain minimum idle connections.
*/
private final class HouseKeeper implements Runnable {
private volatile long previous = plusMillis(currentTime(), -HOUSEKEEPING_PERIOD_MS);
@Override
public void run()
{
try {
// refresh timeouts in case they changed via MBean
connectionTimeout = config.getConnectionTimeout();
validationTimeout = config.getValidationTimeout();
leakTask.updateLeakDetectionThreshold(config.getLeakDetectionThreshold());
final long idleTimeout = config.getIdleTimeout();
final long now = currentTime();
// Detect retrograde time, allowing +128ms as per NTP spec.
if (plusMillis(now, 128) < plusMillis(previous, HOUSEKEEPING_PERIOD_MS)) {
LOGGER.warn("{} - Retrograde clock change detected (housekeeper delta={}), soft-evicting connections from pool.",
poolName, elapsedDisplayString(previous, now));
previous = now;
softEvictConnections();
fillPool();
return;
}
else if (now > plusMillis(previous, (3 * HOUSEKEEPING_PERIOD_MS) / 2)) {
// No point evicting for forward clock motion, this merely accelerates connection retirement anyway
LOGGER.warn("{} - Thread starvation or clock leap detected (housekeeper delta={}).", poolName, elapsedDisplayString(previous, now));
}
previous = now;
String afterPrefix = "Pool ";
if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) {
logPoolState("Before cleanup ");
afterPrefix = "After cleanup ";
final List<PoolEntry> notInUse = connectionBag.values(STATE_NOT_IN_USE);
int removed = 0;
for (PoolEntry entry : notInUse) {
if (elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) {
closeConnection(entry, "(connection has passed idleTimeout)");
if (++removed > config.getMinimumIdle()) {
break;
}
}
}
}
logPoolState(afterPrefix);
fillPool(); // Try to maintain minimum connections
}
catch (Exception e) {
LOGGER.error("Unexpected exception in housekeeping task", e);
}
}
}
A note on the HouseKeeper: its executor is initialized in the constructor of com.zaxxer.hikari.pool.HikariPool with this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();
/**
* Create/initialize the Housekeeping service {@link ScheduledExecutorService}. If the user specified an Executor
* to be used in the {@link HikariConfig}, then we use that. If no Executor was specified (typical), then create
* an Executor and configure it.
*
* @return either the user specified {@link ScheduledExecutorService}, or the one we created
*/
private ScheduledExecutorService initializeHouseKeepingExecutorService() {
if (config.getScheduledExecutor() == null) {
final ThreadFactory threadFactory = Optional.ofNullable(config.getThreadFactory()).orElse(new DefaultThreadFactory(poolName + " housekeeper", true));
final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory, new ThreadPoolExecutor.DiscardPolicy());
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
executor.setRemoveOnCancelPolicy(true);
return executor;
}
else {
return config.getScheduledExecutor();
}
}
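A brief explanation: ScheduledThreadPoolExecutor is a subclass of ThreadPoolExecutor and therefore inherits all of its features, but it is intended for delayed and periodic tasks and is best used only for those.
You can configure how a ScheduledThreadPoolExecutor behaves when shutdown() is called while delayed tasks are still pending. By default, pending delayed tasks are still executed after shutdown. Calling setExecuteExistingDelayedTasksAfterShutdownPolicy(false) changes this: once shutdown() has been called, pending tasks are no longer executed.
When a task is cancelled, the executor decides whether to remove it from the blocking queue straight away. This removeOnCancel flag is set through setRemoveOnCancelPolicy(). Removing cancelled tasks immediately prevents a large backlog of cancelled tasks from building up in the queue.
These two settings make the author's intent for the HouseKeeper executor clear.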
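The effect of setRemoveOnCancelPolicy can be observed with the JDK alone. The following is a minimal sketch; the class RemoveOnCancelDemo and its method are hypothetical names chosen for illustration. It schedules one far-future task, cancels it, and reports how many entries are left in the executor's queue.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows what remains queued after a cancel.
final class RemoveOnCancelDemo {
    private RemoveOnCancelDemo() {}

    /** Schedules one task an hour ahead, cancels it, and returns the queue size afterwards. */
    static int queuedAfterCancel(boolean removeOnCancel) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        // Same two settings as in initializeHouseKeepingExecutorService above.
        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        executor.setRemoveOnCancelPolicy(removeOnCancel);
        try {
            ScheduledFuture<?> future = executor.schedule(() -> { }, 1, TimeUnit.HOURS);
            future.cancel(false);
            return executor.getQueue().size();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("removeOnCancel=false, still queued: " + queuedAfterCancel(false));
        System.out.println("removeOnCancel=true,  still queued: " + queuedAfterCancel(true));
    }
}
```

With the policy off, the cancelled task stays in the queue until its delay expires; with the policy on, it is removed at cancel time.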
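Summary
Hikari uses the constructor and the HouseKeeper to initialize some configuration parameters and to update them dynamically. Dynamic updates rely on HikariConfigMXBean, with the task-scheduling pool ScheduledThreadPoolExecutor continually refreshing the configuration.
Taking com.zaxxer.hikari.HikariConfig alone as the example, the main properties that may be changed at runtime are: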
// Properties changeable at runtime through the HikariConfigMXBean
private volatile long connectionTimeout;
private volatile long validationTimeout;
private volatile long idleTimeout;
private volatile long leakDetectionThreshold;
private volatile long maxLifetime;
private volatile int maxPoolSize;
private volatile int minIdle;
private volatile String username;
private volatile String password;
The main properties that may not be changed at runtime are:
// Properties NOT changeable at runtime
private long initializationFailTimeout;
private String catalog;
private String connectionInitSql;
private String connectionTestQuery;
private String dataSourceClassName;
private String dataSourceJndiName;
private String driverClassName;
private String jdbcUrl;
private String poolName;
private String schema;
private String transactionIsolationName;
private boolean isAutoCommit;
private boolean isReadOnly;
private boolean isIsolateInternalQueries;
private boolean isRegisterMbeans;
private boolean isAllowPoolSuspension;
private DataSource dataSource;
private Properties dataSourceProperties;
private ThreadFactory threadFactory;
private ScheduledExecutorService scheduledExecutor;
private MetricsTrackerFactory metricsTrackerFactory;
private Object metricRegistry;
private Object healthCheckRegistry;
private Properties healthCheckProperties;
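The refresh pattern summarized above (a volatile config field that a periodic task copies into the component that uses it) can be sketched in isolation. All class names below are hypothetical stand-ins rather than the HikariCP classes themselves, and the housekeeping pass is invoked by hand instead of through a scheduler.

```java
// Hypothetical stand-ins for HikariConfig, ProxyLeakTaskFactory and HouseKeeper.
final class RefreshSketch {
    private RefreshSketch() {}

    /** Config holder: the field may be changed at runtime, for example through JMX. */
    static final class Config {
        private volatile long leakDetectionThreshold;

        long getLeakDetectionThreshold() { return leakDetectionThreshold; }

        void setLeakDetectionThreshold(long millis) { leakDetectionThreshold = millis; }
    }

    /** Consumer of the setting: keeps its own copy of the threshold. */
    static final class LeakTaskFactory {
        private volatile long leakDetectionThreshold;

        LeakTaskFactory(long initial) { leakDetectionThreshold = initial; }

        long currentThreshold() { return leakDetectionThreshold; }

        void updateLeakDetectionThreshold(long millis) { leakDetectionThreshold = millis; }
    }

    /** Periodic task: every run copies the latest config value into the factory. */
    static final class HouseKeeper implements Runnable {
        private final Config config;
        private final LeakTaskFactory factory;

        HouseKeeper(Config config, LeakTaskFactory factory) {
            this.config = config;
            this.factory = factory;
        }

        @Override
        public void run() {
            factory.updateLeakDetectionThreshold(config.getLeakDetectionThreshold());
        }
    }

    public static void main(String[] args) {
        Config config = new Config();
        LeakTaskFactory factory = new LeakTaskFactory(config.getLeakDetectionThreshold());
        HouseKeeper houseKeeper = new HouseKeeper(config, factory);

        config.setLeakDetectionThreshold(5_000);        // changed at runtime
        System.out.println(factory.currentThreshold()); // factory still holds the old value
        houseKeeper.run();                              // next housekeeping pass
        System.out.println(factory.currentThreshold()); // factory now holds the new value
    }
}
```

The consequence is that a runtime change only reaches the consumer on the next housekeeping pass, not at the moment the setter is called.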
Read
getConnection
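When getConnection, the core method of com.zaxxer.hikari.pool.HikariPool, returns, it calls poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now). Note that a ProxyLeakTask is attached when the proxy connection is created. The principle behind leak detection is that every borrowed connection must be returned: each time a connection is borrowed, Hikari schedules a delayed task, and it cancels that task when the connection is returned, when an error marks the connection as broken, or when the user calls evictConnection manually.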
/**
* Get a connection from the pool, or timeout after the specified number of milliseconds.
*
* @param hardTimeout the maximum time to wait for a connection from the pool
* @return a java.sql.Connection instance
* @throws SQLException thrown if a timeout occurs trying to obtain a connection
*/
public Connection getConnection(final long hardTimeout) throws SQLException {
suspendResumeLock.acquire();
final long startTime = currentTime();
try {
long timeout = hardTimeout;
do {
PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
if (poolEntry == null) {
break; // We timed out... break and throw exception
}
final long now = currentTime();
if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > ALIVE_BYPASS_WINDOW_MS && !isConnectionAlive(poolEntry.connection))) {
closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
timeout = hardTimeout - elapsedMillis(startTime);
}
else {
metricsTracker.recordBorrowStats(poolEntry, startTime);
return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
}
} while (timeout > 0L);
metricsTracker.recordBorrowTimeoutStats(startTime);
throw createTimeoutException(startTime);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
}
finally {
suspendResumeLock.release();
}
}
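The borrow-and-return principle behind leak detection can be sketched with a plain ScheduledExecutorService. This is an illustrative sketch under stated assumptions, not the HikariCP implementation: LeakTimerSketch, Handle and leaksReported are hypothetical names, and the 100 ms threshold is chosen only to keep the demo short.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not HikariCP code: one delayed task per borrowed handle.
final class LeakTimerSketch {
    private final ScheduledExecutorService scheduler;
    private final long thresholdMs;
    private final AtomicInteger reportedLeaks = new AtomicInteger();

    LeakTimerSketch(ScheduledExecutorService scheduler, long thresholdMs) {
        this.scheduler = scheduler;
        this.thresholdMs = thresholdMs;
    }

    /** Borrow: remember the call site and start a timer that fires unless close() comes first. */
    Handle borrow() {
        final StackTraceElement[] borrowSite = new Exception().getStackTrace();
        ScheduledFuture<?> timer = scheduler.schedule(() -> {
            reportedLeaks.incrementAndGet();
            System.out.println("Possible leak, borrowed at "
                + (borrowSite.length > 1 ? borrowSite[1] : "unknown"));
        }, thresholdMs, TimeUnit.MILLISECONDS);
        return new Handle(timer);
    }

    int reportedLeaks() { return reportedLeaks.get(); }

    /** Return: closing the handle cancels the pending timer. */
    static final class Handle implements AutoCloseable {
        private final ScheduledFuture<?> timer;

        Handle(ScheduledFuture<?> timer) { this.timer = timer; }

        @Override
        public void close() { timer.cancel(false); }
    }

    /** Runs one borrow with a 100 ms threshold and returns how many leaks were reported. */
    static int leaksReported(boolean closeInTime) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            LeakTimerSketch pool = new LeakTimerSketch(scheduler, 100);
            Handle handle = pool.borrow();
            if (closeInTime) {
                handle.close();
            }
            Thread.sleep(500); // well past the threshold
            handle.close();    // a late close no longer prevents the report
            return pool.reportedLeaks();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("closed in time:  " + leaksReported(true) + " leak(s) reported");
        System.out.println("closed too late: " + leaksReported(false) + " leak(s) reported");
    }
}
```

Capturing the stack trace at borrow time rather than when the timer fires is what lets the report point at the code that took the connection.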
leakTaskFactory, ProxyLeakTaskFactory, ProxyLeakTask
The HikariPool constructor initializes both leakTaskFactory and houseKeepingExecutorService:
this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();
this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService);
this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, HOUSEKEEPING_PERIOD_MS, MILLISECONDS);
com.zaxxer.hikari.pool.ProxyLeakTaskFactory follows a design the author uses often. Here is the source:
/**
* A factory for {@link ProxyLeakTask} Runnables that are scheduled in the future to report leaks.
*
* @author Brett Wooldridge
* @author Andreas Brenk
*/
class ProxyLeakTaskFactory {
private ScheduledExecutorService executorService;
private long leakDetectionThreshold;
ProxyLeakTaskFactory(final long leakDetectionThreshold, final ScheduledExecutorService executorService)
{
this.executorService = executorService;
this.leakDetectionThreshold = leakDetectionThreshold;
}
ProxyLeakTask schedule(final PoolEntry poolEntry) {
return (leakDetectionThreshold == 0) ? ProxyLeakTask.NO_LEAK : scheduleNewTask(poolEntry);
}
void updateLeakDetectionThreshold(final long leakDetectionThreshold) {
this.leakDetectionThreshold = leakDetectionThreshold;
}
private ProxyLeakTask scheduleNewTask(PoolEntry poolEntry) {
ProxyLeakTask task = new ProxyLeakTask(poolEntry);
task.schedule(executorService, leakDetectionThreshold);
return task;
}
}
If leakDetectionThreshold is 0, leak detection is disabled and schedule returns ProxyLeakTask.NO_LEAK; otherwise it creates a new ProxyLeakTask that fires after leakDetectionThreshold milliseconds.
Now the source of com.zaxxer.hikari.pool.ProxyLeakTask:
/**
* A Runnable that is scheduled in the future to report leaks. The ScheduledFuture is
* cancelled if the connection is closed before the leak time expires.
*
* @author Brett Wooldridge
*/
class ProxyLeakTask implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(ProxyLeakTask.class);
static final ProxyLeakTask NO_LEAK;
private ScheduledFuture<?> scheduledFuture;
private String connectionName;
private Exception exception;
private String threadName;
private boolean isLeaked;
static
{
NO_LEAK = new ProxyLeakTask() {
@Override
void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold) {}
@Override
public void run() {}
@Override
public void cancel() {}
};
}
ProxyLeakTask(final PoolEntry poolEntry)
{
this.exception = new Exception("Apparent connection leak detected");
this.threadName = Thread.currentThread().getName();
this.connectionName = poolEntry.connection.toString();
}
private ProxyLeakTask() {
}
void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold) {
scheduledFuture = executorService.schedule(this, leakDetectionThreshold, TimeUnit.MILLISECONDS);
}
/** {@inheritDoc} */
@Override
public void run() {
isLeaked = true;
final StackTraceElement[] stackTrace = exception.getStackTrace();
final StackTraceElement[] trace = new StackTraceElement[stackTrace.length - 5];
System.arraycopy(stackTrace, 5, trace, 0, trace.length);
exception.setStackTrace(trace);
LOGGER.warn("Connection leak detection triggered for {} on thread {}, stack trace follows", connectionName, threadName, exception);
}
void cancel() {
scheduledFuture.cancel(false);
if (isLeaked) {
LOGGER.info("Previously reported leaked connection {} on thread {} was returned to the pool (unleaked)", connectionName, threadName);
}
}
}
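All methods of NO_LEAK are no-ops. When a real task fires, it does not throw anything: it logs a warning together with the Exception("Apparent connection leak detected") that was created when the connection was borrowed, so the stack trace points at the borrowing code.
The no-op NO_LEAK object should look familiar: the article 【追光者系列】HikariCP原始碼分析之allowPoolSuspension describes exactly the same design.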
this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;
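isAllowPoolSuspension defaults to false, in which case the constructor simply uses SuspendResumeLock.FAUX_LOCK; a real SuspendResumeLock is created only when isAllowPoolSuspension is true.
com.zaxxer.hikari.util.SuspendResumeLock provides two variants built around java.util.concurrent.Semaphore, one faux and one real: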
/**
* This class implements a lock that can be used to suspend and resume the pool. It
* also provides a faux implementation that is used when the feature is disabled that
* hopefully gets fully "optimized away" by the JIT.
*
* @author Brett Wooldridge
*/
public class SuspendResumeLock {
public static final SuspendResumeLock FAUX_LOCK = new SuspendResumeLock(false) {
@Override
public void acquire() {}
@Override
public void release() {}
@Override
public void suspend() {}
@Override
public void resume() {}
};
private static final int MAX_PERMITS = 10000;
private final Semaphore acquisitionSemaphore;
/**
* Default constructor
*/
public SuspendResumeLock() {
this(true);
}
private SuspendResumeLock(final boolean createSemaphore) {
acquisitionSemaphore = (createSemaphore ? new Semaphore(MAX_PERMITS, true) : null);
}
public void acquire() {
acquisitionSemaphore.acquireUninterruptibly();
}
public void release() {
acquisitionSemaphore.release();
}
public void suspend() {
acquisitionSemaphore.acquireUninterruptibly(MAX_PERMITS);
}
public void resume() {
acquisitionSemaphore.release(MAX_PERMITS);
}
}
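Because isAllowPoolSuspension defaults to false, FAUX_LOCK is normally used: its methods are empty and its acquisitionSemaphore is null. If isAllowPoolSuspension is set to true, then when a suspend call arrives through the MBean, acquisitionSemaphore.acquireUninterruptibly(MAX_PERMITS) acquires all 10000 permits from the semaphore at once, blocking the calling thread until they are available. After that, HikariPool's getConnection cannot obtain a connection and blocks in suspendResumeLock.acquire() until resume releases the MAX_PERMITS (10000) permits back to the semaphore.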
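The permit accounting can be tried out with a bare Semaphore. In this minimal sketch the class SuspendGateSketch and its method names are hypothetical, and tryAcquire() is used in place of the blocking acquire so that the demo never hangs while the gate is suspended.

```java
import java.util.concurrent.Semaphore;

// Hypothetical sketch of the suspend/resume idea using all-or-one permit acquisition.
final class SuspendGateSketch {
    private static final int MAX_PERMITS = 10000;
    private final Semaphore permits = new Semaphore(MAX_PERMITS, true);

    /** Non-blocking stand-in for acquire(): true if a caller may proceed right now. */
    boolean tryEnter() { return permits.tryAcquire(); }

    /** Counterpart of release(): gives the single permit back. */
    void leave() { permits.release(); }

    /** Takes every permit, so no caller can enter until resume() is called. */
    void suspend() { permits.acquireUninterruptibly(MAX_PERMITS); }

    /** Returns every permit to the semaphore. */
    void resume() { permits.release(MAX_PERMITS); }

    public static void main(String[] args) {
        SuspendGateSketch gate = new SuspendGateSketch();
        System.out.println("before suspend: " + gate.tryEnter());
        gate.leave();
        gate.suspend();
        System.out.println("while suspended: " + gate.tryEnter());
        gate.resume();
        System.out.println("after resume: " + gate.tryEnter());
    }
}
```

Note that suspend() itself has to wait for every permit that is currently held, which is why it blocks until in-flight callers have released theirs.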
close
Connections are borrowed and returned, and the leak detection task is cancelled accordingly. Let's look at the source of com.zaxxer.hikari.pool.ProxyConnection:
// **********************************************************************
// "Overridden" java.sql.Connection Methods
// **********************************************************************
/** {@inheritDoc} */
@Override
public final void close() throws SQLException {
// Closing statements can cause connection eviction, so this must run before the conditional below
closeStatements();
if (delegate != ClosedConnection.CLOSED_CONNECTION) {
leakTask.cancel();
try {
if (isCommitStateDirty && !isAutoCommit) {
delegate.rollback();
lastAccess = currentTime();
LOGGER.debug("{} - Executed rollback on connection {} due to dirty commit state on close().", poolEntry.getPoolName(), delegate);
}
if (dirtyBits != 0) {
poolEntry.resetConnectionState(this, dirtyBits);
lastAccess = currentTime();
}
delegate.clearWarnings();
}
catch (SQLException e) {
// when connections are aborted, exceptions are often thrown that should not reach the application
if (!poolEntry.isMarkedEvicted()) {
throw checkException(e);
}
}
finally {
delegate = ClosedConnection.CLOSED_CONNECTION;
poolEntry.recycle(lastAccess);
}
}
}
When the connection is closed and delegate != ClosedConnection.CLOSED_CONNECTION, leakTask.cancel() is called to cancel the leak detection task.
closeStatements also cancels it when closing a statement fails:
@SuppressWarnings("EmptyTryBlock")
private synchronized void closeStatements() {
final int size = openStatements.size();
if (size > 0) {
for (int i = 0; i < size && delegate != ClosedConnection.CLOSED_CONNECTION; i++) {
try (Statement ignored = openStatements.get(i)) {
// automatic resource cleanup
}
catch (SQLException e) {
LOGGER.warn("{} - Connection {} marked as broken because of an exception closing open statements during Connection.close()",
poolEntry.getPoolName(), delegate);
leakTask.cancel();
poolEntry.evict("(exception closing Statements during Connection.close())");
delegate = ClosedConnection.CLOSED_CONNECTION;
}
}
openStatements.clear();
}
}
So does checkException, when it classifies the connection as broken:
final SQLException checkException(SQLException sqle) {
SQLException nse = sqle;
for (int depth = 0; delegate != ClosedConnection.CLOSED_CONNECTION && nse != null && depth < 10; depth++) {
final String sqlState = nse.getSQLState();
if (sqlState != null && sqlState.startsWith("08") || ERROR_STATES.contains(sqlState) || ERROR_CODES.contains(nse.getErrorCode())) {
// broken connection
LOGGER.warn("{} - Connection {} marked as broken because of SQLSTATE({}), ErrorCode({})",
poolEntry.getPoolName(), delegate, sqlState, nse.getErrorCode(), nse);
leakTask.cancel();
poolEntry.evict("(connection is broken)");
delegate = ClosedConnection.CLOSED_CONNECTION;
}
else {
nse = nse.getNextException();
}
}
return sqle;
}
evictConnection in com.zaxxer.hikari.pool.HikariPool cancels the task as well:
/**
* Evict a Connection from the pool.
*
* @param connection the Connection to evict (actually a {@link ProxyConnection})
*/
public void evictConnection(Connection connection) {
ProxyConnection proxyConnection = (ProxyConnection) connection;
proxyConnection.cancelLeakTask();
try {
softEvictConnection(proxyConnection.getPoolEntry(), "(connection evicted by user)", !connection.isClosed() /* owner */);
}
catch (SQLException e) {
// unreachable in HikariCP, but we're still forced to catch it
}
}
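To summarize, the leak detection task is cancelled in four places: ProxyConnection.close, closeStatements, checkException, and HikariPool.evictConnection.
Test simulation
Based on the analysis of leakDetectionThreshold above, we can adjust the parameters of com.zaxxer.hikari.pool.MiscTest from the test package to simulate a connection leak. The test code is as follows: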
/**
* @author Brett Wooldridge
*/
public class MiscTest {
@Test
public void testLogWriter() throws SQLException {
HikariConfig config = newHikariConfig();
config.setMinimumIdle(0);
config.setMaximumPoolSize(4);
config.setDataSourceClassName("com.zaxxer.hikari.mocks.StubDataSource");
setConfigUnitTest(true);
try (HikariDataSource ds = new HikariDataSource(config)) {
PrintWriter writer = new PrintWriter(System.out);
ds.setLogWriter(writer);
assertSame(writer, ds.getLogWriter());
assertEquals("testLogWriter", config.getPoolName());
}
finally
{
setConfigUnitTest(false);
}
}
@Test
public void testInvalidIsolation() {
try {
getTransactionIsolation("INVALID");
fail();
}
catch (Exception e) {
assertTrue(e instanceof IllegalArgumentException);
}
}
@Test
public void testCreateInstance() {
try {
createInstance("invalid", null);
fail();
}
catch (RuntimeException e) {
assertTrue(e.getCause() instanceof ClassNotFoundException);
}
}
@Test
public void testLeakDetection() throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (PrintStream ps = new PrintStream(baos, true)) {
setSlf4jTargetStream(Class.forName("com.zaxxer.hikari.pool.ProxyLeakTask"), ps);
setConfigUnitTest(true);
HikariConfig config = newHikariConfig();
config.setMinimumIdle(0);
config.setMaximumPoolSize(4);
config.setThreadFactory(Executors.defaultThreadFactory());
config.setMetricRegistry(null);
config.setLeakDetectionThreshold(TimeUnit.SECONDS.toMillis(4));
config.setDataSourceClassName("com.zaxxer.hikari.mocks.StubDataSource");
try (HikariDataSource ds = new HikariDataSource(config)) {
setSlf4jLogLevel(HikariPool.class, Level.DEBUG);
getPool(ds).logPoolState();
try (Connection connection = ds.getConnection()) {
quietlySleep(SECONDS.toMillis(4));
connection.close();
quietlySleep(SECONDS.toMillis(1));
ps.close();
String s = new String(baos.toByteArray());
assertNotNull("Exception string was null", s);
assertTrue("Expected exception to contain 'Connection leak detection' but contains *" + s + "*", s.contains("Connection leak detection"));
}
}
finally
{
setConfigUnitTest(false);
setSlf4jLogLevel(HikariPool.class, Level.INFO);
}
}
}
}
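When execution reaches quietlySleep(SECONDS.toMillis(4)), the leak task fires as expected and logs a warning carrying the exception Apparent connection leak detected.
Then, during close, the delegate != ClosedConnection.CLOSED_CONNECTION branch runs and calls leakTask.cancel().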
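Troubleshooting a connection pool leak in Spark/Scala
Colleagues from the big data decision-data team at the financial center reported the following problem:
When we connect to several databases from the same JVM, we find that overall more connections are borrowed from the pool than are returned. After a while the pool reports Caused by: java.sql.SQLTransientConnectionException: HikariPool-0 - Connection is not available, request timed out after 30000ms.
The user's Spark scenario is somewhat unusual: each machine opens very few connections, but many machines connect. Within a single JVM the user only ever uses one connection concurrently.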
maximumPoolSize: 5
minimumIdle: 2
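The program also blocked at times, and this turned out to happen while executing MySQL statements. show processlist in MySQL showed most sessions sitting in the query end state, and a thread dump of the process showed the threads holding monitors.
After the DBA stepped in, slow SQL was found.
That particular problem was caused by heavy write load: each write was large, every SQL statement was a huge batch, the write rate was 30 to 50 records per second, and each record had more than 70 fields. One fix is to move the binlog to an SSD. Another is to set innodb_flush_log_at_trx_commit to 0, which was estimated to improve throughput by perhaps 20% to 30%.
After those problems were fixed, the issue the user had reported was still there. With leakDetectionThreshold enabled, the conclusion was that connections were leaking (borrowed from the pool and never closed).
We suspected the leak was either in Hikari or in Spark/Scala. To rule Hikari out we switched to druid, and the problem persisted. So we dug into the Spark side of the code and, after careful analysis, located the problem:
Because Scala's map is lazy, the mapPartitions work initially all landed in a single stage. After we changed the code to call toList, so that result.iterator ran in its own stage, the connection pool leak disappeared.
For the root cause, see "Spark : How to use mapPartition and create/close connection per partition": https://stackoverflow.com/questions/36545579/spark-how-to-use-mappartition-and-create-close-connection-per-partition/36545821#36545821
At first we thought this was a connection pool problem or a Spark problem, but by using leakDetectionThreshold to narrow it down we learned that it was actually a Scala problem :)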
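The lazy-evaluation pitfall from the linked answer can be illustrated with plain Java iterators. This is only an analogy under stated assumptions: FakeConnection, lazy and eager are hypothetical names, no Spark or Scala API is involved, and the team's actual job code is not shown in this article. The point is that a lazily mapped iterator does its work after the function that created the connection has already returned, so the results must be materialized (the toList step) before the connection is released.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical analogy in plain Java; no Spark or Scala API is used.
final class LazyPartitionSketch {
    private LazyPartitionSketch() {}

    /** Stand-in for a pooled connection. */
    static final class FakeConnection implements AutoCloseable {
        private boolean closed;

        String write(int record) {
            if (closed) {
                throw new IllegalStateException("connection already closed");
            }
            return "wrote " + record;
        }

        @Override
        public void close() { closed = true; }
    }

    /** Lazy: the mapping runs only when the caller pulls, after the connection was closed. */
    static Iterator<String> lazy(Iterator<Integer> partition) {
        try (FakeConnection conn = new FakeConnection()) {
            return new Iterator<String>() {
                @Override public boolean hasNext() { return partition.hasNext(); }
                @Override public String next() { return conn.write(partition.next()); }
            };
        }
    }

    /** Eager: materialize all results first (the toList step), then close, then return. */
    static Iterator<String> eager(Iterator<Integer> partition) {
        try (FakeConnection conn = new FakeConnection()) {
            List<String> results = new ArrayList<>();
            while (partition.hasNext()) {
                results.add(conn.write(partition.next()));
            }
            return results.iterator();
        }
    }

    public static void main(String[] args) {
        List<Integer> records = List.of(1, 2, 3);
        eager(records.iterator()).forEachRemaining(System.out::println);
        try {
            lazy(records.iterator()).next();
        } catch (IllegalStateException e) {
            System.out.println("lazy variant failed: " + e.getMessage());
        }
    }
}
```

In the eager variant the lifetime of the connection is confined to one method, which makes it easy to guarantee that every borrowed connection is returned.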
References
- https://segmentfault.com/a/1190000013092894