Java Map 07 - ConcurrentHashMap

  关于 java.util.concurrent.ConcurrentHashMap<K, V> 的部分笔记,ConcurrentHashMap支持多个线程同一时刻访问集合内部的不同元素的特性,是HashMap的另一个线程安全版本(另一个是Hashtable)。本文演示代码段的执行环境基于JDK版本1.7

概述

  ConcurrentHashMap是一个支持并发访问的map集合,和Hashtable一样,都是HashMap的线程安全版本。但是和Hashtable不一样的是,ConcurrentHashMap可以允许多个线程在同一时刻对map集合中的多个键值对进行操作,而Hashtable则因为当前线程会锁定整个集合导致同一时刻只能有一个线程可以访问Hashtable集合。

  ConcurrentHashMap的实现思想是将整个map集合拆分成了多个子集合,每个子集合由一个 Segment实例进行维护。线程访问时会对某个特定的segment实例进行锁定,其他的segment实例继续保持自由状态,这样就实现了多个线程同时访问ConcurrentHashMap集合中不同元素的特性。但是需要注意的是,如果多个线程访问的是同一个segment实例内部的键值对实例,那么由于同一时刻只有一个线程会保存对该segment实例的锁定,所以依旧会出现线程等待的现象。

  每个segment实例内部是一个HashEntry数组,数组的每个节点维护了一个链表集合,该集合存储和维护了键值对映射实体。

  ConcurrentHashMap的底层存储结构如图1所示:

图 - 1

继承关系

1
2
3
4
// ConcurrentHashMap<E>
--java.lang.Object
--java.util.AbstractMap<K,V>
--java.util.concurrent.ConcurrentHashMap<K,V>

实现接口

类名 实现接口
ConcurrentHashMap<E> Serializable, ConcurrentMap,Map<K,V>

ConcurrentHashMap

Constructor Summary

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}

  初始化一个空ConcurrentHashMap集合,集合的初始容量和加载因子由initialCapacity和loadFactor确认,当前map集合可以支持的操作线程数上限由concurrencyLevel确定。

  第4 ~ 5行代码对传入的参数做基本校验。如果concurrencyLevel大于最大值(MAX_SEGMENTS = 1 << 16),那么就用最大值指定线程数。第9 ~ 14行代码计算大于concurrencyLevel的最小的2次幂结果,该结果会作为当前concurrencyLevel进行分组的依据,也就是说,最终map中会存在ssize个segment分组。

  第17 ~ 18行代码会确认最终的map集合容量。第19行代码计算map集合中每个segment分组中容纳的键值对数量。第20 ~ 21行代码会确认最终的每个segment分组的键值对数量。第22 ~ 24行代码根据计算得到的c计算最终的segment容量。

  第26 ~ 28行代码初始化一个segment实体,该实体以当前map集合中segment分组的第一个元素存在。第29行代码会完成当前map集合的segment数组的初始化操作,最后map集合初始化完成。

public ConcurrentHashMap(int initialCapacity, float loadFactor)

1
2
3
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}

  初始化一个空ConcurrentHashMap集合,集合的初始容量和加载因子由initialCapacity和loadFactor确认,当前map集合可以支持的操作线程数上限默认16个。底层调用ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)完成初始化操作。

public ConcurrentHashMap(int initialCapacity)

1
2
3
public ConcurrentHashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

  初始化一个空ConcurrentHashMap集合,集合的初始容量和加载因子由initialCapacity确认,当前map集合可以支持的操作线程数上限默认16个,加载因子默认为0.75。底层调用ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)完成初始化操作。

public ConcurrentHashMap()

1
2
3
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

  初始化一个空ConcurrentHashMap集合,集合的初始容量默认为16,当前map集合可以支持的操作线程数上限默认16个,加载因子默认为0.75。底层调用ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)完成初始化操作。

public ConcurrentHashMap(Map<? extends K, ? extends V> m)

1
2
3
4
5
6
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
DEFAULT_INITIAL_CAPACITY),
DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
putAll(m);
}

  初始化一个空ConcurrentHashMap集合,集合的初始容量由m集合存储的键值对数量和默认值的大小关系决定,当前map集合可以支持的操作线程数上限默认16个,加载因子默认为0.75。底层调用ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)完成初始化操作。最后通过调用方法putAll(Map<? extends K, ? extends V> m)将m集合中的键值对保存到初始化的集合中。

部分方法

public boolean isEmpty()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public boolean isEmpty() {
/*
* Sum per-segment modCounts to avoid mis-reporting when
* elements are concurrently added and removed in one segment
* while checking another, in which case the table was never
* actually empty at any point. (The sum ensures accuracy up
* through at least 1<<31 per-segment modifications before
* recheck.) Methods size() and containsValue() use similar
* constructions for stability checks.
*/
long sum = 0L;
final Segment<K,V>[] segments = this.segments;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
if (seg.count != 0)
return false;
sum += seg.modCount;
}
}
if (sum != 0L) { // recheck unless no modifications
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
if (seg.count != 0)
return false;
sum -= seg.modCount;
}
}
if (sum != 0L)
return false;
}
return true;
}

  判断当前map集合是否为空。借鉴了double-check的思想,如果遇到一个不为空的segment实体,那么直接返回false。否则根据seg.modCount的和差结果判断在该方法执行过程中是否存在其他线程对当前map做了写入和删除操作,以便最终确认当时的map集合状态。

static final Segment segmentAt(Segment[] ss, int j)

1
2
3
4
5
static final <K,V> Segment<K,V> segmentAt(Segment<K,V>[] ss, int j) {
long u = (j << SSHIFT) + SBASE;
return ss == null ? null :
(Segment<K,V>) UNSAFE.getObjectVolatile(ss, u);
}

  获取当前map集合中segment数组中下标 j 处的segment实体。

public int size()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}

  获取当前map集合存储的键值对数量。统计的思想是重复统计和判断:先在不对segment加锁的情况下连续两次计算求和,如果结果一致,那么返回计算结果。如果结果不一致,那么在对segment加锁的情况下再求和并返回统计结果。如果统计过程中对segment加了锁,那么需要在方法结束时释放锁。

public V get(Object key)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}

  从当前map集合中取出可以匹配key的键值对映射实体,如果没有找到对应的实体,返回null。

  第4行代码获取当前key的hashcode值,并根据hashcode的高位计算出该key位于哪个segment集合中。如果可以找到对应的segment对象,那么再从根据key的hashcode计算在当前segment中下标并取得该下标位置的链表集合。根据key的hashcode调用equals方法判断是否存在对应的映射实体:如果存在就返回映射的value值,否则返回null。

private int hash(Object k)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private int hash(Object k) {
int h = hashSeed;

if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}

h ^= k.hashCode();

// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}

  计算hashcode值。对原生的hashcode值(默认实现得到的hashcode)做了扰动处理,保证任意一位的变化都能产生不同。

public boolean containsKey(Object key)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@SuppressWarnings("unchecked")
public boolean containsKey(Object key) {
Segment<K,V> s; // same as get() except no need for volatile value read
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return true;
}
}
return false;
}

  判断当前map集合中是否存在匹配key的映射,如果存在返回true,否则返回false。

public boolean containsValue(Object value)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public boolean containsValue(Object value) {
// Same idea as size()
if (value == null)
throw new NullPointerException();
final Segment<K,V>[] segments = this.segments;
boolean found = false;
long last = 0;
int retries = -1;
try {
outer: for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
long hashSum = 0L;
int sum = 0;
for (int j = 0; j < segments.length; ++j) {
HashEntry<K,V>[] tab;
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null && (tab = seg.table) != null) {
for (int i = 0 ; i < tab.length; i++) {
HashEntry<K,V> e;
for (e = entryAt(tab, i); e != null; e = e.next) {
V v = e.value;
if (v != null && value.equals(v)) {
found = true;
break outer;
}
}
}
sum += seg.modCount;
}
}
if (retries > 0 && sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return found;
}

  判断当前map集合中是否存在可以匹配value的键值对实体。

public boolean contains(Object value)

1
2
3
public boolean contains(Object value) {
return containsValue(value);
}

  判断当前map集合中是否存在可以匹配value的键值对实体。

public V put(K key, V value)

1
2
3
4
5
6
7
8
9
10
11
12
@SuppressWarnings("unchecked")
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}

  将key=value加入到当前map集合中。

  第6 ~ 7行代码计算key的hashcode并根据hashcode计算其在segment数组中的下标位置。如果该下标位置尚未存储一个segment实体,那么就调用ensureSegment(int k)初始化一个segment实体并加入到segment数组中,否则加入到对应的segment实体的内部数组结构中。

private Segment<K,V> ensureSegment(int k)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@SuppressWarnings("unchecked")
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}

  创建并插入一个segment元素。

  入参k是经过hash计算之后的结果,根据k计算出在当前map集合中对应的偏移量以及在该位置上的segment分组。如果分组为空,那么从segment数组的第一个segment元素里得到需要创建的segment分组的容量,加载因子和扩容阈值等信息。再次查询当前map集合,确认确实需要创建一个新的segment实体,然后通过new实例化一个segment实体,再次确认是否需要创建一个新的segment实体,如果需要,那么将当前新初始化的segment实体加入到当前map的segment数组中。最后返回创建的segment实体。

public void putAll(Map<? extends K, ? extends V> m)

1
2
3
4
public void putAll(Map<? extends K, ? extends V> m) {
for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
put(e.getKey(), e.getValue());
}

  将m集合中的键值对映射加入到当前map集合中。遍历m集合的每个键值对实体,调用put(K key, V value)方法完成所有操作。

public V remove(Object key)

1
2
3
4
5
public V remove(Object key) {
int hash = hash(key);
Segment<K,V> s = segmentForHash(hash);
return s == null ? null : s.remove(key, hash, null);
}

  从当前map集合中删除可以匹配key的键值对映射。如果确实存在可以匹配key的键值对,那么就删除并返回被阐述的键值对实体,否则返回null。

public boolean remove(Object key, Object value)

1
2
3
4
5
6
public boolean remove(Object key, Object value) {
int hash = hash(key);
Segment<K,V> s;
return value != null && (s = segmentForHash(hash)) != null &&
s.remove(key, hash, value) != null;
}

  从当前map集合中删除可以匹配key和value的键值对映射。如果确实存在可以匹配的键值对,那么就删除并返回true,否则返回false。

private Segment<K,V> segmentForHash(int h)

1
2
3
4
5
@SuppressWarnings("unchecked")
private Segment<K,V> segmentForHash(int h) {
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u);
}

  根据hash结果计算segment数组的下标并返回该下标位置处的segment对象实体,如果不存在就返回null。

static final <K,V> HashEntry<K,V> entryForHash(Segment<K,V> seg, int h)

1
2
3
4
5
6
7
@SuppressWarnings("unchecked")
static final <K,V> HashEntry<K,V> entryForHash(Segment<K,V> seg, int h) {
HashEntry<K,V>[] tab;
return (seg == null || (tab = seg.table) == null) ? null :
(HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
}

  根据hash结果计算指定segment中的hashEntry数组下标并返回该下标位置处的键值对实体集合对象,如果不存在就返回null。

public boolean replace(K key, V oldValue, V newValue)

1
2
3
4
5
6
7
public boolean replace(K key, V oldValue, V newValue) {
int hash = hash(key);
if (oldValue == null || newValue == null)
throw new NullPointerException();
Segment<K,V> s = segmentForHash(hash);
return s != null && s.replace(key, hash, oldValue, newValue);
}

  用newValue替换键值对key=value实体的value值。如果oldValue和newValue都为空,那么抛出空指针异常。根据key计算hashcode值并找到该hashcode值所处的segment实体。如果成功更新了值就返回true,否则返回false。

public V replace(K key, V value)

1
2
3
4
5
6
7
public V replace(K key, V value) {
int hash = hash(key);
if (value == null)
throw new NullPointerException();
Segment<K,V> s = segmentForHash(hash);
return s == null ? null : s.replace(key, hash, value);
}

  用value替换可以匹配key的键值对实体的value值。如果value为空,那么抛出空指针异常。根据key计算hashcode值并找到该hashcode值所处的segment实体。如果成功更新了值就更新之前的value。

public void clear()

1
2
3
4
5
6
7
8
public void clear() {
final Segment<K,V>[] segments = this.segments;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> s = segmentAt(segments, j);
if (s != null)
s.clear();
}
}

  清空当前map集合。

public Set<K> keySet()

1
2
3
4
public Set<K> keySet() {
Set<K> ks = keySet;
return (ks != null) ? ks : (keySet = new KeySet());
}

  返回一个当前map集合的key值的set集合。KeySet的实现如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final class KeySet extends AbstractSet<K> {
public Iterator<K> iterator() {
return new KeyIterator();
}
public int size() {
return ConcurrentHashMap.this.size();
}
public boolean isEmpty() {
return ConcurrentHashMap.this.isEmpty();
}
public boolean contains(Object o) {
return ConcurrentHashMap.this.containsKey(o);
}
public boolean remove(Object o) {
return ConcurrentHashMap.this.remove(o) != null;
}
public void clear() {
ConcurrentHashMap.this.clear();
}
}

  依赖的迭代器KeyIterator的实现如下:

1
2
3
4
final class KeyIterator extends HashIterator implements Iterator<K>, Enumeration<K> {
public final K next() { return super.nextEntry().key; }
public final K nextElement() { return super.nextEntry().key; }
}

public Collection<V> values()

1
2
3
4
public Collection<V> values() {
Collection<V> vs = values;
return (vs != null) ? vs : (values = new Values());
}

  返回一个当前map集合的value值的collection集合。Values的实现如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
final class Values extends AbstractCollection<V> {
public Iterator<V> iterator() {
return new ValueIterator();
}
public int size() {
return ConcurrentHashMap.this.size();
}
public boolean isEmpty() {
return ConcurrentHashMap.this.isEmpty();
}
public boolean contains(Object o) {
return ConcurrentHashMap.this.containsValue(o);
}
public void clear() {
ConcurrentHashMap.this.clear();
}
}

  依赖的迭代器ValueIterator实现如下:

1
2
3
4
final class ValueIterator extends HashIterator implements Iterator<V>, Enumeration<V> {
public final V next() { return super.nextEntry().value; }
public final V nextElement() { return super.nextEntry().value; }
}

public Set<Map.Entry<K,V>> entrySet()

1
2
3
4
public Set<Map.Entry<K,V>> entrySet() {
Set<Map.Entry<K,V>> es = entrySet;
return (es != null) ? es : (entrySet = new EntrySet());
}

  返回一个当前map集合的键值对实体的set集合。EntrySet的实现如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
final class EntrySet extends AbstractSet<Map.Entry<K,V>> {
public Iterator<Map.Entry<K,V>> iterator() {
return new EntryIterator();
}
public boolean contains(Object o) {
if (!(o instanceof Map.Entry))
return false;
Map.Entry<?,?> e = (Map.Entry<?,?>)o;
V v = ConcurrentHashMap.this.get(e.getKey());
return v != null && v.equals(e.getValue());
}
public boolean remove(Object o) {
if (!(o instanceof Map.Entry))
return false;
Map.Entry<?,?> e = (Map.Entry<?,?>)o;
return ConcurrentHashMap.this.remove(e.getKey(), e.getValue());
}
public int size() {
return ConcurrentHashMap.this.size();
}
public boolean isEmpty() {
return ConcurrentHashMap.this.isEmpty();
}
public void clear() {
ConcurrentHashMap.this.clear();
}
}

  依赖的EntryIterator实现如下:

1
2
3
4
5
6
final class EntryIterator extends HashIterator implements Iterator<Entry<K,V>> {
public Map.Entry<K,V> next() {
HashEntry<K,V> e = super.nextEntry();
return new WriteThroughEntry(e.key, e.value);
}
}

private void writeObject(java.io.ObjectOutputStream s)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void writeObject(java.io.ObjectOutputStream s) throws IOException {
// force all segments for serialization compatibility
for (int k = 0; k < segments.length; ++k)
ensureSegment(k);
s.defaultWriteObject();

final Segment<K,V>[] segments = this.segments;
for (int k = 0; k < segments.length; ++k) {
Segment<K,V> seg = segmentAt(segments, k);
seg.lock();
try {
HashEntry<K,V>[] tab = seg.table;
for (int i = 0; i < tab.length; ++i) {
HashEntry<K,V> e;
for (e = entryAt(tab, i); e != null; e = e.next) {
s.writeObject(e.key);
s.writeObject(e.value);
}
}
} finally {
seg.unlock();
}
}
s.writeObject(null);
s.writeObject(null);
}

  序列化ConcurrentHashMap时使用的自定义实现方法。

  第2 ~ 3行代码负责检查当前ConcurrentHashMap的底层segment数组是否存在为空的位置,如果存在,那么就实例化一个空对象并存储到对应的下标位置上,避免在反序列化时因null引起的二义性和不确定性。

  第8 ~ 23行代码依次遍历底层的segment数组,获的锁并将其内部的每个键值对按照先key后value的顺序写入到流中,最后释放占用的锁。

  最后又向流里写入了两个null表示当前ConcurrentHashMap的流写入过程结束。

private void readObject(java.io.ObjectInputStream s)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@SuppressWarnings("unchecked")
private void readObject(java.io.ObjectInputStream s)
throws IOException, ClassNotFoundException {
// Don't call defaultReadObject()
ObjectInputStream.GetField oisFields = s.readFields();
final Segment<K,V>[] oisSegments = (Segment<K,V>[])oisFields.get("segments", null);

final int ssize = oisSegments.length;
if (ssize < 1 || ssize > MAX_SEGMENTS
|| (ssize & (ssize-1)) != 0 ) // ssize not power of two
throw new java.io.InvalidObjectException("Bad number of segments:"
+ ssize);
int sshift = 0, ssizeTmp = ssize;
while (ssizeTmp > 1) {
++sshift;
ssizeTmp >>>= 1;
}
UNSAFE.putIntVolatile(this, SEGSHIFT_OFFSET, 32 - sshift);
UNSAFE.putIntVolatile(this, SEGMASK_OFFSET, ssize - 1);
UNSAFE.putObjectVolatile(this, SEGMENTS_OFFSET, oisSegments);

// set hashMask
UNSAFE.putIntVolatile(this, HASHSEED_OFFSET, randomHashSeed(this));

// Re-initialize segments to be minimally sized, and let grow.
int cap = MIN_SEGMENT_TABLE_CAPACITY;
final Segment<K,V>[] segments = this.segments;
for (int k = 0; k < segments.length; ++k) {
Segment<K,V> seg = segments[k];
if (seg != null) {
seg.threshold = (int)(cap * seg.loadFactor);
seg.table = (HashEntry<K,V>[]) new HashEntry[cap];
}
}

// Read the keys and values, and put the mappings in the table
for (;;) {
K key = (K) s.readObject();
V value = (V) s.readObject();
if (key == null)
break;
put(key, value);
}
}

  反序列化ConcurrentHashMap时使用的自定义实现方法。

  第5行代码从流中得到ConcurrentHashMap的所有被序列化了的字段内容,并在第6行代码执行完后得到segments的数据内容。第8 ~ 17行代码根据流计算ConcurrentHashMap中segment数组的长度,以及其他的参数值。第18 ~ 23行代码完成相关字段的赋值操作。

  第27 ~ 34行会初始化一个segment数组,这个过程得到的是一个仅包含segment实例的数组,每个segment实例内部的hashEntry数组都是空的。第37 ~ 43行代码则将键值对映射存储到对应的segment实例的HashEntry数组内。

HashIterator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
abstract class HashIterator {
int nextSegmentIndex;
int nextTableIndex;
HashEntry<K,V>[] currentTable;
HashEntry<K, V> nextEntry;
HashEntry<K, V> lastReturned;

HashIterator() {
nextSegmentIndex = segments.length - 1;
nextTableIndex = -1;
advance();
}

/**
* 获取遍历操作执行的第一个节点,顺序是按照从尾到首的顺序遍历的。
* 循环遍历,如果nextTableIndex可以指示到某个segment实例中的hashEntry数组的某个位置,
* 那么取得该位置的元素,如果元素不为空,跳出循环并结束方法,否则向前遍历hashEntry数组的前一个
* 位置,直到找到一个不为null的元素。
*
*/
final void advance() {
for (;;) {
if (nextTableIndex >= 0) {
if ((nextEntry = entryAt(currentTable, nextTableIndex--)) != null)
break;
}
else if (nextSegmentIndex >= 0) {
Segment<K,V> seg = segmentAt(segments, nextSegmentIndex--);
if (seg != null && (currentTable = seg.table) != null)
nextTableIndex = currentTable.length - 1;
}
else
break;
}
}

/**
* 遍历执行。
* 该方法通常情况是遍历某个segment实例中的HashEntry数组上某个下标位置的链表集合的,
* 如果(nextEntry = e.next) == null那么认为当前下标位置的链表集合已经遍历完全需要遍历
* 下一个下标位置上的链表集合,那么就调用advance()方法取得下个不为空的链表集合并继续遍历
*
*/
final HashEntry<K,V> nextEntry() {
HashEntry<K,V> e = nextEntry;
if (e == null)
throw new NoSuchElementException();
lastReturned = e; // cannot assign until after null check
if ((nextEntry = e.next) == null)
advance();
return e;
}

/**
* 判断是否可以继续遍历,true表示可以继续遍历,false表示已经遍历完全没有新的元素可供遍历
*
*/
public final boolean hasNext() { return nextEntry != null; }
/**
* 判断是否可以继续遍历,true表示可以继续遍历,false表示已经遍历完全没有新的元素可供遍历
*
*/
public final boolean hasMoreElements() { return nextEntry != null; }
/**
* 移除当前遍历的元素。
* 直接调用ConcurrentHashMap的remove(Object key)方法根据key完成删除操作即可。
*
*/
public final void remove() {
if (lastReturned == null)
throw new IllegalStateException();
ConcurrentHashMap.this.remove(lastReturned.key);
lastReturned = null;
}
}

  Segment实例内部用来遍历键值对实例的迭代器。

涉及基础知识点

  1. NIL

参考文献

  1. TopGun_Viper. juc系列-ConcurrentHashMap [E]
  2. AbeJeffrey. ConcurrentHashMap实现原理和源码解读 [E]




------------- End of this article, thanks! -------------


  版权声明:本文由N.C.Lee创作和发表,采用署名(BY)-非商业性使用(NC)-相同方式共享(SA)国际许可协议进行许可,转载请注明作者及出处。
  本文作者为 N.C.Lee
  本文标题为 Java Map 07 - ConcurrentHashMap
  本文链接为 https://marcuseddie.github.io/2018/java-Collection-ConcurrentHashMap.html