上文:AQS-semaphore&CyclicBarrier&CountDownLatch源码学习
源码下载:https://gitee.com/hong99/jdk8
Exchanger是什么?
exchanger是一个极少使用到的交换类,主要用于线程阻塞或者因为阻塞引起但任务又急于执行,这里候就可以进行交换。但是有一个非常的复杂点就是两个并发任务执行过程中交换数据,这一点是非常厉害的,可以看下下面的一些基础实现。
基础功能的学习
package com.aqs;
import java.util.concurrent.Exchanger;
/**
* @author: csh
* @Date: 2022/12/17 10:43
* @Description:Exchanger 学习
*/
public class ExchangerStudy {
public static void main(String[] args) {
Exchanger<Integer> exchanger = new Exchanger<Integer>();
for (int i = 0; i < 10; i++) {
final int index=i;
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("当前线程:Thread_"+Thread.currentThread().getName()+"下标为:"+index);
try {
int newIndex = exchanger.exchange(index);
//等待100毫秒
Thread.sleep(100);
System.out.println("当前线程:Thread_"+Thread.currentThread().getName()+"原来下标为:"+index+"交换后下标为:"+newIndex);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
结果
当前线程:Thread_Thread-2下标为:2
当前线程:Thread_Thread-1下标为:1
当前线程:Thread_Thread-4下标为:4
当前线程:Thread_Thread-3下标为:3
当前线程:Thread_Thread-5下标为:5
当前线程:Thread_Thread-6下标为:6
当前线程:Thread_Thread-0下标为:0
当前线程:Thread_Thread-7下标为:7
当前线程:Thread_Thread-8下标为:8
当前线程:Thread_Thread-9下标为:9
当前线程:Thread_Thread-4原来下标为:4交换后下标为:3
当前线程:Thread_Thread-6原来下标为:6交换后下标为:5
当前线程:Thread_Thread-0原来下标为:0交换后下标为:7
当前线程:Thread_Thread-9原来下标为:9交换后下标为:8
当前线程:Thread_Thread-8原来下标为:8交换后下标为:9
当前线程:Thread_Thread-7原来下标为:7交换后下标为:0
当前线程:Thread_Thread-5原来下标为:5交换后下标为:6
当前线程:Thread_Thread-2原来下标为:2交换后下标为:1
当前线程:Thread_Thread-1原来下标为:1交换后下标为:2
当前线程:Thread_Thread-3原来下标为:3交换后下标为:4
可以看到两个线程可以交换执行的下标。是比较厉害。接下来我们看看底层的源码实现。
源码学习
java.util.concurrent.Exchanger 源码实现时
//交换机
public class Exchanger<V> {
//避免伪共享 左移数据下标(获取内存偏移量)
private static final int ASHIFT = 7;
//节点最大的数组下标
private static final int MMASK = 0xff;
//用于递增,每次加一个seq
private static final int SEQ = MMASK + 1;
//获取cpu的核数
private static final int NCPU = Runtime.getRuntime().availableProcessors();
//实际组数长度(线程数)
static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
//自旋次数 当cpu为单核时,该参为禁用
private static final int SPINS = 1 << 10;
//用于提供给其他线程的交换对象
private static final Object NULL_ITEM = new Object();
//用于超时的传递对象
private static final Object TIMED_OUT = new Object();
//交换节点 保存交换的数据
@sun.misc.Contended static final class Node {
int index; // 多槽数据索引
int bound; // 上一次的边界
int collides; // 记录边界范围内cas失败次数
int hash; // 代表hash值 用于自旋优化
Object item; // 节点带的数据
volatile Object match; // 未来 配对成功 交换的数据
volatile Thread parked; // 匹配的线程
}
//本地线程类实现初始化值
static final class Participant extends ThreadLocal<Node> {
public Node initialValue() { return new Node(); }
}
// 存放node节点 保障线程安全
private final Participant participant;
//多槽数组
private volatile Node[] arena;
/**
* 交换的槽位
*/
private volatile Node slot;
//上次记录
private volatile int bound;
//多槽位的交换实现(带过期时间)
private final Object arenaExchange(Object item, boolean timed, long ns) {
//获取多槽位数组
Node[] a = arena;
//获取当前线程node对象
Node p = participant.get();
for (int i = p.index;;) { // access slot at i
int b, m, c; long j; // 初始化相关的交换变量
//获取交换节点信息(先获取偏移地址)
Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
//如果不为空(证明已经有线程) 进行交换
if (q != null && U.compareAndSwapObject(a, j, q, null)) {
//获取交换q的内容
Object v = q.item; // release
//将当前线程的内容赋值给q的match
q.match = item;
//获取被交换线程
Thread w = q.parked;
//不为空进行唤醒
if (w != null)
U.unpark(w);
//这个是交换后的值
return v;
}
//槽位还没被占的场景
else if (i <= (m = (b = bound) & MMASK) && q == null) {
//交换对象值
p.item = item; // offer
//cas交换
if (U.compareAndSwapObject(a, j, null, p)) {
//计算超时时间
long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
Thread t = Thread.currentThread(); // wait
for (int h = p.hash, spins = SPINS;;) {
//用来携带交换线程的数
Object v = p.match;
//已被交换 则清标识
if (v != null) {
U.putOrderedObject(p, MATCH, null);
p.item = null; // clear for next use
p.hash = h;
//交换成功
return v;
}
//判断自旋是否大于0
else if (spins > 0) {
//太复杂了这里 反正就是各各自旋次数获取 然后释放线程资源
h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
if (h == 0) // initialize hash
h = SPINS | (int)t.getId();
else if (h < 0 && // approx 50% true
(--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield(); // two yields per wait
}
//已有交换线程 准备数据中
else if (U.getObjectVolatile(a, j) != p)
spins = SPINS; // releaser hasn't set match yet
// //线程不挂起 不是 多槽 时间没结束
else if (!t.isInterrupted() && m == 0 &&
(!timed ||
(ns = end - System.nanoTime()) > 0L)) {
U.putObject(t, BLOCKER, this); // emulate LockSupport
p.parked = t; // minimize window
//注意这里 park则挂起线程
if (U.getObjectVolatile(a, j) == p)
U.park(false, ns);
p.parked = null;
U.putObject(t, BLOCKER, null);
}
//换槽位(原因可能一直没有线程) 逻辑有点复杂
else if (U.getObjectVolatile(a, j) == p &&
U.compareAndSwapObject(a, j, p, null)) {
if (m != 0) // try to shrink
U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
p.item = null;
p.hash = h;
i = p.index >>>= 1; // descend
if (Thread.interrupted())
return null;
if (timed && m == 0 && ns <= 0L)
return TIMED_OUT;
break; // expired; restart
}
}
}
else
//获取槽位失败,先清空数据
p.item = null; // clear offer
}
//不在有效范围内,或者已经被其它线程抢了~
else {
//更新bound
if (p.bound != b) { // stale; reset
p.bound = b;
p.collides = 0;
i = (i != m || m == 0) ? m : m - 1;
}
else if ((c = p.collides) < m || m == FULL ||
!U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
p.collides = c + 1;
i = (i == 0) ? m : i - 1; // cyclically traverse
}
else
//往后挪动
i = m + 1; // grow
//更新下标
p.index = i;
}
}
}
//单槽位交换方法
private final Object slotExchange(Object item, boolean timed, long ns) {
//获取当前线程携带的数据
Node p = participant.get();
//获取当前线程
Thread t = Thread.currentThread();
//判断中断不为中断
if (t.isInterrupted()) // preserve interrupt status so caller can recheck
return null;
//自旋
for (Node q;;) {
//赋给q
if ((q = slot) != null) {
//cas交换对象并将slot置为null
if (U.compareAndSwapObject(this, SLOT, q, null)) {
Object v = q.item;
//将当前的对象赋给交换对象的match
q.match = item;
//取出交换的线程
Thread w = q.parked;
//唤醒线程
if (w != null)
U.unpark(w);
return v;
}
// create arena on contention, but continue until slot null
if (NCPU > 1 && bound == 0 &&
U.compareAndSwapInt(this, BOUND, 0, SEQ))
arena = new Node[(FULL + 2) << ASHIFT];
}
//多槽交换不为空则退出
else if (arena != null)
return null; // caller must reroute to arenaExchange
else {
//把数据赋给当前node节点
p.item = item;
//进行交换数据 成功则退出
if (U.compareAndSwapObject(this, SLOT, null, p))
break;
//如果交换不成功则赋为空
p.item = null;
}
}
// await release
//拿到节点hash
int h = p.hash;
//计算时间
long end = timed ? System.nanoTime() + ns : 0L;
//获取自旋次数
int spins = (NCPU > 1) ? SPINS : 1;
//返回值
Object v;
//判断不为空
while ((v = p.match) == null) {
//自旋次数
if (spins > 0) {
h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
if (h == 0)
h = SPINS | (int)t.getId();
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
//让出线程权限
Thread.yield();
}
//优化操作 自旋线程准备中
else if (slot != p)
spins = SPINS;
//线程不挂起 不是 多槽 时间没结束
else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0L)) {
//
U.putObject(t, BLOCKER, this);
p.parked = t;
if (slot == p)
U.park(false, ns);
p.parked = null;
U.putObject(t, BLOCKER, null);
}
else if (U.compareAndSwapObject(this, SLOT, p, null)) {
v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
break;
}
}
//
U.putOrderedObject(p, MATCH, null);
p.item = null;
p.hash = h;
//返回这个v是线程交换线程的。
return v;
}
//构造方法
public Exchanger() {
participant = new Participant();
}
//交换的数据
@SuppressWarnings("unchecked")
public V exchange(V x) throws InterruptedException {
//当前线程用于交换的数据
Object v;
//获取item值
Object item = (x == null) ? NULL_ITEM : x; // translate null args
//不是多槽
if ((arena != null ||
//slotExchange 单槽位交换实现的方法
(v = slotExchange(item, false, 0L)) == null) &&
((Thread.interrupted() || // disambiguates null return
(v = arenaExchange(item, false, 0L)) == null)))
throw new InterruptedException();
return (v == NULL_ITEM) ? null : (V)v;
}
//
@SuppressWarnings("unchecked")
public V exchange(V x, long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
Object v;
Object item = (x == null) ? NULL_ITEM : x;
long ns = unit.toNanos(timeout);
if ((arena != null ||
(v = slotExchange(item, true, ns)) == null) &&
((Thread.interrupted() ||
(v = arenaExchange(item, true, ns)) == null)))
throw new InterruptedException();
if (v == TIMED_OUT)
throw new TimeoutException();
return (v == NULL_ITEM) ? null : (V)v;
}
//字段偏移量信息
private static final sun.misc.Unsafe U;
private static final long BOUND;
private static final long SLOT;
private static final long MATCH;
private static final long BLOCKER;
private static final int ABASE;
//初始化信息
static {
int s;
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> ek = Exchanger.class;
Class<?> nk = Node.class;
Class<?> ak = Node[].class;
Class<?> tk = Thread.class;
BOUND = U.objectFieldOffset
(ek.getDeclaredField("bound"));
SLOT = U.objectFieldOffset
(ek.getDeclaredField("slot"));
MATCH = U.objectFieldOffset
(nk.getDeclaredField("match"));
BLOCKER = U.objectFieldOffset
(tk.getDeclaredField("parkBlocker"));
s = U.arrayIndexScale(ak);
// ABASE absorbs padding in front of element 0
ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);
} catch (Exception e) {
throw new Error(e);
}
if ((s & (s-1)) != 0 || s > (1 << ASHIFT))
throw new Error("Unsupported array scale");
}
}
最后
exchanger默认也是使用了CAS+park/unpark进行实现的,我这里是基于jdk8,jdk7与jdk8是有区别的。本身解决的问题是通过两个线程进行交换执行值,没想到这个exchanger代码不多但是非常复杂,有些可能写得不好,但是有想深入同学可以看看下面两个文章。
参考资料:
https://www.bilibili.com/video/BV17P4y177SD/?spm_id_from=333.788&vd_source=7d0e42b081e08cb3cefaea55cc1fa8b7
https://blog.csdn.net/weixin_30612769/article/details/97769773