数据结构回顾(四)ConcurrentHashMap1.7
前言
上两篇笔记已经回顾了HashMap以及红黑树两种数据结构的实现,下一阶段想看的就是ConcurrentHashMap的1.7和1.8版本的不同实现
Field
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile int count; //Segment中元素的数量
transient int modCount; //对table的大小造成影响的操作的数量(比如put或者remove操作)
transient int threshold; //阈值,Segment里面元素的数量超过这个值那么就会对Segment进行扩容
final float loadFactor; //负载因子,用于确定threshold
transient volatile HashEntry<K,V>[] table; //链表数组,数组中的每一个元素代表了一个链表的头部
}
static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value;
final HashEntry<K,V> next;
}
final Segment<K,V>[] segments;
static final int DEFAULT_CONCURRENCY_LEVEL = 16; //初始的并发等级,通过并发等级来确定Segment的大小
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;// segment的最小值
static final int MAX_SEGMENTS = 1 << 16; // segment的最大值
static final int DEFAULT_CONCURRENCY_LEVEL = 16; // 默认segment数量
static final float DEFAULT_LOAD_FACTOR = 0.75f; // 默认负载因子
static final int DEFAULT_INITIAL_CAPACITY = 16; // 默认初始化容量
构造方法
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
// 1
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// 2
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
segmentShift = 32 - sshift;
segmentMask = ssize - 1;
this.segments = Segment.newArray(ssize);
// 3
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = 1;
while (cap < c)
cap <<= 1;
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
步骤1:对传入的初始化容量,负载因子,segment数量进行校验
步骤2:根据传入的concurrencyLevel计算ssize用于segment数组的初始化,由此步可见,segment数组的长度是不大于concurrencyLevel的2的倍数,并且在这一步记录下了两个成员变量segmentShift和segmentMask
- **segmentShift = **32 - log2(ssize)
- **segmentMask = ** ssize - 1
步骤3:计算initialCapacity和ssize的整数比,然后cap也就是segment数组中table的长度就是不大于这个整数比的2的倍数,然后用计算好的ssize和cap两个值创建segments数组和segment[0]的table数组
[ConcurrentHashMap-> put]
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
// 1
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
// 2
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
// 3
return s.put(key, hash, value, false);
}
步骤1:通过位运算计算出一个hash值后会右移32 - log2(segmentSize)之后&segmentSize - 1得到一个索引值j,
步骤2:索引值j再通过与static块中定义好的静态变量运算获得地址,拿到对应segment,如果segment为空则调用ensureSegment方法创建对应的segment[j]
步骤3:在对应的segment中插入数据
[ConcurrentHashMap-> ensureSegment]
private Segment<K,V> ensureSegment(int k) {
// 1
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
// 2
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
// 3
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
步骤1:采用和ConcurrHashMap-> put相同的方式计算地址值
步骤2:由步骤1中计算出的地址值可见性地获取segment[k]即segment[j],如果为空则按照segment[0]的HashEntry数组长度和加载因子初始化segment[k],这也解释了为什么在构造函数中只创建segment[0],这是因为其他segment创建的模版就是segment[0],而其他segment是延时创建的,只有在使用的时候才创建
步骤3:重新可见性检查segment[k]是否存在,防止在步骤2中有其他线程创建了segment[k],然后用CAS的方式给segment[k]赋值
[Segment-> put]
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 1
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
// 2
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
// 3
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
// 4
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
// 5
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
步骤1:segment尝试获取锁, 如果失败就调用scanAndLockForPut获得创建的HashEntry
步骤2:根据哈希值定位键值对在HashEntry数组上的位置
步骤3:遍历链表,如果对应的键已经存在,则直接覆盖原值
步骤4:如果node不为null的情况下,把新插入的node节点作为链表的第一个节点,否则就创建一个HashEntry同样把它设置为第一个节点
步骤5:容量加一,进行扩容判断,扩容的条件是:当前的容量大于阀值并且小于最大容量,则进行rehash扩容操作,然后tab数组index位置的首节点设置为node
存疑:决定node是否为空的依据就是tryLock是否成功,成功了就会延时创建,不成功则会调用scanAndLockForPut创建,这个方法做了什么呢?其次,扩容机制是怎么实现的呢?
[Segment-> scanAndLockForPut]
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
// 1
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
// 2
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
// 2.1
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
// 2.2
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
// 2.3
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
步骤1:根据hash值获取tab数组中对应的entry
步骤2:CAS方式获得锁,在这个过程中:
- 2.1: retries自选次数的初值为-1,这里依然会遍历链表,如果当链表头为空或者key已经存在的情况下会把retries的值置为0,即代表可以开始自旋
- 2.2:当可以开始自旋之后每次自选会递增retries的值,直到到达最大的自选次数,而这个最大值是由cpu的数量来决定的,如果是在多cpu的情况下则允许64次的最大自旋,到达了便阻塞,用阻塞的方式解决自旋带来的性能开销,而如果是单cpu的情况下,则最大自旋的次数只有1
- 2.3:当自旋过程中头节点发生变化,则给retries重新赋值为-1,也就是会对链表重新遍历
小结:这个方法的实现非常厉害,如果在segment-> put操作的开始能够获得锁那当然是最好的,但是即便没拿到,它也不会傻等,或者马上就阻塞,而是会以乐观锁的方式架设链表不会变,然后如果链表变了再重新遍历
[Segment-> rehash]
private void rehash(HashEntry<K,V> node) {
// 1
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
// 2
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
} else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
// 3
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
步骤1:获取旧的容量并且计算新的容量,新的容量是旧容量的两倍,然后便创建新的数组和新的掩码
步骤2:遍历旧的tab数组并用hash值计算新的位置,如果旧tab数组中当前位置的元素只有一个,那么直接赋值到新数组就可以了,如果不只有一个元素,那就一直往后遍历,直到找到和头结点不同索引值,那么那之后节点就在另一个桶里面了,而剩下的就是从头开始遍历到不相同节点的前一个节点,分别把这两条链赋值到新数组相应的位置就行了
步骤3:收尾操作:处理引起扩容的那个待添加的节点并且把新tab数组赋值上去
[ConcurrentHashMap-> get]
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
// 1
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
// 2
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
步骤1:计算相应的hash值,然后与put操作类似,然后根据hash值计算key在哪个segment ,然后可见性获取segment
步骤2:获取segment中的tab数组,遍历table中的HashEntry元素,找到相同的key,返回value
小结:读操作之所以这么轻松简单,得益于写操作的贡献,由于写操作会把新节点添加在链表头,所以不会影响读的操作,而由于有UNSAFE api,保证了能够无锁且获取到最新的volatile变量的值
[ConcurrentHashMap-> remove]
public V remove(Object key) {
int hash = hash(key);
Segment<K,V> s = segmentForHash(hash);
return s == null ? null : s.remove(key, hash, null);
}
调用segment的remove方法执行删除
[Segment-> remove]
final V remove(Object key, int hash, Object value) {
// 1
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try {
// 2
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> e = entryAt(tab, index);
HashEntry<K,V> pred = null;
// 3
while (e != null) {
K k;
HashEntry<K,V> next = e.next;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
if (pred == null)
setEntryAt(tab, index, next);
else
pred.setNext(next);
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}
步骤1:与添加逻辑相似,尝试获得锁,如果失败了就调用scanAndLock方法
步骤2:依然是根据哈希值定位到对应tab数组的位置,并创建一个pred节点,应该是用来保存待删除节点的前一个,继续往下看
步骤3:遍历链表,如果找到相同的key而且从该键取出来的值又等于传入的value,这个时候就可以执行删除操作,如果待删除节点就是链表的头结点,那么数组该位置上的元素就换成待删除节点的后一个,否则,就把连接待删除节点的prev和next,完成节点的删除
[Segment-> scanAndLock]
private void scanAndLock(Object key, int hash) {
// 1
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
int retries = -1;
// 2
while (!tryLock()) {
HashEntry<K,V> f;
if (retries < 0) {
if (e == null || key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f;
retries = -1;
}
}
}
步骤1:依然是从tab数组中找到该hash值的第一个元素,也就是链表头,然后同样有一个retries自旋次数
步骤2:这里执行的操作和put操作比较相似,只不过这里的条件变成链表为空以及找到了对应的节点,这里就没有可以在CAS中创建节点的操作了
参考资料: