前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >多线程进阶——JUC并发编程之Semaphore源码一探究竟?

多线程进阶——JUC并发编程之Semaphore源码一探究竟?

作者头像
须臾之余
发布2021-07-27 15:04:57
3340
发布2021-07-27 15:04:57
举报
文章被收录于专栏:须臾之余

1、学习切入点

百度翻译如下:

计数信号量。从概念上讲,信号量维护一组许可。如果需要,每个acquire()都会阻塞,直到有许可证可用,然后获取它。每个release()添加一个许可,可能释放一个阻塞的收单机构。但是,并没有使用实际的许可对象;信号量只是保持一个可用数量的计数,并相应地进行操作。

2、案例引入

代码语言:javascript
复制
public class SemaphoreDemo {
    public static void main(String[] args) {
        // 线程数量:停车位! 限流!
        Semaphore semaphore = new Semaphore(3);
        for (int i = 1; i <=6 ; i++) {
            new Thread(()->{
                // acquire() 得到
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+"抢到车位");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName()+"离开车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release(); // release() 释放
                }
            },String.valueOf(i)).start();
        }
    }
}

3、入手构造函数

代码语言:javascript
复制
/** 创建具有给定许可数和非空公平设置的信号量 */  
 public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
/** 创建具有给定许可数和给定公平性设置的信号量 */
 public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

上面两个构造函数,默认构造函数是非公平锁来实现,通过设置构造参数 fail 来选用公平策略还是非公平策略。

所谓公平和非公平的意思是:假设现在有一个线程A在等待获取锁,这时候又来了一个线程B,如果这个时候B 不考虑A的感受,也去申请锁,这显然是不公平的;反之,只要A 是先来的,B一定排在A 的后面,不能马上去申请锁,这就是公平策略。

4、入手核心方法

代码语言:javascript
复制
/** 尝试获取锁资源,获取到了则立即返回并跳出方法,没有获取到则该方法阻塞等待*/    
public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        //检测线程中断标志位
        if (Thread.interrupted())
            throw new InterruptedException();
        //尝试获取共享资源锁,小于0表示获取失败,此方法由AQS的具体子类来实现
        if (tryAcquireShared(arg) < 0)
            //将尝试获取锁资源的线程进行入队操作
            doAcquireSharedInterruptibly(arg);
    }
代码语言:javascript
复制
/** 1、公平策略实现方式*/        
protected int tryAcquireShared(int acquires) {
            for (;;) {//自旋操作
                if (hasQueuedPredecessors())//检测线程是否有阻塞队列,这里是和非公平策略唯一多个判断的地方
                    return -1;
                int available = getState();//获取锁资源的最新内存值
                int remaining = available - acquires;//计算得到剩下的许可数量
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))//CAS原子操作共享资源
                    return remaining;//上层根据这个返回值进行判断是否需要入队操作
            }
        }
/** 2、非公平策略实现方式*/        
protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }

final int nonfairTryAcquireShared(int acquires) {
            for (;;) {//自旋操作
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

4.1、【doAcquireSharedInterruptibly】获取共享资源失败进入这个方法

代码语言:javascript
复制
 private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //创建共享模式节点
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {//自旋死循环操作
                final Node p = node.predecessor();//获取节点的前驱节点
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {//说明已经成功获取到共享资源了
                        setHeadAndPropagate(node, r);//把当前node节点设置为head节点
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&//根据前驱节点看看是否需要休息下
                    parkAndCheckInterrupt())//阻塞操作,正常情况下获取不到共享锁,代码就这这个方法stop了,until 被唤醒!
                  //被唤醒后,发现parkAndCheckInterrupt()里面检测了被中断了的话,则补上中断异常
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

这个方法的逻辑与独占模式下的逻辑差不多,当所有的坑都被占用的时候,在来的线程都会被封装成共享模式节点,添加到等待队列里面去。而共享模式是实现多个坑同时提供服务的核心。

再来看看坑释放的过程,从【release】方法进去,核心在【tryReleaseShared】方法

4.2、【release】方法释放坑源码分析

代码语言:javascript
复制
    public void release() {
        sync.releaseShared(1);//释放一个许可资源
    }
    //父类AQS的一个释放共享资源基类方法
    //releaseShared主要是进行共享资源的释放,如果成功释放共享资源的话,则唤醒等待队列中的节点,如果失败则返回false,由上层调用方决定如何处理!
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {//AQS子类实现,尝试释放共享资源
            doReleaseShared();//自旋操作,唤醒后继节点
            return true;
        }
        return false;
    }
代码语言:javascript
复制
/** 核心方法  NonfairSync 和 FairSync 的父类 Sync 类的 tryReleaseShared 方法  */        
protected final boolean tryReleaseShared(int releases) {
            for (;;) {//自旋
                int current = getState();
                int next = current + releases;//对许可数量进行加法操作
                //int类型值小于0,是因为该int类型的state状态值溢出了,溢出的话,说明这个锁很难释放,可能出问题了
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;//返回成功标志,告诉上层该线程已经释放了共享资源
            }
        }

如果获取共享资源成功的话,会进入【doReleaseShared】这个方法

4.3、【doReleaseShared】源码分析

代码语言:javascript
复制
/** 主要是释放共享许可证,但是最重要的是保证唤醒后继节点的传递,来让这些线程释放他们所持有的信号量 */
private void doReleaseShared() {
       
        for (;;) {//自旋
            Node h = head;//每次都是取队列头结点
            if (h != null && h != tail) {//若头结点不为空且也不是队尾节点
                int ws = h.waitStatus;//那么就获取头结点的状态值
                if (ws == Node.SIGNAL) {//若头结点是SIGNAL状态则后继节点需要被唤醒了
                    //通过CAS尝试设置头结点的状态为空状态,失败的话,则继续loop,因为并发有可能其它地方也在进行释放操作
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);//唤醒头结点的后继节点
                } 
                //如果头结点为空状态,则把其改为PROPAGATE状态,失败的话则可能是并发修改,再循环CAS
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            //若头结点没有发生变化,则说明上述设置以及完成,大功告成,功成身退
            //若发生变化,可能是操作过程中头结点有了新增或者啥的,那么必须重试,以保证唤醒动作可以延续传递
            if (h == head)                   // loop if head changed
                break;
        }
    }

5、回顾总结

最后我们梳理下Semaphore 的流程的一些特性:

1、管理一系列许可证,即state共享资源值; 2、获取许可证的时候acquire一次【int remaining = available - acquires】则state就减少一次,直到许可证数量小于0则阻塞等待; 3、释放许可证的时候release一次【int next = current + releases】对许可数量进行加法操作,要保证唤醒后继节点,以此来保证线程释放它们锁持有的信号量; 4、是Synchronized的升级版,因为Synchronized是只有一个许可证,而semaphore就像开挂一样,可以有多个许可证。

推荐阅读:

多线程进阶——JUC并发编程之抽象同步队列AQS框架设计理念一探究竟?

多线程进阶——JUC并发编程之CyclicBarrier源码一探究竟

多线程进阶——JUC并发编程之Semaphore源码一探究竟

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、学习切入点
  • 2、案例引入
  • 3、入手构造函数
  • 4、入手核心方法
    • 4.1、【doAcquireSharedInterruptibly】获取共享资源失败进入这个方法
      • 4.2、【release】方法释放坑源码分析
        • 4.3、【doReleaseShared】源码分析
        • 5、回顾总结
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档