首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

JUC并发编程之Semaphore信号量、CountDownLatch、CyclicBarrier栅栏、Executo……

Semaphore信号量:

这个东西很简单,别看字面意思,什么信号量,我也不懂得那个术语什么意思,Semaphore你可以这样来理解,我们要去看电影,而且是3D电影(必须戴3D眼镜才可以进入),但是比较不巧的是我们电影院只有两个3D眼镜了,也就是说,我们每次只能进去两个人看电影,然后等待这两个人看完电影以后把眼镜还回来,后面的两个人才能继续观看,就是说每次只允许最多进去两个人,每次进入到线程获取锁,需要你得到前置的票据,才可以进行后续的流程。可以理解为一个简单的限流吧。我们来一下代码示例。

publicclassTest{

publicstaticvoidmain(String[] args)throwsInterruptedException {

Semaphore semaphore =newSemaphore(2);

for(inti =; i

newThread(newTask(semaphore,"xiaocaijishu"+i)).start();

}

}

staticclassTaskextendsThread{

Semaphore semaphore;

publicTask(Semaphore semaphore,String tname){

this.semaphore = semaphore;

this.setName(tname);

}

publicvoidrun() {

try{

semaphore.acquire();

System.out.println(Thread.currentThread().getName()+"拿着3D眼镜进去了,时间是"+System.currentTimeMillis());

Thread.sleep(1000);

semaphore.release();

System.out.println(Thread.currentThread().getName()+"出来了,将3D眼镜还给了服务人员,时间是"+System.currentTimeMillis());

}catch(InterruptedException e) {

e.printStackTrace();

}

}

}

}

运行结果就是这样的

我们来解释一下运行结果,线程1和线程3同一时间去看电影了,然后1出来了,这时线程9马上拿着我们的3D眼镜进去了,过了一会线程3也看完电影了,出来了还了3D眼镜,线程7又在同一时间拿着3D眼镜进去看电影了,后续线程都是如此执行的,每次只是进入两个线程。

简单的使用看到了,我们来看看底层的源码设计吧。开始的时候我们是创建一个Semaphore内部票据数目给予的是2。

//1.创建初始票据是2的Semaphore

Semaphore semaphore =newSemaphore(2);

//2.进入Semaphore,查看数据2是如何存储的.

publicSemaphore(intpermits) {

sync =newNonfairSync(permits);

}

//3.底层还是基于sync 创建了一个对象,但不同于过去ReetrantLock的是,这次是一个非公平的锁对象,我们再次进入NonfairSync看看那个数字2到底放在哪里了.

Sync(intpermits) {

setState(permits);

}

//4.我们可以看到底层还是用State来存储的.

这次没有把所有代码全部粘出来,感觉那样像是凑篇幅一样。

通过上述代码,我们可以看到,我们的初始票据数,是上一次那个state来存的。

后续我们调用了acquire方法来尝试获取票据,acquire方法也可以传入获取票据数目的比如semaphore.acquire(2);也是可以的。我们进入acquire方法来看看到底是如何获取的。

//从new Semaphore(2);点击进入后续方法

publicvoidacquire()throwsInterruptedException {

sync.acquireSharedInterruptibly(1);

}

//我们可以看到,当我们没有传需要获取多少票据的时候,会默认给予1这个参数,我们来继续看后续流程

publicfinalvoidacquireSharedInterruptibly(intarg)

throwsInterruptedException {

if(Thread.interrupted())

thrownewInterruptedException();

if(tryAcquireShared(arg)

doAcquireSharedInterruptibly(arg);

}

//Thread.interrupted()判断当前线程是否已经中断,如果中断我直接抛出异常,电影都演完了,我拿3D眼镜还有毛线用.

//tryAcquireShared(arg)尝试获取票据,arg是1,刚才给予的默认1

finalintnonfairTryAcquireShared(intacquires) {

for(;;) {

intavailable = getState();

intremaining = available - acquires;

if(remaining

compareAndSetState(available, remaining))

returnremaining;

}

}

//内部有实现关系,所以调用的是Semaphore类nonfairTryAcquireShared方法,我们来解读一下

//直接就是一个死循环, int available = getState();获取一下当前还有多少票据

// int remaining = available - acquires;计算出当前票据减去所需票据的一个剩余值

//if (remaining < 0 || compareAndSetState(available, remaining))我们现有2个票据,拿走1个,剩余1个,所以remaining < 0 一定是false的

//再来看另一半compareAndSetState,用原子计算(上次博客说过为什么要原子计算)方式来修改剩余票据,这个是可以修改成功的.所以满足条件可以返回一个2-1 也就是返回一个正数1

是不是有点看懵圈了,很多小伙伴感觉if (remaining < 0 ||compareAndSetState(available, remaining))前面的remaining

有没有感觉好点了,自己可以跟着源代码走一走,获取的过程就差一个doAcquireSharedInterruptibly还没有看了,如果获取超过了票据数,也就是不应该让返回负数时运行doAcquireSharedInterruptibly方法,我们来看一下。

privatevoiddoAcquireSharedInterruptibly(intarg)

throwsInterruptedException {

finalNode node = addWaiter(Node.SHARED);//以共享方式添加节点

booleanfailed =true;

try{

for(;;) {

finalNode p = node.predecessor();//判断前驱节点是否为空

if(p == head) {

intr = tryAcquireShared(arg);//再次尝试获取票据

if(r >=) {//>= 0表示获取票据成功

setHeadAndPropagate(node, r);//更改头节点

p.next =null;// help GC

failed =false;

return;

}

}

if(shouldParkAfterFailedAcquire(p, node) &&//剔除不可用的Node节点

parkAndCheckInterrupt())//阻塞当前线程

thrownewInterruptedException();

}

}finally{

if(failed)

cancelAcquire(node);

}

}

经过两次以上的尝试,我们将该线程阻塞了,不至于一直for循环在运行,也就这样,票据发放完毕了。

过程差不多就是这样的,我们可以再仔细看一下是如何添加节点的,上次ReetrantLock说了一些,我们这次再来看一下。我们现已第一次塞节点为例,

privateNode addWaiter(Node mode) {

Node node = new Node(Thread.currentThread(), mode);

// Try the fast path of enq; backup to full enq on failure

Node pred = tail;

if(pred !=null) {//第一次一定是空的,我们现在已初始塞节点为例。

node.prev = pred;

if(compareAndSetTail(pred, node)) {

pred.next = node;

returnnode;

}

}

enq(node);//为空直接进入这个逻辑

returnnode;

}

privateNodeenq(finalNode node) {

for(;;) {

Node t = tail;

if(t ==null) {// Must initialize //1.第一次一定是空 //二次循环不为空 进入else

if(compareAndSetHead(newNode()))//2.创建一个空节点,并且作为head节点.

tail = head;//3.tail指向那个head节点

}else{

node.prev = t;//4. 将node节点的前驱指针指向

if(compareAndSetTail(t, node)) {//5.原子计算方式将node节点后驱节点指向tail

t.next = node;//6.将t节点(空节点)的后驱指针指向node节点

returnt;

}

}

}

}

第一次循环只是一个内部的初始空节点,第二次循环才是移动指针塞入的过程。

节点唤醒是在释放票据时被唤醒的,代码超级简单,可以自己当做一份作业,自己去看一遍代码吧~!提示流程就是先还票据,然后唤醒。Semaphore差不多就这些知识点,我也带着大家简单的看了一遍源码。我们再来继续看一下后面AQS的一些工具类。

CountDownLaunch的基本使用

CountDownLaunch很好理解,也是比较实用的,我们干王者农药的时候就是一个很好的栗子,游戏选完人物大家一起加载地图等游戏资料,有的人慢,有的人快,这时就印出来了CountDownLaunch,相当于我们5个玩家同时开启5个线程,然后一起执行,执行完毕先等着,直到5个玩家全部执行完成时,才可以运行后续操作。我们来看一下代码。

publicclassCountDownLaunchSample{

publicstaticvoidmain(String[] args) throws InterruptedException {

CountDownLatch countDownLatch =newCountDownLatch(2);

newThread(newplayerOne(countDownLatch)).start();

newThread(newplayerTwo(countDownLatch)).start();

countDownLatch.await();

System.out.println("全部加载完成");

}

staticclassplayerOneimplementsRunnable{

CountDownLatch countDownLatch;

publicplayerOne(CountDownLatch countDownLatch) {

this.countDownLatch = countDownLatch;

}

publicvoidrun() {

try{

System.out.println("玩家1开始加载...");

Thread.sleep(2000);

System.out.println("玩家1加载完成");

}catch(InterruptedException e) {

e.printStackTrace();

}finally{

if(countDownLatch !=null)

countDownLatch.countDown();

}

}

}

staticclassplayerTwoimplementsRunnable{

CountDownLatch countDownLatch;

publicplayerTwo(CountDownLatch countDownLatch) {

this.countDownLatch = countDownLatch;

}

publicvoidrun() {

try{

System.out.println("玩家2开始加载...");

Thread.sleep(10000);

System.out.println("玩家2加载完成");

}catch(InterruptedException e) {

e.printStackTrace();

}finally{

if(countDownLatch !=null)

countDownLatch.countDown();

}

}

}

}

实际项目中如果遇到读取excel多个sheet页签然后汇总数据的情况也可以采用CountDownLanch。注意最后final的countDownLatch.countDown()方法,也是一个类似上面票据增减的方法。

CyclicBarrier栅栏的简单使用:

CyclicBarrier和我们上面的CountDownLanch差不多,都是开启多个任务一起去执行,不同的是CountDownLanch需要支线任务执行完成然后CountDownLanch做一个汇总,然后继续运行后续程序。CyclicBarrier不需要做汇总。再就是CyclicBarrier是可以重复的。

publicclassCyclicBarrierTestimplementsRunnable{

privateCyclicBarrier cyclicBarrier;

privateintindex ;

publicCyclicBarrierTest(CyclicBarrier cyclicBarrier,intindex) {

this.cyclicBarrier = cyclicBarrier;

this.index = index;

}

publicvoidrun() {

try{

System.out.println("index: "+ index);

index--;

cyclicBarrier.await();

}catch(Exception e) {

e.printStackTrace();

}

}

publicstaticvoidmain(String[] args) throws Exception {

CyclicBarrier cyclicBarrier =newCyclicBarrier(11,newRunnable() {

publicvoidrun() {

System.out.println("所有特工到达屏障,准备开始执行秘密任务");

}

});

for(inti =; i

newThread(newCyclicBarrierTest(cyclicBarrier, i)).start();

}

cyclicBarrier.await();

System.out.println("全部到达屏障....");

}

}

这个需要注意的是CyclicBarrier cyclicBarrier = new CyclicBarrier(11, 这个11,就是说一定有11个线程执行完毕,我才可以执行后面的操作,我们下面for循环是10,而我们那里写的是11啊,别忘记还有一个主线程呢,所以说每次计算一定加一个主线程啊。

Exchanger的简单使用

最后就是我们Exchanger,平时使用的不多,我们了解一下就可以了,搂一眼代码,就是线程之间的变量交换。

publicstaticvoidmain(String[]args) {

finalExchanger exchanger =newExchanger();

for(inti =; i

finalIntegernum= i;

newThread() {

publicvoidrun() {

System.out.println("我是线程:Thread_"+this.getName() +"我的数据是:"+num);

try{

Integer exchangeNum = exchanger.exchange(num);

Thread.sleep(1000);

System.out.println("我是线程:Thread_"+this.getName() +"我原先的数据为:"+num+" , 交换后的数据为:"+ exchangeNum);

}catch(InterruptedException e) {

e.printStackTrace();

}

}

}.start();

}

}

总结:

这次我们核心梳理了我们的Semaphore的执行流程,内部是如何来实现我们的票据计数,获取,归还等操作的,再就是我们for无限循环会在两次以后自动阻塞的设计思想,还有我们的CountDown Lanch、CyclicBarrier、Executors的基本使用,并赋予大家简单的代码流程,今天就说到这,明天我们继续来说我们的多线程。

- END -

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20201023A08F2200?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券