今天学习一个比较特别的队列SynchronousQueue,直译过来叫同步队列,可用于线程间交换数据却不用存储数据。
功能说明
要理解SynchronousQueue首先就是要知道它的作用,只有知道这个类存在的目的和提供的功能,再能再读源码的时候不迷路。
我们在之前的学习中,如果线程间要交换数据一般都是用一个通过公共变量或者一个同步阻塞队列,生产者线程设置变量或者往队列中put值,消费者线程则读取变量或者从队列中take。
而SynchronousQueue则不需要存储线程间交换的数据,它的作用更像是一个匹配器,使生产者和消费者一一匹配。
一般流程
比如当一个线程调用了put方法时,发现队列中没有take线程,那么put线程就会阻塞,当take线程进来时发现有阻塞的put线程,那么他们两个就会匹配上,然后take线程获取到put线程的数据,两个线程都不阻塞。
反之一个线程调用take方法也会阻塞线程,当一个调用put方法的线程进来后也会与之匹配。
如果一个take或者put线程进来发现有同类的take或者put线程在阻塞中,那么线程会排到后面,直到有不同类的线程进来然后匹配其中一个线程。
通过流程描述相信对SynchronousQueue有了一定的了解,也知道SynchronousQueue为什么不用存储元素。
源码分析
通过查看SynchronousQueue的take与put方法发现都是调用的一个属性transferer的transfer方法,而transferer属性是SynchronousQueue的抽象静态内部类Transferer。Transferer有两个子类TransferQueue和TransferStack;
在SynchronousQueue构造方法中通过传递的参数fair来判断是创建TransferQueue还是TransferStack,通过参数fair来看TransferQueue应该是公平模式,那么TransferStack就是非功能模式。
公平模式TransferQueue实现
首先它有一个内部类QNode,通过上面分析发现可能会出现多个消费者或者多个生产者,他们就会形成一个队列,而QNode就是用来组成一个队列的链表。
QNode主要有四个属性:
QNode next:表示下一个节点;
Object item;这里实际上是put出去的数据,take方法生成的节点这里为null;
Thread waiter;阻塞的线程,一般是生成这个节点的线程阻塞,其他线程进来获取到了数据后会唤醒;
boolean isData:true则是put生成的,false表示是take生成的;
既然take与put都依赖transfer方法,那么我们就来看transfer的实现,源码太长就不贴出来了,直接看总结的流程图:
因为这个方法并没有采用锁来控制,所以在整个流程中还有很多判断,这些都是次要的,这里整理的是主要关键的流程。
简单说明一下,整个流程就是依赖QNode链表,QNode的isData来区分是take还是put方法,链表中的节点的isData一定是相同的,QNode的item是take和put线程交换的数据,只不过take方法交换的数据是null。
可以看出来每次可以匹配的时候都是拿的最前面的节点进来返回数据。
非公平模式TransferStack
同样TransferStack也有一个链表结构叫做SNode ,SNode 的主要属性如下:
SNode next: 下一个节点;
SNode match:与之匹配成功的节点;
Thread waiter:阻塞的线程;
Object item:要给出去的值;
int mode:节点分类,用于区分put还是take;
同样整理了transfer的源码流程图,具体如下图:
同样都是用链表实现,但是这个首先结构不一样,多一个match表示与之匹配的节点。
通过流程可以看到mode一个有三个值,除了0,1外还有一个2用来表示正在匹配中的节点。主要就是在上图中红框部分,在发现头部节点不是进行中时,当前线程就会创建一个匹配中的节点,然后加到头部,最后去和后面的节点匹配。
如果发现正在匹配中则会把匹配的节点从链表中移除。
通过流程分析可以看出TransferStack是后进来的线程放到了头部,会先进行匹配。
总结
一般的阻塞队列如果发现队列中没有满put方法就直接把数据放到队列中,然后线程继续执行,同样发现队列中有数据take方法也会直接从队列中拿到数据然后继续执行。
之前的队列都是把要交换的数据当作资源去put或take,而SynchronousQueue就好像是把线程当作资源,这样就不需要多出来资源来存储交换的数据了。
在之前的队列中没有数据是如果take方法进来就会阻塞,或者当队列满了的时候put方法进来也会阻塞。
而SynchronousQueue把put和take分成两类资源,阻塞队列中只有不同的类型就去匹配,否则就阻塞,同时SynchronousQueue通过实现先进先出表示公平,先进后出表示不公平。
Java程序员日常学习笔记,如理解有误欢迎各位交流讨论!
领取专属 10元无门槛券
私享最新 技术干货