(點選上方公眾號,可快速關註)
來源:JavaDoop,
javadoop.com/post/hashmap
今天發一篇”水文”,可能很多讀者都會表示不理解,不過我想把它作為併發序列文章中不可缺少的一塊來介紹。本來以為花不了多少時間的,不過最終還是投入了挺多時間來完成這篇文章的。
網上關於 HashMap 和 ConcurrentHashMap 的文章確實不少,不過缺斤少兩的文章比較多,所以才想自己也寫一篇,把細節說清楚說透,尤其像 Java8 中的 ConcurrentHashMap,大部分文章都說不清楚。終歸是希望能降低大家學習的成本,不希望大家到處找各種不是很靠譜的文章,看完一篇又一篇,可是還是模模糊糊。
閱讀建議:四節基本上可以進行獨立閱讀,建議初學者可按照 Java7 HashMap -> Java7 ConcurrentHashMap -> Java8 HashMap -> Java8 ConcurrentHashMap 順序進行閱讀,可適當降低閱讀門檻。
閱讀前提:本文分析的是原始碼,所以至少讀者要熟悉它們的介面使用,同時,對於併發,讀者至少要知道 CAS、ReentrantLock、UNSAFE 操作這幾個基本的知識,文中不會對這些知識進行介紹。Java8 用到了紅黑樹,不過本文不會進行展開,感興趣的讀者請自行查詢相關資料。
Java7 HashMap
HashMap 是最簡單的,一來我們非常熟悉,二來就是它不支援併發操作,所以原始碼也非常簡單。
首先,我們用下麵這張圖來介紹 HashMap 的結構。
這個僅僅是示意圖,因為沒有考慮到陣列要擴容的情況,具體的後面再說。
大方向上,HashMap 裡面是一個陣列,然後陣列中每個元素是一個單向連結串列。
上圖中,每個綠色的物體是巢狀類 Entry 的實體,Entry 包含四個屬性:key, value, hash 值和用於單向連結串列的 next。
capacity:當前陣列容量,始終保持 2^n,可以擴容,擴容後陣列大小為當前的 2 倍。
loadFactor:負載因子,預設為 0.75。
threshold:擴容的閾值,等於 capacity * loadFactor
put 過程分析
還是比較簡單的,跟著程式碼走一遍吧。
public V put(K key, V value) {
// 當插入第一個元素的時候,需要先初始化陣列大小
if (table == EMPTY_TABLE) {
inflateTable(threshold);
}
// 如果 key 為 null,感興趣的可以往裡看,最終會將這個 entry 放到 table[0] 中
if (key == null)
return putForNullKey(value);
// 1. 求 key 的 hash 值
int hash = hash(key);
// 2. 找到對應的陣列下標
int i = indexFor(hash, table.length);
// 3. 遍歷一下對應下標處的連結串列,看是否有重覆的 key 已經存在,
// 如果有,直接改寫,put 方法傳回舊值就結束了
for (Entry
e = table[i]; e != null; e = e.next) { Object k;
if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
V oldValue = e.value;
e.value = value;
e.recordAccess(this);
return oldValue;
}
}
modCount++;
// 4. 不存在重覆的 key,將此 entry 新增到連結串列中,細節後面說
addEntry(hash, key, value, i);
return null;
}
陣列初始化
在第一個元素插入 HashMap 的時候做一次陣列的初始化,就是先確定初始的陣列大小,並計算陣列擴容的閾值。
private void inflateTable(int toSize) {
// 保證陣列大小一定是 2 的 n 次方。
// 比如這樣初始化:new HashMap(20),那麼處理成初始陣列大小是 32
int capacity = roundUpToPowerOf2(toSize);
// 計算擴容閾值:capacity * loadFactor
threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1);
// 算是初始化陣列吧
table = new Entry[capacity];
initHashSeedAsNeeded(capacity); //ignore
}
這裡有一個將陣列大小保持為 2 的 n 次方的做法,Java7 和 Java8 的 HashMap 和 ConcurrentHashMap 都有相應的要求,只不過實現的程式碼稍微有些不同,後面再看到的時候就知道了。
計算具體陣列位置
這個簡單,我們自己也能 YY 一個:使用 key 的 hash 值對陣列長度進行取模就可以了。
static int indexFor(int hash, int length) {
// assert Integer.bitCount(length) == 1 : “length must be a non-zero power of 2”;
return hash & (length-1);
}
這個方法很簡單,簡單說就是取 hash 值的低 n 位。如在陣列長度為 32 的時候,其實取的就是 key 的 hash 值的低 5 位,作為它在陣列中的下標位置。
新增節點到連結串列中
找到陣列下標後,會先進行 key 判重,如果沒有重覆,就準備將新值放入到連結串列的表頭。
void addEntry(int hash, K key, V value, int bucketIndex) {
// 如果當前 HashMap 大小已經達到了閾值,並且新值要插入的陣列位置已經有元素了,那麼要擴容
if ((size >= threshold) && (null != table[bucketIndex])) {
// 擴容,後面會介紹一下
resize(2 * table.length);
// 擴容以後,重新計算 hash 值
hash = (null != key) ? hash(key) : 0;
// 重新計算擴容後的新的下標
bucketIndex = indexFor(hash, table.length);
}
// 往下看
createEntry(hash, key, value, bucketIndex);
}
// 這個很簡單,其實就是將新值放到連結串列的表頭,然後 size++
void createEntry(int hash, K key, V value, int bucketIndex) {
Entry
e = table[bucketIndex]; table[bucketIndex] = new Entry<>(hash, key, value, e);
size++;
}
這個方法的主要邏輯就是先判斷是否需要擴容,需要的話先擴容,然後再將這個新的資料插入到擴容後的陣列的相應位置處的連結串列的表頭。
陣列擴容
前面我們看到,在插入新值的時候,如果當前的 size 已經達到了閾值,並且要插入的陣列位置上已經有元素,那麼就會觸發擴容,擴容後,陣列大小為原來的 2 倍。
void resize(int newCapacity) {
Entry[] oldTable = table;
int oldCapacity = oldTable.length;
if (oldCapacity == MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return;
}
// 新的陣列
Entry[] newTable = new Entry[newCapacity];
// 將原來陣列中的值遷移到新的更大的陣列中
transfer(newTable, initHashSeedAsNeeded(newCapacity));
table = newTable;
threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}
擴容就是用一個新的大陣列替換原來的小陣列,並將原來陣列中的值遷移到新的陣列中。
由於是雙倍擴容,遷移過程中,會將原來 table[i] 中的連結串列的所有節點,分拆到新的陣列的 newTable[i] 和 newTable[i + oldLength] 位置上。如原來陣列長度是 16,那麼擴容後,原來 table[0] 處的連結串列中的所有元素會被分配到新陣列中 newTable[0] 和 newTable[16] 這兩個位置。程式碼比較簡單,這裡就不展開了。
get 過程分析
相對於 put 過程,get 過程是非常簡單的。
-
根據 key 計算 hash 值。
-
找到相應的陣列下標:hash & (length – 1)。
-
遍歷該陣列位置處的連結串列,直到找到相等(==或equals)的 key。
public V get(Object key) {
// 之前說過,key 為 null 的話,會被放到 table[0],所以只要遍歷下 table[0] 處的連結串列就可以了
if (key == null)
return getForNullKey();
//
Entry
entry = getEntry(key);
return null == entry ? null : entry.getValue();
}
getEntry(key):
final Entry
getEntry(Object key) { if (size == 0) {
return null;
}
int hash = (key == null) ? 0 : hash(key);
// 確定陣列下標,然後從頭開始遍歷連結串列,直到找到為止
for (Entry
e = table[indexFor(hash, table.length)]; e != null;
e = e.next) {
Object k;
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
}
return null;
}
Java7 ConcurrentHashMap
ConcurrentHashMap 和 HashMap 思路是差不多的,但是因為它支援併發操作,所以要複雜一些。
整個 ConcurrentHashMap 由一個個 Segment 組成,Segment 代表”部分“或”一段“的意思,所以很多地方都會將其描述為分段鎖。註意,行文中,我很多地方用了“槽”來代表一個 segment。
簡單理解就是,ConcurrentHashMap 是一個 Segment 陣列,Segment 透過繼承 ReentrantLock 來進行加鎖,所以每次需要加鎖的操作鎖住的是一個 segment,這樣只要保證每個 Segment 是執行緒安全的,也就實現了全域性的執行緒安全。
concurrencyLevel:並行級別、併發數、Segment 數,怎麼翻譯不重要,理解它。預設是 16,也就是說 ConcurrentHashMap 有 16 個 Segments,所以理論上,這個時候,最多可以同時支援 16 個執行緒併發寫,只要它們的操作分別分佈在不同的 Segment 上。這個值可以在初始化的時候設定為其他值,但是一旦初始化以後,它是不可以擴容的。
再具體到每個 Segment 內部,其實每個 Segment 很像之前介紹的 HashMap,不過它要保證執行緒安全,所以處理起來要麻煩些。
初始化
initialCapacity:初始容量,這個值指的是整個 ConcurrentHashMap 的初始容量,實際操作的時候需要平均分給每個 Segment。
loadFactor:負載因子,之前我們說了,Segment 陣列不可以擴容,所以這個負載因子是給每個 Segment 內部使用的。
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
// 計算並行級別 ssize,因為要保持並行級別是 2 的 n 次方
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 我們這裡先不要那麼燒腦,用預設值,concurrencyLevel 為 16,sshift 為 4
// 那麼計算出 segmentShift 為 28,segmentMask 為 15,後面會用到這兩個值
this.segmentShift = 32 – sshift;
this.segmentMask = ssize – 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// initialCapacity 是設定整個 map 初始的大小,
// 這裡根據 initialCapacity 計算 Segment 陣列中每個位置可以分到的大小
// 如 initialCapacity 為 64,那麼每個 Segment 或稱之為”槽”可以分到 4 個
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
// 預設 MIN_SEGMENT_TABLE_CAPACITY 是 2,這個值也是有講究的,因為這樣的話,對於具體的槽上,
// 插入一個元素不至於擴容,插入第二個的時候才會擴容
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// 建立 Segment 陣列,
// 並建立陣列的第一個元素 segment[0]
Segment
s0 = new Segment
(loadFactor, (int)(cap * loadFactor), (HashEntry
[])new HashEntry[cap]); Segment
[] ss = (Segment [])new Segment[ssize]; // 往陣列寫入 segment[0]
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
初始化完成,我們得到了一個 Segment 陣列。
我們就當是用 new ConcurrentHashMap() 無參建構式進行初始化的,那麼初始化完成後:
-
Segment 陣列長度為 16,不可以擴容
-
Segment[i] 的預設大小為 2,負載因子是 0.75,得出初始閾值為 1.5,也就是以後插入第一個元素不會觸發擴容,插入第二個會進行第一次擴容
-
這裡初始化了 segment[0],其他位置還是 null,至於為什麼要初始化 segment[0],後面的程式碼會介紹
-
當前 segmentShift 的值為 32 – 4 = 28,segmentMask 為 16 – 1 = 15,姑且把它們簡單翻譯為移位數和掩碼,這兩個值馬上就會用到
put 過程分析
我們先看 put 的主流程,對於其中的一些關鍵細節操作,後面會進行詳細介紹。
public V put(K key, V value) {
Segment
s; if (value == null)
throw new NullPointerException();
// 1. 計算 key 的 hash 值
int hash = hash(key);
// 2. 根據 hash 值找到 Segment 陣列中的位置 j
// hash 是 32 位,無符號右移 segmentShift(28) 位,剩下低 4 位,
// 然後和 segmentMask(15) 做一次與操作,也就是說 j 是 hash 值的最後 4 位,也就是槽的陣列下標
int j = (hash >>> segmentShift) & segmentMask;
// 剛剛說了,初始化的時候初始化了 segment[0],但是其他位置還是 null,
// ensureSegment(j) 對 segment[j] 進行初始化
if ((s = (Segment
)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
// 3. 插入新值到 槽 s 中
return s.put(key, hash, value, false);
}
第一層皮很簡單,根據 hash 值很快就能找到相應的 Segment,之後就是 Segment 內部的 put 操作了。
Segment 內部是由 陣列+連結串列 組成的。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 在往該 segment 寫入前,需要先獲取該 segment 的獨佔鎖
// 先看主流程,後面還會具體介紹這部分內容
HashEntry
node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
// 這個是 segment 內部的陣列
HashEntry
[] tab = table; // 再利用 hash 值,求應該放置的陣列下標
int index = (tab.length – 1) & hash;
// first 是陣列該位置處的連結串列的表頭
HashEntry
first = entryAt(tab, index);
// 下麵這串 for 迴圈雖然很長,不過也很好理解,想想該位置沒有任何元素和已經存在一個連結串列這兩種情況
for (HashEntry
e = first;;) { if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
// 改寫舊值
e.value = value;
++modCount;
}
break;
}
// 繼續順著連結串列走
e = e.next;
}
else {
// node 到底是不是 null,這個要看獲取鎖的過程,不過和這裡都沒有關係。
// 如果不為 null,那就直接將它設定為鏈表表頭;如果是null,初始化並設定為鏈表表頭。
if (node != null)
node.setNext(first);
else
node = new HashEntry
(hash, key, value, first);
int c = count + 1;
// 如果超過了該 segment 的閾值,這個 segment 需要擴容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node); // 擴容後面也會具體分析
else
// 沒有達到閾值,將 node 放到陣列 tab 的 index 位置,
// 其實就是將新的節點設定成原連結串列的表頭
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 解鎖
unlock();
}
return oldValue;
}
整體流程還是比較簡單的,由於有獨佔鎖的保護,所以 segment 內部的操作並不複雜。至於這裡面的併發問題,我們稍後再進行介紹。
到這裡 put 操作就結束了,接下來,我們說一說其中幾步關鍵的操作。
初始化槽: ensureSegment
ConcurrentHashMap 初始化的時候會初始化第一個槽 segment[0],對於其他槽來說,在插入第一個值的時候進行初始化。
這裡需要考慮併發,因為很可能會有多個執行緒同時進來初始化同一個槽 segment[k],不過只要有一個成功了就可以。
private Segment
ensureSegment(int k) { final Segment
[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset
Segment
seg; if ((seg = (Segment
)UNSAFE.getObjectVolatile(ss, u)) == null) { // 這裡看到為什麼之前要初始化 segment[0] 了,
// 使用當前 segment[0] 處的陣列長度和負載因子來初始化 segment[k]
// 為什麼要用“當前”,因為 segment[0] 可能早就擴容過了
Segment
proto = ss[0]; int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
// 初始化 segment[k] 內部的陣列
HashEntry
[] tab = (HashEntry [])new HashEntry[cap]; if ((seg = (Segment
)UNSAFE.getObjectVolatile(ss, u)) == null) { // 再次檢查一遍該槽是否被其他執行緒初始化了。
Segment
s = new Segment (lf, threshold, tab); // 使用 while 迴圈,內部用 CAS,當前執行緒成功設值或其他執行緒成功設值後,退出
while ((seg = (Segment
)UNSAFE.getObjectVolatile(ss, u)) == null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
總的來說,ensureSegment(int k) 比較簡單,對於併發操作使用 CAS 進行控制。
我沒搞懂這裡為什麼要搞一個 while 迴圈,CAS 失敗不就代表有其他執行緒成功了嗎,為什麼要再進行判斷?
獲取寫入鎖: scanAndLockForPut
前面我們看到,在往某個 segment 中 put 的時候,首先會呼叫 node = tryLock() ? null : scanAndLockForPut(key, hash, value),也就是說先進行一次 tryLock() 快速獲取該 segment 的獨佔鎖,如果失敗,那麼進入到 scanAndLockForPut 這個方法來獲取鎖。
下麵我們來具體分析這個方法中是怎麼控制加鎖的。
private HashEntry
scanAndLockForPut(K key, int hash, V value) { HashEntry
first = entryForHash(this, hash); HashEntry
e = first; HashEntry
node = null; int retries = -1; // negative while locating node
// 迴圈獲取鎖
while (!tryLock()) {
HashEntry
f; // to recheck first below if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
// 進到這裡說明陣列該位置的連結串列是空的,沒有任何元素
// 當然,進到這裡的另一個原因是 tryLock() 失敗,所以該槽存在併發,不一定是該位置
node = new HashEntry
(hash, key, value, null); retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
// 順著連結串列往下走
e = e.next;
}
// 重試次數如果超過 MAX_SCAN_RETRIES(單核1多核64),那麼不搶了,進入到阻塞佇列等待鎖
// lock() 是阻塞方法,直到獲取鎖後傳回
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
// 這個時候是有大問題了,那就是有新的元素進到了連結串列,成為了新的表頭
// 所以這邊的策略是,相當於重新走一遍這個 scanAndLockForPut 方法
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
這個方法有兩個出口,一個是 tryLock() 成功了,迴圈終止,另一個就是重試次數超過了 MAX_SCAN_RETRIES,進到 lock() 方法,此方法會阻塞等待,直到成功拿到獨佔鎖。
這個方法就是看似複雜,但是其實就是做了一件事,那就是獲取該 segment 的獨佔鎖,如果需要的話順便實體化了一下 node。
擴容: rehash
重覆一下,segment 陣列不能擴容,擴容是 segment 陣列某個位置內部的陣列 HashEntry\[] 進行擴容,擴容後,容量為原來的 2 倍。
首先,我們要回顧一下觸發擴容的地方,put 的時候,如果判斷該值的插入會導致該 segment 的元素個數超過閾值,那麼先進行擴容,再插值,讀者這個時候可以回去 put 方法看一眼。
該方法不需要考慮併發,因為到這裡的時候,是持有該 segment 的獨佔鎖的。
// 方法引數上的 node 是這次擴容後,需要新增到新的陣列中的資料。
private void rehash(HashEntry
node) { HashEntry
[] oldTable = table; int oldCapacity = oldTable.length;
// 2 倍
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
// 建立新陣列
HashEntry
[] newTable = (HashEntry
[]) new HashEntry[newCapacity]; // 新的掩碼,如從 16 擴容到 32,那麼 sizeMask 為 31,對應二進位制 ‘000…00011111’
int sizeMask = newCapacity – 1;
// 遍歷原陣列,老套路,將原陣列位置 i 處的連結串列拆分到 新陣列位置 i 和 i+oldCap 兩個位置
for (int i = 0; i < oldCapacity ; i++) {
// e 是連結串列的第一個元素
HashEntry
e = oldTable[i]; if (e != null) {
HashEntry
next = e.next; // 計算應該放置在新陣列中的位置,
// 假設原陣列長度為 16,e 在 oldTable[3] 處,那麼 idx 只可能是 3 或者是 3 + 16 = 19
int idx = e.hash & sizeMask;
if (next == null) // 該位置處只有一個元素,那比較好辦
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
// e 是鏈表表頭
HashEntry
lastRun = e; // idx 是當前連結串列的頭結點 e 的新位置
int lastIdx = idx;
// 下麵這個 for 迴圈會找到一個 lastRun 節點,這個節點之後的所有元素是將要放到一起的
for (HashEntry
last = next; last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// 將 lastRun 及其之後的所有節點組成的這個連結串列放到 lastIdx 這個位置
newTable[lastIdx] = lastRun;
// 下麵的操作是處理 lastRun 之前的節點,
// 這些節點可能分配在另一個連結串列中,也可能分配到上面的那個連結串列中
for (HashEntry
p = e; p != lastRun; p = p.next) { V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry
n = newTable[k]; newTable[k] = new HashEntry
(h, p.key, v, n); }
}
}
}
// 將新來的 node 放到新陣列中剛剛的 兩個連結串列之一 的 頭部
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
這裡的擴容比之前的 HashMap 要複雜一些,程式碼難懂一點。上面有兩個挨著的 for 迴圈,第一個 for 有什麼用呢?
仔細一看發現,如果沒有第一個 for 迴圈,也是可以工作的,但是,這個 for 迴圈下來,如果 lastRun 的後面還有比較多的節點,那麼這次就是值得的。因為我們只需要克隆 lastRun 前面的節點,後面的一串節點跟著 lastRun 走就是了,不需要做任何操作。
我覺得 Doug Lea 的這個想法也是挺有意思的,不過比較壞的情況就是每次 lastRun 都是連結串列的最後一個元素或者很靠後的元素,那麼這次遍歷就有點浪費了。不過 Doug Lea 也說了,根據統計,如果使用預設的閾值,大約只有 1/6 的節點需要克隆。
get 過程分析
相對於 put 來說,get 真的不要太簡單。
計算 hash 值,找到 segment 陣列中的具體位置,或我們前面用的“槽”
槽中也是一個陣列,根據 hash 找到陣列中具體的位置
到這裡是連結串列了,順著連結串列進行查詢即可
public V get(Object key) {
Segment
s; // manually integrate access methods to reduce overhead HashEntry
[] tab; // 1. hash 值
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 2. 根據 hash 找到對應的 segment
if ((s = (Segment
)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
// 3. 找到segment 內部陣列相應位置的連結串列,遍歷
for (HashEntry
e = (HashEntry ) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length – 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
併發問題分析
現在我們已經說完了 put 過程和 get 過程,我們可以看到 get 過程中是沒有加鎖的,那自然我們就需要去考慮併發問題。
新增節點的操作 put 和刪除節點的操作 remove 都是要加 segment 上的獨佔鎖的,所以它們之間自然不會有問題,我們需要考慮的問題就是 get 的時候在同一個 segment 中發生了 put 或 remove 操作。
put 操作的執行緒安全性。
-
初始化槽,這個我們之前就說過了,使用了 CAS 來初始化 Segment 中的陣列。
-
新增節點到連結串列的操作是插入到表頭的,所以,如果這個時候 get 操作在連結串列遍歷的過程已經到了中間,是不會影響的。當然,另一個併發問題就是 get 操作在 put 之後,需要保證剛剛插入表頭的節點被讀取,這個依賴於 setEntryAt 方法中使用的 UNSAFE.putOrderedObject。
-
擴容。擴容是新建立了陣列,然後進行遷移資料,最後面將 newTable 設定給屬性 table。所以,如果 get 操作此時也在進行,那麼也沒關係,如果 get 先行,那麼就是在舊的 table 上做查詢操作;而 put 先行,那麼 put 操作的可見性保證就是 table 使用了 volatile 關鍵字。
remove 操作的執行緒安全性。
remove 操作我們沒有分析原始碼,所以這裡說的讀者感興趣的話還是需要到原始碼中去求實一下的。
get 操作需要遍歷連結串列,但是 remove 操作會”破壞”連結串列。
如果 remove 破壞的節點 get 操作已經過去了,那麼這裡不存在任何問題。
如果 remove 先破壞了一個節點,分兩種情況考慮。 1、如果此節點是頭結點,那麼需要將頭結點的 next 設定為陣列該位置的元素,table 雖然使用了 volatile 修飾,但是 volatile 並不能提供陣列內部操作的可見性保證,所以原始碼中使用了 UNSAFE 來運算元組,請看方法 setEntryAt。2、如果要刪除的節點不是頭結點,它會將要刪除節點的後繼節點接到前驅節點中,這裡的併發保證就是 next 屬性是 volatile 的。
看完本文有收穫?請轉發分享給更多人
關註「ImportNew」,提升Java技能