摘自【工匠小豬豬的技術世界】 1.這是一個系列,有興趣的朋友可以持續關註 2.如果你有HikariCP使用上的問題,可以給我留言,我們一起溝通討論 3.希望大家可以提供我一些案例,我也希望可以支援你們做一些調優
概念
evict定義在com.zaxxer.hikari.pool.PoolEntry中,evict的漢語意思是驅逐、逐出,用來標記連線池中的連線不可用。
private volatile boolean evict;
boolean isMarkedEvicted() {
return evict;
}
void markEvicted() {
this.evict = true;
}
getConnection
在每次getConnection的時候,borrow連線(PoolEntry)的時候,如果是標記evict的,則會關閉連線,更新timeout的值,重新迴圈繼續獲取連線
/**
* Get a connection from the pool, or timeout after the specified number of milliseconds.
*
* @param hardTimeout the maximum time to wait for a connection from the pool
* @return a java.sql.Connection instance
* @throws SQLException thrown if a timeout occurs trying to obtain a connection
*/
public Connection getConnection(final long hardTimeout) throws SQLException {
suspendResumeLock.acquire();
final long startTime = currentTime();
try {
long timeout = hardTimeout;
do {
PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
if (poolEntry == null) {
break; // We timed out... break and throw exception
}
final long now = currentTime();
if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > ALIVE_BYPASS_WINDOW_MS && !isConnectionAlive(poolEntry.connection))) {
closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
timeout = hardTimeout - elapsedMillis(startTime);
}
else {
metricsTracker.recordBorrowStats(poolEntry, startTime);
return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
}
} while (timeout > 0L);
metricsTracker.recordBorrowTimeoutStats(startTime);
throw createTimeoutException(startTime);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
}
finally {
suspendResumeLock.release();
}
}
如下我們聚焦一下原始碼,hardTimeout預設值是30000,這個值實際上就是connectionTimeout,建構式預設值是SECONDS.toMillis(30) = 30000,預設配置validate之後的值是30000,validate重置以後是如果小於250毫秒,則被重置回30秒。
connectionTimeout This property controls the maximum number of milliseconds that a client (that’s you) will wait for a connection from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown. Lowest acceptable connection timeout is 250 ms. Default: 30000 (30 seconds)
if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > ALIVE_BYPASS_WINDOW_MS && !isConnectionAlive(poolEntry.connection))) {
closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
timeout = hardTimeout - elapsedMillis(startTime);
}
關閉連線這塊的原始碼如下,從註釋可以看到(閱讀hikari原始碼強烈建議看註釋),這是永久關閉真實(底層)連線(吃掉任何異常):
private static final String EVICTED_CONNECTION_MESSAGE = "(connection was evicted)";
private static final String DEAD_CONNECTION_MESSAGE = "(connection is dead)";
/**
* Permanently close the real (underlying) connection (eat any exception).
*
* @param poolEntry poolEntry having the connection to close
* @param closureReason reason to close
*/
void closeConnection(final PoolEntry poolEntry, final String closureReason) {
if (connectionBag.remove(poolEntry)) {
final Connection connection = poolEntry.close();
closeConnectionExecutor.execute(() -> {
quietlyCloseConnection(connection, closureReason);
if (poolState == POOL_NORMAL) {
fillPool();
}
});
}
}
吃掉體現在quietlyCloseConnection,這是吃掉Throwable的
// ***********************************************************************
// JDBC methods
// ***********************************************************************
void quietlyCloseConnection(final Connection connection, final String closureReason) {
if (connection != null) {
try {
LOGGER.debug("{} - Closing connection {}: {}", poolName, connection, closureReason);
try {
setNetworkTimeout(connection, SECONDS.toMillis(15));
}
finally {
connection.close(); // continue with the close even if setNetworkTimeout() throws
}
}
catch (Throwable e) {
LOGGER.debug("{} - Closing connection {} failed", poolName, connection, e);
}
}
}
createPoolEntry
這段程式碼強烈建議看一下註釋,maxLifetime預設是1800000=30分鐘,就是讓每個連線的最大存活時間錯開一點,防止同時過期,加一點點隨機因素,防止一件事情大量同時發生(C大語錄)。
// ***********************************************************************
// Private methods
// ***********************************************************************
/**
* Creating new poolEntry. If maxLifetime is configured, create a future End-of-life task with 2.5% variance from
* the maxLifetime time to ensure there is no massive die-off of Connections in the pool.
*/
private PoolEntry createPoolEntry() {
try {
final PoolEntry poolEntry = newPoolEntry();
final long maxLifetime = config.getMaxLifetime();
if (maxLifetime > 0) {
// variance up to 2.5% of the maxlifetime
final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0;
final long lifetime = maxLifetime - variance;
poolEntry.setFutureEol(houseKeepingExecutorService.schedule(
() -> {
if (softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false /* not owner */)) {
addBagItem(connectionBag.getWaitingThreadCount());
}
},
lifetime, MILLISECONDS));
}
return poolEntry;
}
catch (Exception e) {
if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently
LOGGER.debug("{} - Cannot acquire connection from data source", poolName, (e instanceof ConnectionSetupException ? e.getCause() : e));
}
return null;
}
}
如果maxLifetime大於10000就是大於10秒鐘,就走這個策略,用maxLifetime的2.5%的時間和0之間的隨機數來隨機設定一個variance,在maxLifetime – variance之後觸發evict。 在建立poolEntry的時候,註冊一個延時任務,在連線存活將要到達maxLifetime之前觸發evit,用來防止出現大面積的connection因maxLifetime同一時刻失效。 標記為evict只是表示連線池中的該連線不可用,但還在連線池當中,還會被borrow出來,只是getConnection的時候判斷了,如果是isMarkedEvicted,則會從連線池中移除該連線,然後close掉。
evict Related
evictConnection
可以主動呼叫evictConnection,這裡也是判斷是不是使用者自己呼叫的或者從connectionBag中標記不可borrow成功,則關閉連線
/**
* Evict a Connection from the pool.
*
* @param connection the Connection to evict (actually a {@link ProxyConnection})
*/
public void evictConnection(Connection connection) {
ProxyConnection proxyConnection = (ProxyConnection) connection;
proxyConnection.cancelLeakTask();
try {
softEvictConnection(proxyConnection.getPoolEntry(), "(connection evicted by user)", !connection.isClosed() /* owner */);
}
catch (SQLException e) {
// unreachable in HikariCP, but we're still forced to catch it
}
}
softEvictConnection
/**
* "Soft" evict a Connection (/PoolEntry) from the pool. If this method is being called by the user directly
* through {@link com.zaxxer.hikari.HikariDataSource#evictConnection(Connection)} then {@code owner} is {@code true}.
*
* If the caller is the owner, or if the Connection is idle (i.e. can be "reserved" in the {@link ConcurrentBag}),
* then we can close the connection immediately. Otherwise, we leave it "marked" for eviction so that it is evicted
* the next time someone tries to acquire it from the pool.
*
* @param poolEntry the PoolEntry (/Connection) to "soft" evict from the pool
* @param reason the reason that the connection is being evicted
* @param owner true if the caller is the owner of the connection, false otherwise
* @return true if the connection was evicted (closed), false if it was merely marked for eviction
*/
private boolean softEvictConnection(final PoolEntry poolEntry, final String reason, final boolean owner) {
poolEntry.markEvicted();
if (owner || connectionBag.reserve(poolEntry)) {
closeConnection(poolEntry, reason);
return true;
}
return false;
}
com.zaxxer.hikari.util.ConcurrentBag
/**
* The method is used to make an item in the bag "unavailable" for
* borrowing. It is primarily used when wanting to operate on items
* returned by the
values(int)``` method. Items that are
* reserved can be removed from the bag via
remove(T)```
* without the need to unreserve them. Items that are not removed
* from the bag can be make available for borrowing again by calling
* the
unreserve(T)``` method.
*
* @param bagEntry the item to reserve
* @return true if the item was able to be reserved, false otherwise
*/
public boolean reserve(final T bagEntry) {
return bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_RESERVED);
}
softEvictConnections
HikariPool中還提供了HikariPoolMXBean的softEvictConnections實現,實際上是呼叫softEvictConnection,owner指定false( not owner )
public void softEvictConnections() {
connectionBag.values().forEach(poolEntry -> softEvictConnection(poolEntry, "(connection evicted)", false /* not owner */));
}
Mbean的softEvictConnections方法真正執行的是com.zaxxer.hikari.pool.HikariPool中softEvictConnections方法,這是一種“軟”驅逐池中連線的方法,如果呼叫方是owner身份,或者連線處於空閑狀態,可以立即關閉連線。否則,我們將其“標記”為驅逐,以便下次有人試圖從池中獲取它時將其逐出。
public void softEvictConnections() {
connectionBag.values().forEach(poolEntry -> softEvictConnection(poolEntry, "(connection evicted)", false /* not owner */));
}
softEvictConnection
/**
* "Soft" evict a Connection (/PoolEntry) from the pool. If this method is being called by the user directly
* through {@link com.zaxxer.hikari.HikariDataSource#evictConnection(Connection)} then {@code owner} is {@code true}.
*
* If the caller is the owner, or if the Connection is idle (i.e. can be "reserved" in the {@link ConcurrentBag}),
* then we can close the connection immediately. Otherwise, we leave it "marked" for eviction so that it is evicted
* the next time someone tries to acquire it from the pool.
*
* @param poolEntry the PoolEntry (/Connection) to "soft" evict from the pool
* @param reason the reason that the connection is being evicted
* @param owner true if the caller is the owner of the connection, false otherwise
* @return true if the connection was evicted (closed), false if it was merely marked for eviction
*/
private boolean softEvictConnection(final PoolEntry poolEntry, final String reason, final boolean owner) {
poolEntry.markEvicted();
if (owner || connectionBag.reserve(poolEntry)) {
closeConnection(poolEntry, reason);
return true;
}
return false;
}
執行此方法時我們的owner預設傳false(not owner),呼叫com.zaxxer.hikari.util.ConcurrentBag的reserve對方進行保留
/**
* The method is used to make an item in the bag "unavailable" for
* borrowing. It is primarily used when wanting to operate on items
* returned by the
values(int)``` method. Items that are
* reserved can be removed from the bag via
remove(T)```
* without the need to unreserve them. Items that are not removed
* from the bag can be make available for borrowing again by calling
* the
unreserve(T)``` method.
*
* @param bagEntry the item to reserve
* @return true if the item was able to be reserved, false otherwise
*/
public boolean reserve(final T bagEntry) {
return bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_RESERVED);
}
除了 HikariPoolMXBean的呼叫,softEvictConnections在housekeeper中也有使用
/**
* The house keeping task to retire and maintain minimum idle connections.
*/
private final class HouseKeeper implements Runnable {
private volatile long previous = plusMillis(currentTime(), -HOUSEKEEPING_PERIOD_MS);
@Override
public void run()
{
try {
// refresh timeouts in case they changed via MBean
connectionTimeout = config.getConnectionTimeout();
validationTimeout = config.getValidationTimeout();
leakTaskFactory.updateLeakDetectionThreshold(config.getLeakDetectionThreshold());
final long idleTimeout = config.getIdleTimeout();
final long now = currentTime();
// Detect retrograde time, allowing +128ms as per NTP spec.
if (plusMillis(now, 128) < plusMillis(previous, HOUSEKEEPING_PERIOD_MS)) {
LOGGER.warn("{} - Retrograde clock change detected (housekeeper delta={}), soft-evicting connections from pool.",
poolName, elapsedDisplayString(previous, now));
previous = now;
softEvictConnections();
return;
}
else if (now > plusMillis(previous, (3 * HOUSEKEEPING_PERIOD_MS) / 2)) {
// No point evicting for forward clock motion, this merely accelerates connection retirement anyway
LOGGER.warn("{} - Thread starvation or clock leap detected (housekeeper delta={}).", poolName, elapsedDisplayString(previous, now));
}
previous = now;
String afterPrefix = "Pool ";
if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) {
logPoolState("Before cleanup ");
afterPrefix = "After cleanup ";
final List<PoolEntry> notInUse = connectionBag.values(STATE_NOT_IN_USE);
int toRemove = notInUse.size() - config.getMinimumIdle();
for (PoolEntry entry : notInUse) {
if (toRemove > 0 && elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) {
closeConnection(entry, "(connection has passed idleTimeout)");
toRemove--;
}
}
}
logPoolState(afterPrefix);
fillPool(); // Try to maintain minimum connections
}
catch (Exception e) {
LOGGER.error("Unexpected exception in housekeeping task", e);
}
}
}
聚焦一下,這段程式碼也是檢測時鐘回撥,如果時鐘在規定範圍外回撥了,就驅除連線,並重置時間。
// Detect retrograde time, allowing +128ms as per NTP spec.
if (plusMillis(now, 128) < plusMillis(previous, HOUSEKEEPING_PERIOD_MS)) {
LOGGER.warn("{} - Retrograde clock change detected (housekeeper delta={}), soft-evicting connections from pool.",
poolName, elapsedDisplayString(previous, now));
previous = now;
softEvictConnections();
return;
}
/**
* Return the specified opaque time-stamp plus the specified number of milliseconds.
*
* @param time an opaque time-stamp
* @param millis milliseconds to add
* @return a new opaque time-stamp
*/
static long plusMillis(long time, long millis) {
return CLOCK.plusMillis0(time, millis);
}
說到時鐘回撥,是不是想起了snowflake裡的時鐘回撥的處理?讓我們一起溫習一下!
/**
* 自生成Id生成器.
*
*
* 長度為64bit,從高位到低位依次為
*
*
*
* 1bit 符號位
* 41bits 時間偏移量從2016年11月1日零點到現在的毫秒數
* 10bits 工作行程Id
* 12bits 同一個毫秒內的自增量
*
*
*
* 工作行程Id獲取優先順序: 系統變數{@code sjdbc.self.id.generator.worker.id} 大於 環境變數{@code SJDBC_SELF_ID_GENERATOR_WORKER_ID}
* ,另外可以呼叫@{@code CommonSelfIdGenerator.setWorkerId}進行設定
*
*
* @author gaohongtao
*/
@Getter
@Slf4j
public class CommonSelfIdGenerator implements IdGenerator {
public static final long SJDBC_EPOCH;//時間偏移量,從2016年11月1日零點開始
private static final long SEQUENCE_BITS = 12L;//自增量佔用位元
private static final long WORKER_ID_BITS = 10L;//工作行程ID位元
private static final long SEQUENCE_MASK = (1 << SEQUENCE_BITS) - 1;//自增量掩碼(最大值)
private static final long WORKER_ID_LEFT_SHIFT_BITS = SEQUENCE_BITS;//工作行程ID左移位元數(位數)
private static final long TIMESTAMP_LEFT_SHIFT_BITS = WORKER_ID_LEFT_SHIFT_BITS + WORKER_ID_BITS;//時間戳左移位元數(位數)
private static final long WORKER_ID_MAX_VALUE = 1L << WORKER_ID_BITS;//工作行程ID最大值
@Setter
private static AbstractClock clock = AbstractClock.systemClock();
@Getter
private static long workerId;//工作行程ID
static {
Calendar calendar = Calendar.getInstance();
calendar.set(2016, Calendar.NOVEMBER, 1);
calendar.set(Calendar.HOUR_OF_DAY, 0);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
calendar.set(Calendar.MILLISECOND, 0);
SJDBC_EPOCH = calendar.getTimeInMillis();
initWorkerId();
}
private long sequence;//最後自增量
private long lastTime;//最後生成編號時間戳,單位:毫秒
static void initWorkerId() {
String workerId = System.getProperty("sjdbc.self.id.generator.worker.id");
if (!Strings.isNullOrEmpty(workerId)) {
setWorkerId(Long.valueOf(workerId));
return;
}
workerId = System.getenv("SJDBC_SELF_ID_GENERATOR_WORKER_ID");
if (Strings.isNullOrEmpty(workerId)) {
return;
}
setWorkerId(Long.valueOf(workerId));
}
/**
* 設定工作行程Id.
*
* @param workerId 工作行程Id
*/
public static void setWorkerId(final Long workerId) {
Preconditions.checkArgument(workerId >= 0L && workerId < WORKER_ID_MAX_VALUE);
CommonSelfIdGenerator.workerId = workerId;
}
/**
* 生成Id.
*
* @return 傳回@{@link Long}型別的Id
*/
@Override
public synchronized Number generateId() {
//保證當前時間大於最後時間。時間回退會導致產生重覆id
long time = clock.millis();
Preconditions.checkState(lastTime <= time, "Clock is moving backwards, last time is %d milliseconds, current time is %d milliseconds", lastTime, time);
// 獲取序列號
if (lastTime == time) {
if (0L == (sequence = ++sequence & SEQUENCE_MASK)) {
time = waitUntilNextTime(time);
}
} else {
sequence = 0;
}
// 設定最後時間戳
lastTime = time;
if (log.isDebugEnabled()) {
log.debug("{}-{}-{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(lastTime)), workerId, sequence);
}
// 生成編號
return ((time - SJDBC_EPOCH) << TIMESTAMP_LEFT_SHIFT_BITS) | (workerId << WORKER_ID_LEFT_SHIFT_BITS) | sequence;
}
//不停獲得時間,直到大於最後時間
private long waitUntilNextTime(final long lastTime) {
long time = clock.millis();
while (time <= lastTime) {
time = clock.millis();
}
return time;
}
}
透過這段程式碼可以看到噹噹的時鐘回撥在單機上是做了處理的了,不但會丟擲Clock is moving backwards balabalabala的IllegalStateException,而且也做了waitUntilNextTime一直等待的處理
除了housekeeper,在shutdown中也做了處理
/**
* Shutdown the pool, closing all idle connections and aborting or closing
* active connections.
*
* @throws InterruptedException thrown if the thread is interrupted during shutdown
*/
public synchronized void shutdown() throws InterruptedException {
try {
poolState = POOL_SHUTDOWN;
if (addConnectionExecutor == null) { // pool never started
return;
}
logPoolState("Before shutdown ");
if (houseKeeperTask != null) {
houseKeeperTask.cancel(false);
houseKeeperTask = null;
}
softEvictConnections();
addConnectionExecutor.shutdown();
addConnectionExecutor.awaitTermination(getLoginTimeout(), SECONDS);
destroyHouseKeepingExecutorService();
connectionBag.close();
final ExecutorService assassinExecutor = createThreadPoolExecutor(config.getMaximumPoolSize(), poolName + " connection assassinator",
config.getThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
try {
final long start = currentTime();
do {
abortActiveConnections(assassinExecutor);
softEvictConnections();
} while (getTotalConnections() > 0 && elapsedMillis(start) < SECONDS.toMillis(10));
}
finally {
assassinExecutor.shutdown();
assassinExecutor.awaitTermination(10L, SECONDS);
}
shutdownNetworkTimeoutExecutor();
closeConnectionExecutor.shutdown();
closeConnectionExecutor.awaitTermination(10L, SECONDS);
}
finally {
logPoolState("After shutdown ");
unregisterMBeans();
metricsTracker.close();
}
}
ConcurrentBag
說到ConcurrentBag這個不得不提的類,我這裡取用一下文章做一下簡要介紹,本系列後面會專題系統分析: http://www.cnblogs.com/taisenki/p/7699667.html HikariCP連線池是基於自主實現的ConcurrentBag完成的資料連線的多執行緒共享互動,是HikariCP連線管理快速的其中一個關鍵點。 ConcurrentBag是一個專門的併發包裹,在連線池(多執行緒資料互動)的實現上具有比LinkedBlockingQueue和LinkedTransferQueue更優越的效能。 ConcurrentBag透過拆分 CopyOnWriteArrayList、ThreadLocal和SynchronousQueue 進行併發資料互動。
-
CopyOnWriteArrayList:負責存放ConcurrentBag中全部用於出借的資源
-
ThreadLocal:用於加速執行緒本地化資源訪問
-
SynchronousQueue:用於存在資源等待執行緒時的第一手資源交接
ConcurrentBag中全部的資源均只能透過add方法進行新增,只能透過remove方法進行移出。
public void add(final T bagEntry) {
if (closed) {
LOGGER.info("ConcurrentBag has been closed, ignoring add()");
throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
}
sharedList.add(bagEntry); //新新增的資源優先放入CopyOnWriteArrayList
// 當有等待資源的執行緒時,將資源交到某個等待執行緒後才傳回(SynchronousQueue)
while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {
yield();
}
}
public boolean remove(final T bagEntry) {
// 如果資源正在使用且無法進行狀態切換,則傳回失敗
if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
return false;
}
final boolean removed = sharedList.remove(bagEntry); // 從CopyOnWriteArrayList中移出
if (!removed && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
}
return removed;
}
ConcurrentBag中透過borrow方法進行資料資源借用,透過requite方法進行資源回收,註意其中borrow方法只提供物件取用,不移除物件,因此使用時透過borrow取出的物件必須透過requite方法進行放回,否則容易導致記憶體洩露!
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException {
// 優先檢視有沒有可用的本地化的資源
final List<Object> list = threadList.get();
for (int i = list.size() - 1; i >= 0; i--) {
final Object entry = list.remove(i);
@SuppressWarnings("unchecked")
final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}
final int waiting = waiters.incrementAndGet();
try {
// 當無可用本地化資源時,遍歷全部資源,檢視是否存在可用資源
// 因此被一個執行緒本地化的資源也可能被另一個執行緒“搶走”
for (T bagEntry : sharedList) {
if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
if (waiting > 1) {
// 因為可能“搶走”了其他執行緒的資源,因此提醒包裹進行資源新增
listener.addBagItem(waiting - 1);
}
return bagEntry;
}
}
listener.addBagItem(waiting);
timeout = timeUnit.toNanos(timeout);
do {
final long start = currentTime();
// 當現有全部資源全部在使用中,等待一個被釋放的資源或者一個新資源
final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
timeout -= elapsedNanos(start);
} while (timeout > 10_000);
return null;
}
finally {
waiters.decrementAndGet();
}
}
public void requite(final T bagEntry) {
// 將狀態轉為未在使用
bagEntry.setState(STATE_NOT_IN_USE);
// 判斷是否存在等待執行緒,若存在,則直接轉手資源
for (int i = 0; waiters.get() > 0; i++) {
if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
return;
}
else if ((i & 0xff) == 0xff) {
parkNanos(MICROSECONDS.toNanos(10));
}
else {
yield();
}
}
// 否則,進行資源本地化
final List<Object> threadLocalList = threadList.get();
threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
}
上述程式碼中的 weakThreadLocals 是用來判斷是否使用弱取用,透過下述方法初始化:
private boolean useWeakThreadLocals()
{
try {
// 人工指定是否使用弱取用,但是官方不推薦進行自主設定。
if (System.getProperty("com.dareway.concurrent.useWeakReferences") != null) {
return Boolean.getBoolean("com.dareway.concurrent.useWeakReferences");
}
// 預設透過判斷初始化的ClassLoader是否是系統的ClassLoader來確定
return getClass().getClassLoader() != ClassLoader.getSystemClassLoader();
}
catch (SecurityException se) {
return true;
}
}
Hikari物理連線取用生命週期
上面提到了很多概念,比如HikariDataSource、HikariPool、ConcurrentBag、ProxyFactory、PoolEntry等等,那麼這裡的關係是什麼呢?
這裡推薦一下這篇文章 http://www.cnblogs.com/taisenki/p/7717912.html ,我取用一下部分內容:
HikariCP中的連線取用流程如下:
HikariPool負責對資源連線進行管理,而ConcurrentBag則是作為物理連線的共享資源站,PoolEntry則是對物理連線的1-1封裝。
PoolEntry透過connectionBag的borrow方法從bag中取出,,之後透過PoolEntry.createProxyConnection呼叫工廠類生成HikariProxyConnection傳回。
/**
* Entry used in the ConcurrentBag to track Connection instances.
*
* @author Brett Wooldridge
*/
final class PoolEntry implements IConcurrentBagEntry {
private static final Logger LOGGER = LoggerFactory.getLogger(PoolEntry.class);
private static final AtomicIntegerFieldUpdater<PoolEntry> stateUpdater;
Connection connection;
long lastAccessed;
long lastBorrowed;
@SuppressWarnings("FieldCanBeLocal")
private volatile int state = 0;
private volatile boolean evict;
private volatile ScheduledFuture> endOfLife;
private final FastList<Statement> openStatements;
private final HikariPool hikariPool;
private final boolean isReadOnly;
private final boolean isAutoCommit;
static
{
stateUpdater = AtomicIntegerFieldUpdater.newUpdater(PoolEntry.class, "state");
}
PoolEntry(final Connection connection, final PoolBase pool, final boolean isReadOnly, final boolean isAutoCommit)
{
this.connection = connection;
this.hikariPool = (HikariPool) pool;
this.isReadOnly = isReadOnly;
this.isAutoCommit = isAutoCommit;
this.lastAccessed = currentTime();
this.openStatements = new FastList<>(Statement.class, 16);
}
/**
* Release this entry back to the pool.
*
* @param lastAccessed last access time-stamp
*/
void recycle(final long lastAccessed) {
if (connection != null) {
this.lastAccessed = lastAccessed;
hikariPool.recycle(this);
}
}
/**
* Set the end of life {@link ScheduledFuture}.
*
* @param endOfLife this PoolEntry/Connection's end of life {@link ScheduledFuture}
*/
void setFutureEol(final ScheduledFuture> endOfLife) {
this.endOfLife = endOfLife;
}
Connection createProxyConnection(final ProxyLeakTask leakTask, final long now) {
return ProxyFactory.getProxyConnection(this, connection, openStatements, leakTask, now, isReadOnly, isAutoCommit);
}
void resetConnectionState(final ProxyConnection proxyConnection, final int dirtyBits) throws SQLException {
hikariPool.resetConnectionState(connection, proxyConnection, dirtyBits);
}
String getPoolName() {
return hikariPool.toString();
}
boolean isMarkedEvicted() {
return evict;
}
void markEvicted() {
this.evict = true;
}
void evict(final String closureReason) {
hikariPool.closeConnection(this, closureReason);
}
/** Returns millis since lastBorrowed */
long getMillisSinceBorrowed() {
return elapsedMillis(lastBorrowed);
}
/** {@inheritDoc} */
@Override
public String toString() {
final long now = currentTime();
return connection
+ ", accessed " + elapsedDisplayString(lastAccessed, now) + " ago, "
+ stateToString();
}
// ***********************************************************************
// IConcurrentBagEntry methods
// ***********************************************************************
/** {@inheritDoc} */
@Override
public int getState() {
return stateUpdater.get(this);
}
/** {@inheritDoc} */
@Override
public boolean compareAndSet(int expect, int update) {
return stateUpdater.compareAndSet(this, expect, update);
}
/** {@inheritDoc} */
@Override
public void setState(int update) {
stateUpdater.set(this, update);
}
Connection close() {
ScheduledFuture> eol = endOfLife;
if (eol != null && !eol.isDone() && !eol.cancel(false)) {
LOGGER.warn("{} - maxLifeTime expiration task cancellation unexpectedly returned false for connection {}", getPoolName(), connection);
}
Connection con = connection;
connection = null;
endOfLife = null;
return con;
}
private String stateToString() {
switch (state) {
case STATE_IN_USE:
return "IN_USE";
case STATE_NOT_IN_USE:
return "NOT_IN_USE";
case STATE_REMOVED:
return "REMOVED";
case STATE_RESERVED:
return "RESERVED";
default:
return "Invalid";
}
}
}
參考資料
-
https://segmentfault.com/a/1190000013118843
-
http://www.cnblogs.com/taisenki/p/7717912.html
END