Exchanger
简介
有时我们须要对元素进行配对和交换线程的同步点,使用 exchange 方法返回其伙伴的对象,这时我们就须要使用线程类中的 Exchanger 类了,
简而言之,可以在不同线程间交换数据。
使用入门
废话少说,直接上代码。
定义执行类
我们定义一个 Runnable 类,会将传入的 data 已经交换,并打印获取到的数据。
测试
我们使用线程池执行数据交换测试,日志如下:
可以看到两个线程的数据已经发生了交换。
这么神奇,到底是如何实现的呢?
感兴趣的小伙伴可以一起来阅读以下源码。
源码解析
类定义
这个类是在 jdk1.5 引入的。
算法笔记
ps: 这里是作者的算法笔记,不出现在 doc 中,主要是便于大家理解。内容较多,可以跳过。阅读完源码后,结合起来看。
概述:对于交换“槽(slot)”,核心算法是一个参与者和一个项目(呼叫者):
这是“双重数据结构”(dual data structure)的最简单形式之一-参见Scott和Scherer的DISC 04论文和
http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html
原则上,这很好。
但是实际上,就像许多集中于单个位置的原子更新的算法一样,当使用同一个Exchanger的参与者多于几个时,它会可怕地扩展。
因此,该实现改为使用消除域的形式,该域通过安排某些线程通常使用不同的插槽来扩展此争用,同时仍确保最终任何两个参与方都能够交换项目。
也就是说,我们不能完全在线程之间进行分区,而是给线程提供竞技场索引,这些索引在争用情况下平均会增长,在缺乏争用的情况下会缩小。
我们通过将我们仍然需要的节点定义为ThreadLocals来实现这一点,并在其中包括每个线程的索引和相关的簿记状态。
(我们可以安全地重用每个线程的节点,而不必每次都重新创建它们,因为插槽在指向节点与空节点之间交替出现,因此不会遇到ABA问题。但是,在每次使用之间重置它们时,我们确实需要谨慎。)
实施有效的竞技场需要分配一堆空间,因此我们仅在检测到争用时这样做(单处理器除外,在单处理器上它们将无济于事,因此不会使用)。
否则,交换使用单槽slotExchange方法。
在争用时,插槽不仅必须位于不同的位置,而且由于位于同一高速缓存行(或更常见的是,相同的一致性单元),这些位置也不能遇到内存争用。
因为在撰写本文时,尚无法确定高速缓存行的大小,所以我们定义了一个足以满足通用平台的值。
此外,在其他地方也要格外小心,以避免其他错误/意外共享并增强位置,包括向节点添加填充(通过sun.misc.Contended),将“ bound”作为Exchanger字段嵌入,以及重新处理比较的某些 park/unpark 机制 到LockSupport版本。
竞技场(arena)仅以一个已使用的插槽开始。
我们通过跟踪碰撞来扩大有效竞技场的规模;即尝试交换时失败了。
根据上述算法的性质,唯一能够可靠地表明竞争的冲突是两种尝试的释放发生冲突时-两种尝试的提议中的一种可以合法地导致CAS失败,而没有其他多个线程指示争用。
(注意:有可能但不值得通过在CAS故障后读取插槽值来更精确地检测竞争。)
当线程在当前竞技场边界内的每个插槽处发生冲突时,它将尝试将竞技场大小扩大一倍。
我们通过在“bound”字段上使用版本(序列)编号来跟踪边界内的冲突,并在参与者注意到边界已更新(沿任一方向)时保守地重置冲突计数。
通过在一段时间后放弃等待并在到期时尝试减小竞技场的大小,可以减小有效竞技场的大小(当有多个插槽时)。
“一会儿”的值是一个经验问题。
我们通过附带使用 来实现,这对于获得合理的等待性能是必不可少的-在繁忙的交换器中,offers 通常几乎立即发布,在这种情况下,在多处理器上进行上下文切换非常缓慢/浪费。
Arena 等待,只是省略了阻塞部分,用于替代取消了。
根据经验,将自旋计数选择为一个值,该值可避免在一系列测试机上以最大持续汇率兑换99%的时间。
自旋和产量需要一定程度的随机性(使用廉价的xorshift),以避免可能导致无效的生长/收缩周期的规则模式。
(使用伪随机还可以通过使分支不可预测来帮助调整旋转周期的持续时间。)
另外,在要约期间,服务员可以“知道”在插槽更改后将被释放,但是直到设置了匹配项之后才能进行。
同时,它不能取消要约(cancel the offer),用来替代 spins/yields。
注意:可以通过将线性化点更改为match字段的CAS(如Scott&Scherer DISC论文中的一种情况)来避免这种二次检查,这也会增加异步性,但代价是更差冲突检测以及无法始终重用每个线程节点。
因此,当前方案通常是更好的折衷方案。
发生碰撞时,索引会以相反的顺序循环遍历竞技场,并在范围更改时以最大索引(趋向于最稀疏)重新开始。
(在到期时,索引减半直到达到0。)
可以(并已尝试)使用随机,素值步进或双哈希样式遍历,而不是简单的循环遍历,以减少聚集。
但是从经验上讲,这些好处可能无法克服其增加的开销:
除非存在持续的争用,否则我们将对发生的操作进行快速管理,因此,较简单/较快的控制策略比较准确但较慢的控制策略更有效。
因为我们将到期时间用于竞技场大小控制,所以在竞技场大小缩小到零(或者未启用竞技场)之前,我们无法在定时版本的公共交换方法中引发TimeoutExceptions。
这可能会延迟对超时的响应,但仍在规范范围内。
本质上,所有实现都在slotExchange和arenaExchange方法中。
它们具有相似的总体结构,但是在太多细节上无法组合。
slotExchange方法使用单个Exchanger字段“slot”,而不是竞技场数组元素。
但是,它仍然需要最少的碰撞检测来触发竞技场的建设。
(最混乱的部分是确保在两种方法都可能被调用时在过渡期间正确显示中断状态和InterruptedExceptions。这是通过将null返回作为哨兵来重新检查中断状态来完成的。)
在这种代码中太普遍了,方法是单块的,因为大多数逻辑依赖于字段的读取,这些字段作为局部变量维护,因此无法很好地进行分解-主要是在这里,笨重的 代码),并且在很大程度上依赖于内在函数(不安全)来使用内联嵌入式CAS和相关的内存访问操作(当动态编译器隐藏在其他方法后面时,动态编译器通常不会内联它们,因为它们可以更好地命名和封装该方法)预期的效果)。
这包括使用putOrderedX来清除两次使用之间每个线程节点的字段。
请注意,即使通过释放线程读取Node.item字段,也不会将其声明为volatile,因为它们仅在必须进行访问的CAS操作之后才声明为volatile,并且其他线程可以接受地接受拥有线程的所有使用。
(由于原子性的实际点是插槽CASes,所以在发行版中对Node.match的写入要弱于完全易失性写入,这也是合法的。但是,之所以不这样做,是因为它可能允许进一步推迟写入,延迟进度。)
平平无奇的内部变量
我们一起来看几个平平无奇的内部变量:
这几个变量在 LinkedTransferQueue 也有类似的。
Node 节点相关
@sun.misc.Contended
这个注解是干什么的?
这个主要是用来避免伪共享的。
这里先简单的解释一下。
伪共享
伪共享,高速缓存与内存之间是以缓存行为单位交换数据的,根据局部性原理,相邻地址空间的数据会被加载到高速缓存的同一个数据块上(缓存行),而数组是连续的(逻辑,涉及到虚拟内存)内存地址空间,因此,多个slot会被加载到同一个缓存行上,当一个slot改变时,会导致这个slot所在的缓存行上所有的数据(包括其他的slot)无效,需要从内存重新加载,影响性能。
所以,为了避免这种情况,需要填充数据,使得有效的slot不被加载到同一个缓存行上。
填充的大小即为1
填充collides CAS 操作
collides,当前bound下CAS失败的次数,最大为m,m(bound & MMASK)为当前bound下最大有效索引,从右往左遍历,等到collides == m时,有效索引的槽位也已经遍历完了,这时需要增长槽位,增长的方式是重置bound(依赖SEQ更新其版本,高位;+1,低位),同时collides重置
collides几个重要的变量
你看这个 bound 平平无奇,实际上还是有写东西需要大家理解一下。
bound,记录最大有效的arena索引,动态变化,竞争激烈时(槽位全满)增加, 槽位空旷时减小。
bound + SEQ +/- 1,其高位+ 1(SEQ,oxff + 1)确定其版本唯一性(比如,+1后,又-1,实际上是两个版本的bound,collides要重置的,而且从右向左遍历的索引也要更新,一般来讲,左边槽位比右边槽位竞争激烈,所以要从右向左找,为的是快速找到一个空位置,并尝试占领它,当bound加一又减一后,遍历索引右侧的槽位应该就空出来了,因为大家都往左边靠拢,所以要更新到最右侧,如果没有bound的版本唯一性,便没有索引更新,就一直往左遍历竞争激烈的槽位,还会误判,本来bound应该缩减的,反而又使其增加,于是会很影响效率的。),低位+/-1实际有效的索引(&MMASK)
对应的 CAS 操作如下图:
bound构造器
我们想使用 Exchanger 类,肯定要创建他。
如何创建呢?
只有一个无参构造器。
Participant 是什么?
希望你还记得前面的内容,这个就是一个 ThreadLocal 的子类简单实现。
核心方法
Exchanger 类最核心的方法其实只有一个:
当然也有可以指定超时时间的方法:
这两个方法本身并不难理解。
因为所有的复杂度都被封装在了 slotExchange 和 arenaExchange 这两个方法中,也是本文的重点。
Unsafe 机制
这些知道是通过 Unsafe 机制操作的即可,后面会用到。
slotExchange 方法
最核心的只有 2 个方法,个人的理解就是 slot 进行交换,然后等待 arean 唤醒。
流程梳理
为了便于大家理解,我们把流程梳理一遍:
检查slot是否为空(null),不为空,说明已经有线程在此等待,尝试占领该槽位,如果占领成功,与等待线程交换数据,并唤醒等待线程,交易结束,返回。
如果占领槽位失败,创建arena,但要继续【步骤1】尝试抢占slot,直至slot为空,或者抢占成功,交易结束返回。
如果slot为空,则判断arena是否为空,如果arena不为空,返回null,重新路由到arenaExchange方法
如果arena为空,说明当前线程是先到达的,尝试占有slot,如果成功,将slot标记为自己占用,跳出循环,继续【步骤5】,如果失败,则继续【步骤1】
当前线程等待被释放,等待的顺序是先自旋(spin),不成功则让出CPU时间片(yield),最后还不行就阻塞(block),spin -> yield -> block
如果超时(设置超时的话)或被中断,则退出循环。
最后,重置数据,下次重用,返回结果,结束。
arenaExchange 方法
流程梳理
为了便于大家理解,流程梳理如下:
从场地中选出偏移地址为(i
检查索引(i vs m)是否越界,越界,进入【步骤9】;没有越界,进入下一步。
尝试占有该槽位,抢占失败,进入【步骤1】;抢占成功,进入下一步。
检查match,是否有线程来交换数据,如果有,交换数据,结束;如果没有,进入下一步。
检查spin是否大于0,如果不大于0,进入下一步;如果大于0,检查hash是否小于0,并且spin减半或为0,如果不是,进入【步骤4】;如果是,让出CPU时间,过一会儿,进入【步骤4】
检查是否中断,m达到最小值,是否超时,如果没有中断,没有超时,并且m达到最小值,阻塞,过一会儿进入【步骤4】;否则,下一步。
没有线程来交换数据,尝试丢弃原有的槽位重新开始,丢弃失败,进入【步骤4】;否则,下一步。
bound减1(m>0),索引减半;检查是否中断或超时,如果没有,进入【步骤1】;否则,返回,结束。
检查bound是否发生变化,如果变化了,重置collides,索引重置为m或左移,转向【步骤1】;否则,进入下一步。
检查collides是否达到最大值,如果没有,进入【步骤13】,否则下一步。
m是否达到FULL,是,进入【步骤13】;否则,下一步。
CAS bound加1是否成功,如果成功,i置为m+1,槽位增长,进入【步骤1】;否则,下一步。
collides加1,索引左移,进入【步骤1】
流程图如下:
流程图小结
希望本文对你有帮助,如果有其他想法的话,也可以评论区和大家分享哦。
各位极客的点赞收藏转发,是老马持续写作的最大动力!
领取专属 10元无门槛券
私享最新 技术干货