本文分为两个部分,第一个是对跳表(SKipList)这种数据结构的介绍,第二部分则是对Java中ConcurrentSkilListMap
的源码解读.
关于跳表的概述,强力推荐这篇文章,漫画算法:什么是跳表,我也是看了这个之后才豁然开朗,讲解的通俗又生动.
跳跃列表是一种数据结构。它允许快速查询一个有序元素的数据链表。跳跃列表的平均查找和插入时间复杂度都是O(log n),优于普通队列的O(n)。
想一下我们日常对列表的存储方式:
数组查询较快,支持随机访问以及二分法查找,但是插入和删除数据需要移动较多的值,效率较低.时间复杂度为O(n).
链表的插入较快,但是在插入之前需要先寻找合适的位置,这一步骤也是O(n)的复杂度,因此链表的插入和查询都是O(n)的复杂度.
那么有没有一种在插入和查询上都表现比较好的数据结构呢?
有,各种查找树平衡树等,今天这里学习一种新的实现方式:跳表.
其实跳表的原理,对计算机行业的同学来说,一张图就可以明了.
即原来的链表不变,额外维护一份索引(1级),记录某几个值的位置及值.
当数据量过大,比如有10w数据,那么维护的索引有一半的话也有5w,那么查找依然很慢,这个时候可以再加一层索引,数据量就只有2w5了.
一直这样添加索引,到最后会额外多维护一份数据,但是模拟了二分查找,因此在插入和查找上都是O(logn)的时间复杂度,空间复杂度为O(2n)也就是O(n).
当然跳表有非常多的变种,可以在上面的策略上进行各种调整,以使他的时间复杂度和空间复杂度接近自己的要求.
当删除或者添加的时候,会造成原来链表结构的变化,这时候上面的各层索引也需要相应的修改,那么如何决定一个节点是否应该被提升一层作为一个索引呢?在跳表中使用的是随机的方式,比在二叉搜索树中的方法要轻量级许多.
ConcurrentSkipListMap 一个并发安全, 基于 skip list 实现有序存储的Map.
ConcurrentSkilListMap内部封装了几个数据结构,我们看一下:
// 一个节点,保存了key,value,还有指向下一个节点的指针
static final class Node<K,V> {
final K key;
volatile Object value;
volatile Node<K,V> next;
}
// 索引,包含一个上面的节点,还有右边的索引(同一级),下边的索引(下一级)
static class Index<K,V> {
final Node<K,V> node;
final Index<K,V> down;
volatile Index<K,V> right;
}
// 每一层的头索引,带有当前的level.
static final class HeadIndex<K,V> extends Index<K,V> {
final int level;
HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
super(node, down, right);
this.level = level;
}
}
直接看代码有点难以下手,我们根据他提供的一些公共的API来逐一看一下实现方式.
这个方法我们必须首先看一下,是我们在使用HashMap
等结构的时候常用的put方法.
这里大概说一下流程,代码里加上了注释.
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
return doPut(key, value, false);
}
/**
* Main insertion method. Adds element if not present, or
* replaces value if present and onlyIfAbsent is false.
* @param key the key
* @param value the value that must be associated with key
* @param onlyIfAbsent if should not insert if already present
* @return the old value, or null if newly inserted
*/
/**
* 插入的主方法,如果不存在则插入元素,如果存在且传入的`onlyIfAbsent`为false则替换掉值.
*/
private V doPut(K key, V value, boolean onlyIfAbsent) {
Node<K,V> z; // added node
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
// 找到前置节点,在base-level上的.
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
if (n != null) {
Object v; int c;
Node<K,V> f = n.next;
// 发生了多线程竞争,break.重新试一遍
if (n != b.next) // inconsistent read
break;
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
// 如果key大于前置节点,继续
if ((c = cpr(cmp, key, n.key)) > 0) {
b = n;
n = f;
continue;
}
// 等于则赋值
if (c == 0) {
if (onlyIfAbsent || n.casValue(v, value)) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
// 竞争失败了,重新来
break; // restart if lost race to replace value
}
// else c < 0; fall through
}
// 如果该前置节点的后继节点为空,则直接进行插入,使用cas操作连接传入的节点.
z = new Node<K,V>(key, value, n);
if (!b.casNext(n, z))
break; // restart if lost race to append to b
break outer;
}
}
// 获取level
int rnd = ThreadLocalRandom.nextSecondarySeed();
if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
int level = 1, max;
while (((rnd >>>= 1) & 1) != 0)
++level;
Index<K,V> idx = null;
HeadIndex<K,V> h = head;
// 添加z的索引数据
if (level <= (max = h.level)) {
for (int i = 1; i <= level; ++i)
idx = new Index<K,V>(z, idx, null);
}
else { // 添加新的一层索引
level = max + 1; // hold in array and later pick the one to use
@SuppressWarnings("unchecked")Index<K,V>[] idxs =
(Index<K,V>[])new Index<?,?>[level+1];
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);
for (;;) {
h = head;
int oldLevel = h.level;
if (level <= oldLevel) // lost race to add level
break;
HeadIndex<K,V> newh = h;
Node<K,V> oldbase = h.node;
for (int j = oldLevel+1; j <= level; ++j)
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
if (casHead(h, newh)) {
h = newh;
idx = idxs[level = oldLevel];
break;
}
}
}
// find insertion points and splice in
splice: for (int insertionLevel = level;;) {
int j = h.level;
for (Index<K,V> q = h, r = q.right, t = idx;;) {
if (q == null || t == null)
break splice;
if (r != null) {
Node<K,V> n = r.node;
// compare before deletion check avoids needing recheck
int c = cpr(cmp, key, n.key);
if (n.value == null) {
if (!q.unlink(r))
break;
r = q.right;
continue;
}
if (c > 0) {
q = r;
r = r.right;
continue;
}
}
if (j == insertionLevel) {
if (!q.link(r, t))
break; // restart
if (t.node.value == null) {
findNode(key);
break splice;
}
if (--insertionLevel == 0)
break splice;
}
if (--j >= insertionLevel && j < level)
t = t.down;
q = q.down;
r = q.right;
}
}
}
return null;
}
get方法比较简单:
/**
* Gets value for key. Almost the same as findNode, but returns
* the found value (to avoid retries during re-reads)
*
* @param key the key
* @return the value, or null if absent
*/
private V doGet(Object key) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
// 寻找前置节点
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
// 如果为空则不存在,直接返回
if (n == null)
break outer;
Node<K,V> f = n.next;
// 多线程竞争,重新试一下
if (n != b.next) // inconsistent read
break;
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
// 如果相等,则返回值.
if ((c = cpr(cmp, key, n.key)) == 0) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
if (c < 0)
break outer;
b = n;
n = f;
}
}
return null;
}
可以看到在get
和put
方法中,寻找前置节点都比较重要.
寻找前置节点的思路比较明了,就是跳表的查找思路:
代码如下:
/**
* Returns a base-level node with key strictly less than given key,
* or the base-level header if there is no such node. Also
* unlinks indexes to deleted nodes found along the way. Callers
* rely on this side-effect of clearing indices to deleted nodes.
* @param key the key
* @return a predecessor of key
*/
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
if (key == null)
throw new NullPointerException(); // don't postpone errors
for (;;) {
// 拿到矩形索引的左上角的索引
for (Index<K,V> q = head, r = q.right, d;;) {
// 右侧索引不为空
if (r != null) {
Node<K,V> n = r.node;
K k = n.key;
// 竞争
if (n.value == null) {
if (!q.unlink(r))
break; // restart
r = q.right; // reread r
continue;
}
// 如果小于传入的key,则向右查找.
if (cpr(cmp, key, k) > 0) {
q = r;
r = r.right;
continue;
}
}
// 到达最后一层了,数据层,返回拿到的节点.就是后继几点.
if ((d = q.down) == null)
return q.node;
// 这边是右侧的索引为空,则可以直接向下一层了.
q = d;
r = d.right;
}
}
}
思路比较简单:
/**
* Main deletion method. Locates node, nulls value, appends a
* deletion marker, unlinks predecessor, removes associated index
* nodes, and possibly reduces head index level.
*
* Index nodes are cleared out simply by calling findPredecessor.
* which unlinks indexes to deleted nodes found along path to key,
* which will include the indexes to this node. This is done
* unconditionally. We can't check beforehand whether there are
* index nodes because it might be the case that some or all
* indexes hadn't been inserted yet for this node during initial
* search for it, and we'd like to ensure lack of garbage
* retention, so must call to be sure.
*
* @param key the key
* @param value if non-null, the value that must be
* associated with key
* @return the node, or null if not found
*/
final V doRemove(Object key, Object value) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
// 寻找前置节点
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
// 竞争
if (n == null)
break outer;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
// 如果节点不存在,直接返回
if ((c = cpr(cmp, key, n.key)) < 0)
break outer;
if (c > 0) {
b = n;
n = f;
continue;
}
// 不删除了
if (value != null && !value.equals(v))
break outer;
// 进行删除操作
if (!n.casValue(v, null))
break;
// 删除索引
if (!n.appendMarker(f) || !b.casNext(n, f))
findNode(key); // retry via findNode
else {
findPredecessor(key, cmp); // clean index
// 如果一层索引都空了,则删除这一层.
if (head.right == null)
tryReduceLevel();
}
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
}
return null;
}
跳表是一种可以在有序的列表上进行快速的查找的数据结构,他的查找删除插入的时间复杂度都O(logN).在跳表中,存储数据的就是底层的一个单链表,同时维护一份索引,索引是最多31条单链表,并且不同层的相同节点在纵向上也是一个链表,因此索引其实是一个矩形的结构,查找时,从矩形的左上角逐渐向右向下查找.
ConcurrentSkilListMap是Java中对于跳表的一个并发实现,且没有使用锁机制,采用了CAS算法以提供并发的能力.
http://blog.jobbole.com/111731/ https://www.jianshu.com/p/edc2fd149255
完。
2019-05-19 完成
以上皆为个人所思所得,如有错误欢迎评论区指正。
欢迎转载,烦请署名并保留原文链接。
联系邮箱:huyanshi2580@gmail.com
更多学习笔记见个人博客——>呼延十
var gitment = new Gitment({ id: '跳表(skiplist)的原理及concurrentskiplistmap的源码学习', // 可选。默认为 location.href owner: 'hublanker', repo: 'blog', oauth: { client_id: '2297651c181f632a31db', client_secret: 'a62f60d8da404586acc965a2ba6a6da9f053703b', }, }) gitment.render('container')