阻塞协程是种特殊的协程启动方式,一般是用 runBlocking{} 扩起来一段协程。
fun main() = runBlocking {
launch {
println("launch start")
delay(100L) // 非阻塞的等待 1 秒钟(默认时间单位是毫秒)
println("World Thread: ${Thread.currentThread().name}")
println("World!") // 在延迟后打印输出
}
println("Hello!")
println("Hello Thread: ${Thread.currentThread().name}")
Thread.sleep(400L) // 阻塞主线程 2 秒钟来保证 JVM 存活
println("out launch done")
}
这段代码的执行结果是
Hello! Hello Thread: main out launch done launch start World Thread: main World!
代码包含了runBlocking{}和launch{}两段coroutine,父子关系。首先是父协程得到执行,然后才是子协程。
重点是这两段协程都在同一个线程main里完成。这里就带来一个有趣的问题, runBLocking{}和平时常用的launch有什么区别?
甚至你可以把上面的launch{},改成 GlobalScope.launch{},看看结果有什么不一样。这里给出结果,改用GlobalScope.launch之后,子协程会在一个独立的线程里运行。
在kotlin协程官网上对于这个api的解释是桥接阻塞与非阻塞的世界。这个机翻中文让我迷惑了很久,一直不能明白它的意思。于是就去翻了源码的注释,
/**
* Runs a new coroutine and **blocks** the current thread _interruptibly_ until its completion.
* This function should not be used from a coroutine. It is designed to bridge regular blocking code
* to libraries that are written in suspending style, to be used in `main` functions and in tests.
...
*/
@Throws(InterruptedException::class)
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
blablabla一堆,意思是也跟"桥接阻塞与非阻塞的世界"差不多,只是多了一句“会阻塞当前线程直到coroutine完成”。但实际情况跟注释有点不同,如果在 runBlocking 中开一个 GlobalScope.launch,并且在里面延时很久,那么外面的线程其实是不会等待 GlobalScope 里的协程完成的。弄明白这点需要理解这个特殊的阻塞协程 runBlocking 的原理。
runBlocking的创建在jvm包下的Builders.kt中,
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
val currentThread = Thread.currentThread()
val contextInterceptor = context[ContinuationInterceptor]
val eventLoop: EventLoop?
val newContext: CoroutineContext
if (contextInterceptor == null) {
// create or use private event loop if no dispatcher is specified
eventLoop = ThreadLocalEventLoop.eventLoop //默认派发器
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
} else {
// See if context's interceptor is an event loop that we shall use (to support TestContext)
// or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() } //继承自上下文的派发器
?: ThreadLocalEventLoop.currentOrNull()
newContext = GlobalScope.newCoroutineContext(context)
}
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
return coroutine.joinBlocking()
}
首先它会判断当前是否上下文有现成的Dispatcher,或者叫Intercepter,如果有的话就直接拿过来。没有的话就使用默认的eventloop。EventLoop是协程里对阻塞型coroutine进行调度的默认调度器。runBlocking和launch的主要区别就靠EventLoop实现。
在创建完coroutine后就进入派发流程了,这部分和Kotlin协程-一个协程的生命周期中的逻辑比较相似,下面也会讲到。
最后会调用 joinBlocking() 去执行coroutine,我们放到第三部分执行
分析。
EventLoop是一个特殊的调度类型。它的公用实现在 EventLoop.common.kt 中,
@ThreadLocal
internal object ThreadLocalEventLoop {
private val ref = CommonThreadLocal<EventLoop?>()
internal val eventLoop: EventLoop //eventloop对象
get() = ref.get() ?: createEventLoop().also { ref.set(it) }
internal fun currentOrNull(): EventLoop? =
ref.get()
internal fun resetEventLoop() {
ref.set(null)
}
internal fun setEventLoop(eventLoop: EventLoop) {
ref.set(eventLoop)
}
}
createEventLoop()是个expect函数,用来获取平台上的实际实现。函数声明也在这个文件中,
internal expect fun createEventLoop(): EventLoop
而eventloop对象,是保存在ThreadLocal中的,意味着这个对象在每个线程里都会有一个,而且互不影响。每个线程都可以起一个独立的阻塞协程队列。
在jvm平台上的eventloop对象是在jvm包下的EventLoop.kt中,它的默认实现是 BlockingEventLoop
internal class BlockingEventLoop(
override val thread: Thread
) : EventLoopImplBase()
internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread())
按惯例最后会去执行派发器的dispatch()方法,因为有了之前的分析经验,这里直接到它的dispatch()函数,
public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block) //重载 dispatch函数,调用入队函数
public fun enqueue(task: Runnable) {
if (enqueueImpl(task)) { //入队
// todo: we should unpark only when this delayed task became first in the queue
unpark()
} else {
DefaultExecutor.enqueue(task)
}
}
@Suppress("UNCHECKED_CAST")
private fun enqueueImpl(task: Runnable): Boolean { //真正入队
_queue.loop { queue ->
if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
when (queue) {
null -> if (_queue.compareAndSet(null, task)) return true //在这里入队
is Queue<*> -> {
when ((queue as Queue<Runnable>).addLast(task)) {
Queue.ADD_SUCCESS -> return true
Queue.ADD_CLOSED -> return false
Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
}
}
else -> when {
queue === CLOSED_EMPTY -> return false
else -> {
// update to full-blown queue to add one more
val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
newQueue.addLast(queue as Runnable)
newQueue.addLast(task)
if (_queue.compareAndSet(queue, newQueue)) return true
}
}
}
}
}
BlockingEventLoop 的入队函数 enqueueImpl 逻辑比较简单,通过when判断queue的类型走不同的逻辑。实际上这段逻辑还不稳定,仔细分析会发现,queue 在blocking eventloop 的场景下,只会有 null一种可能。所以它的入队,实际上最后都会走这段代码。
null -> if (_queue.compareAndSet(null, task)) return true
回到上面的创建阶段,最后会执行 joinBlocking
fun joinBlocking(): T {
registerTimeLoopThread()
try {
eventLoop?.incrementUseCount()
try {
while (true) {
@Suppress("DEPRECATION")
if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE //执行队列里的下一个任务
// note: process next even may loose unpark flag, so check if completed before parking
if (isCompleted) break
parkNanos(this, parkNanos)
}
} finally { // paranoia
eventLoop?.decrementUseCount()
}
} finally { // paranoia
unregisterTimeLoopThread()
}
// now return result
val state = this.state.unboxState()
(state as? CompletedExceptionally)?.let { throw it.cause }
return state as T
}
processNextEvent()会从上面的queue中取出任务并且执行。因为eventloop在jvm上的实现是BlockingEventLoop,它的父类是 EventLoopImplBase,在 EventLoop.common.kt 中,
override fun processNextEvent(): Long {
// unconfined events take priority
if (processUnconfinedEvent()) return nextTime
// queue all delayed tasks that are due to be executed
val delayed = _delayed.value
if (delayed != null && !delayed.isEmpty) { //判断是否到延时时间,否则重新入队
val now = nanoTime()
while (true) {
// make sure that moving from delayed to queue removes from delayed only after it is added to queue
// to make sure that 'isEmpty' and `nextTime` that check both of them
// do not transiently report that both delayed and queue are empty during move
delayed.removeFirstIf {
if (it.timeToExecute(now)) {//重新入队
enqueueImpl(it)
} else
false
} ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
}
}
// then process one event from queue
dequeue()?.run() //出队并执行
return nextTime
}
dequeue()的实现也相对简单,跟入队的逻辑差不多
@Suppress("UNCHECKED_CAST")
private fun dequeue(): Runnable? {
_queue.loop { queue ->
when (queue) {
null -> return null
is Queue<*> -> {
val result = (queue as Queue<Runnable>).removeFirstOrNull()
if (result !== Queue.REMOVE_FROZEN) return result as Runnable?
_queue.compareAndSet(queue, queue.next())
}
else -> when {
queue === CLOSED_EMPTY -> return null
else -> if (_queue.compareAndSet(queue, null)) return queue as Runnable //出队并把当前queue设为null
}
}
}
}
上面说过,在BlockingEventLoop场景下,queue的入队只会有null一种可能。而这里也是一样,只会从else进去。
虽然queue名义上是个队列,它也支持队列的逻辑,比如在 is Queue<*>
这个分支上,它的实现是个队列。但现在可以把它当做个容量为1的队列。
之后就是task.run的流程了,和之前的分析没什么区别。
上面的分析可以看出一个问题,queue不是个队列,而且每次它都只会在 null->task 之间转换。也就是说,不管什么时候,queue的长度只会是1或者0.
这个问说明,runBLocking{}这种协程,它的运行逻辑是先把父协程放队列里,然后取出来执行,执行完毕再把子协程入队,再出队子协程,用同样的方式递归。虽然这种方式能保证整体是个阻塞流程,但是设计上不够优雅。猜测是为了避免协程嵌套太多,导致stack over flow的问题出现。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有