前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何保证容器是线程安全的? ConcurrentHashMap 如何高效的线程安全?

如何保证容器是线程安全的? ConcurrentHashMap 如何高效的线程安全?

作者头像
王小明_HIT
发布2020-05-12 16:33:26
1.1K0
发布2020-05-12 16:33:26
举报
文章被收录于专栏:程序员奇点

如何保证容器是线程安全的?ConcurrentHashMap 如何高效的线程安全?

Java提供了不同层面的线程安全支持。在传统集合框架内部,除了 Hashtable等同步容器,还提供了所谓的同步包装器(Synchronized Wrapper),我们可以调用Collections工具类提供的包装方法,来获取一个同步的包装容器(如 Collections.synchronizedMap),但是它们都是利用非常粗粒度的同步方式,在高并发情况下,性能比较低下。其实可以利用并发包提供的线程安全容器。

  • 各种并发容器,比如 ConcurrentHashMap、CopyOnWriteArrayList
  • 各种线程安全队列(Queue/Deque),比如 ArrayBlockingQueue,SynchronousQueue
  • 各种有序容器的线程安全版本。

如何保证线程安全

首先要保障线程安全的几个基本特性, 原子性,可见性,有序性。其次可以通过封装的方式将内部对象保护起来,保证变量对象的不可变性,一般就线程安全了。

  • 理解基本的线程安全工具
  • 理解传统集合矿建并发变成中 Map 存在的问题,清楚简单同步方式的不足
  • 梳理并发包内,尤其是 ConcurrentHashMap 采取了哪些方法来提高并发表现。
  • 最好能够掌握 ConcurrentHashMap 自身的演进,目前很多分析资料还是基于早期版本。

为什么需要 ConcurrentHashMap

Hashtable 是怎样实现线程安全的。

Hashtable 能够保证线程安全,但是它的基本就是将 put ,get ,size 等各种操作加上 synchronized, 这样就导致了所有并发操作都要竞争一把锁,一个线程在进行同步操作时,其他线程只能等待,大大减低了并发效率。

  • https://github.com/zhuzhongji/jdk7
代码语言:javascript
复制
@SuppressWarnings("unchecked")
public synchronized V get(Object key) {
    Entry<?,?> tab[] = table;
    int hash = key.hashCode();
    int index = (hash & 0x7FFFFFFF) % tab.length;
    for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) {
        if ((e.hash == hash) && e.key.equals(key)) {
            return (V)e.value;
        }
    }
    return null;
}
public synchronized V put(K key, V value) {
    // Make sure the value is not null
    if (value == null) {
        throw new NullPointerException();
    }

    // Makes sure the key is not already in the hashtable.
    Entry<?,?> tab[] = table;
    int hash = key.hashCode();
    int index = (hash & 0x7FFFFFFF) % tab.length;
    @SuppressWarnings("unchecked")
    Entry<K,V> entry = (Entry<K,V>)tab[index];
    for(; entry != null ; entry = entry.next) {
        if ((entry.hash == hash) && entry.key.equals(key)) {
            V old = entry.value;
            entry.value = value;
            return old;
        }
    }

    addEntry(hash, key, value, index);
    return null;
}

SynchronizedMap 是如何实现线程安全的?

SynchronizedMap 并没有声明 synchronized 方法,但是还是利用了互斥的 mutex ,相对于 hashtable 没有真正意义上的改进。

代码语言:javascript
复制
 private static class SynchronizedMap<K,V>
        implements Map<K,V>, Serializable {
        private static final long serialVersionUID = 1978198479659022715L;

        private final Map<K,V> m;     // Backing Map
        final Object      mutex;        // Object on which to synchronize

        SynchronizedMap(Map<K,V> m) {
            this.m = Objects.requireNonNull(m);
            mutex = this;
        }

        SynchronizedMap(Map<K,V> m, Object mutex) {
            this.m = m;
            this.mutex = mutex;
        }

        public int size() {
            synchronized (mutex) {return m.size();}
        }
        public boolean isEmpty() {
            synchronized (mutex) {return m.isEmpty();}
        }
        public boolean containsKey(Object key) {
            synchronized (mutex) {return m.containsKey(key);}
        }
        public boolean containsValue(Object value) {
            synchronized (mutex) {return m.containsValue(value);}
        }
        public V get(Object key) {
            synchronized (mutex) {return m.get(key);}
        }

        public V put(K key, V value) {
            synchronized (mutex) {return m.put(key, value);}
        }
        public V remove(Object key) {
            synchronized (mutex) {return m.remove(key);}
        }
        public void putAll(Map<? extends K, ? extends V> map) {
            synchronized (mutex) {m.putAll(map);}
        }
        public void clear() {
            synchronized (mutex) {m.clear();}
        }

        private transient Set<K> keySet;
        private transient Set<Map.Entry<K,V>> entrySet;
        private transient Collection<V> values;

        public Set<K> keySet() {
            synchronized (mutex) {
                if (keySet==null)
                    keySet = new SynchronizedSet<>(m.keySet(), mutex);
                return keySet;
            }
        }

        public Set<Map.Entry<K,V>> entrySet() {
            synchronized (mutex) {
                if (entrySet==null)
                    entrySet = new SynchronizedSet<>(m.entrySet(), mutex);
                return entrySet;
            }
        }
}

hashtable 或者同步包装版本,都只适合在非高度并发的场景下。

ConcurrentHashMap 是如何设计实现的?

ConcurrentHashMap 为什么能够大大提高并发效率?ConcurrentHashMap 的设计一直在演化,比如在 Java 8 中就发生发生了很大变化。

  • 分离锁,也就是将内部进行分段( Segment),里面则是 HashEntry的数组,和 HashMap类似,哈希相同的条目也是以链表形式存放。
  • HashEntry 内部使用 volatile的 value 字段来保证可见性,也利用了不可变对象的机制以改进利用 Unsafe 提供的底层能力,比如 volatile access,去直接完成部分操作,以最优化性能,毕竟 Unsafe中的很多操作都是 VM intrinsic优化过的。

早期的 ConcurrentHashMap 实现

image

JDK 7 ConcurrentHashMap 是如何 get 的?

代码语言:javascript
复制
public v get(object key){
  Segment<k, V> s ;// manually integrate access methods to reduce overhead
  HashEntry<k, v>[] tab:
  int h= hash(key.hashcode());
  //利用操作替换普迅数字运算
  long u = (((h >> segmentshift)& segmentMask)<< SSHIFT)+ SBASE
  //以 segment为单位,进行定位
  //利用 insafe直接 Volatile access
  if((s =(Segment<k, V>)UNSAFE.getobjectvolatile(segments, u)) != null &&(tab = s.table) != null){

  }
  // 略
  return null;
}

JDK 7 ConcurrentHashMap 是如何 put 的?

代码语言:javascript
复制
public v put(K key, v value){
    segment<K, v> 5;
    if(value =s null)
      throw new NullPointerException()
    //二次哈希,以保证数分散性,避免哈希突
    int hash = hash(key.hashcode());
    int j =(hash >>>segmentshift)& segnentMask ;
    if((s =(Segment<k, V>) UNSAFE.getobject
    // nonvolatile:recheck
    (segments, (j < SSHIFT)+ SBASE))= null)// in ensuresegm     return s.put(key, hash, value, false);
}

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            // 无论如何,确保获取锁
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            // 更新已有的value
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                    // 放着hashEntry 到特定位置,如果超过阀值,进行 rehash
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

JDK 7 实现的ConcurrentHashMap 是居于分段锁技术实现的

  • ConcurrentHashMap会获取再入锁,以保证数据一致性, Segment本身就是基于 ReentrantLock的扩展实现,所以,在并发修改期间,相应 Segment是被锁定的
  • 在最初阶段,进行重复性的扫描,以确定相应key值是否已经在数组里面,进而决定是更新还是放置操作,你可以在代码里看到相应的注释。重复扫描、检测冲突是 ConcurrentHash Map的常见技巧我在专栏上一讲介绍 HashMap时,提到了可能发生的扩容问题,在 ConcurrentHashMap 中同样存在。不过有一个明显区别,就是它进行的不是整体的扩容,而是单独对 Segmen进行扩容,细节就不介绍了。
  • 另外一个Map的size方法同样需要关注,它的实现涉及分离锁的一个副作用。试想,如果不进行同步,简单的计算所有 Segment的总值,可能会因为并发put,导致结果不准确,但是直接锁定所有 Segment进行计算,就会变得非常昂贵。其实,分离锁也限制了Map的初始化等操作。

JDK 7 ConcurrentHashMap 是如何获得 size 的

代码语言:javascript
复制
public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
                // 实现重试机制 ,试图获得可靠值,如果监控到发生变化,就直接返回,否则就获取锁进行操作,
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                // 监控是否有变化
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }

ConcurrentHashMap的实现是通过重试机制( RETRIES_ BEFORE_LOCK,指定重试次数2),来试图获得可靠值。如果没有监控到发生变化(通过对比 Segment.modCount),就直接返回,否则获取锁进行操作。

Java 8 之后的版本 ConcurrentHash 发生了哪些变化?

  • 总体结构上,它的内部存储变得和我在专栏上一讲介绍的 HashMap结构非常相似,同样是大的桶( bucket)数组,然后内部也是一个个所谓的链表结构(bin),同步的粒度要细致一些,还是数组+链表结构。
  • 其内部仍然有 Segment定义,但仅仅是为了保证序列化时的兼容性而已,不再有任何结构上的用处。
  • 因为不再使用 Segment,初始化操作大大简化,修改为lazy-oad形式,这样可以有效避免初始开销,解决了老版本很多人抱怨的这一点
  • 数据存储利用 volatile来保证可见性。
  • 使用 CAS 等操作,在特定场景进行无锁并发操作
  • 使用 Unsafe、 LongAdder之类底层手段,进行极端情况的优化。

JDK 8 数据存储内部实现

我们可以发现Key是final的,因为在生命周期中,一个条目的Key发生变化是不可能的;与此同时value,则声明为 volatile,以保证可见性。

代码语言:javascript
复制
    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }

        public final K getKey()       { return key; }
        public final V getValue()     { return val; }
        public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
        public final String toString(){ return key + "=" + val; }
        public final V setValue(V value) {
            throw new UnsupportedOperationException();
        }

        public final boolean equals(Object o) {
            Object k, v, u; Map.Entry<?,?> e;
            return ((o instanceof Map.Entry) &&
                    (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                    (v = e.getValue()) != null &&
                    (k == key || k.equals(key)) &&
                    (v == (u = val) || v.equals(u)));
        }

        /**
         * Virtualized support for map.get(); overridden in subclasses.
         */
        Node<K,V> find(int h, Object k) {
            Node<K,V> e = this;
            if (k != null) {
                do {
                    K ek;
                    if (e.hash == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                } while ((e = e.next) != null);
            }
            return null;
        }
    }

JDK 8 ConcurrentHashMap 是怎么 get 的?

实现相对简单,先找到哪个节点,然后,在链表中遍历查找。

代码语言:javascript
复制
    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

JDK 8 ConcurrentHashMap 是怎么 put 的 ?

代码语言:javascript
复制
public V put(K key, V value) {
        return putVal(key, value, false);
    }

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
               // 利用CAS 去进行无锁线程安全操作 如果 bin 是空的
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    // 细粒度的同步修改操作
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                //  Bin 链表超过阀值,进行树化
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

可以看到在同步逻辑上,它使用的是 synchronized ,不是 JDK 7 的 ReentrantLock 之类,为什么?JDK 对 synchronized 进行了不断优化,不在需要过分担心性能差异,相对于 ReentrantLock,可以减少内存消耗,是个非常大的优势。

JDK 8 ConcurentHashMap 是怎么进行 inittable 的?

代码语言:javascript
复制
 private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

这里用的是一个典型的 CAS 使用场景,利用 volatile 的 sizectl 作为互斥手段,如果发现竞争性的初始化,那么spin, 等待条件恢复,否则利用 CAS 设置排他标准时,如果成功那么初始化,否则重试。while循环就是这个功效,

JDK 8 ConcurrentHashMap size() 方法

代码语言:javascript
复制
   public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }
  final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
    
 static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
  }
    

计数思路还是一样的,都是分而治之进行计数,然后求和处理。可以看到 CounterCell 的操作 ,是基于 java.util.concurrent.atomic.LongAdder 进行的,是个比较高效的线程安全计数实现,大多数情况下,建议使用 ActomicLong。相对于 JDK 7 中的实现,没有重试机制, JDK 8 中 put 或者 clear 方法,remove 中有 addCount() 方法 + CounterCell 能得精确的size。

  • http://hg.openjdk.java.net/jdk/jdk/file/12fc7bf488ec/src/java.base/share/classes/java/util/concurrent/atomic/Striped64.java
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-05-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员奇点 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 如何保证容器是线程安全的?ConcurrentHashMap 如何高效的线程安全?
    • 如何保证线程安全
      • 为什么需要 ConcurrentHashMap
        • Hashtable 是怎样实现线程安全的。
          • SynchronizedMap 是如何实现线程安全的?
          • ConcurrentHashMap 是如何设计实现的?
          • JDK 7 ConcurrentHashMap 是如何 get 的?
          • JDK 7 ConcurrentHashMap 是如何 put 的?
          • JDK 7 ConcurrentHashMap 是如何获得 size 的
        • Java 8 之后的版本 ConcurrentHash 发生了哪些变化?
          • JDK 8 数据存储内部实现
            • JDK 8 ConcurrentHashMap 是怎么 get 的?
            • JDK 8 ConcurrentHashMap 是怎么 put 的 ?
            • JDK 8 ConcurentHashMap 是怎么进行 inittable 的?
            • JDK 8 ConcurrentHashMap size() 方法
        相关产品与服务
        容器服务
        腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档