Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Kotlin协程-特殊的阻塞协程

Kotlin协程-特殊的阻塞协程

作者头像
PhoenixZheng
发布于 2021-05-17 04:22:39
发布于 2021-05-17 04:22:39
2.5K00
代码可运行
举报
运行总次数:0
代码可运行

阻塞协程是种特殊的协程启动方式,一般是用 runBlocking{} 扩起来一段协程。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
fun main() = runBlocking {
    launch {
        println("launch start")
        delay(100L) // 非阻塞的等待 1 秒钟(默认时间单位是毫秒)
        println("World Thread: ${Thread.currentThread().name}")
        println("World!") // 在延迟后打印输出
    }
    println("Hello!")
    println("Hello Thread: ${Thread.currentThread().name}")
    Thread.sleep(400L) // 阻塞主线程 2 秒钟来保证 JVM 存活
    println("out launch done")
}

这段代码的执行结果是

Hello! Hello Thread: main out launch done launch start World Thread: main World!

代码包含了runBlocking{}和launch{}两段coroutine,父子关系。首先是父协程得到执行,然后才是子协程。

重点是这两段协程都在同一个线程main里完成。这里就带来一个有趣的问题, runBLocking{}和平时常用的launch有什么区别?

甚至你可以把上面的launch{},改成 GlobalScope.launch{},看看结果有什么不一样。这里给出结果,改用GlobalScope.launch之后,子协程会在一个独立的线程里运行。

runBlocking

kotlin协程官网上对于这个api的解释是桥接阻塞与非阻塞的世界。这个机翻中文让我迷惑了很久,一直不能明白它的意思。于是就去翻了源码的注释,

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
 * Runs a new coroutine and **blocks** the current thread _interruptibly_ until its completion.
 * This function should not be used from a coroutine. It is designed to bridge regular blocking code
 * to libraries that are written in suspending style, to be used in `main` functions and in tests.
 ...
 */
@Throws(InterruptedException::class)
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {

blablabla一堆,意思是也跟"桥接阻塞与非阻塞的世界"差不多,只是多了一句“会阻塞当前线程直到coroutine完成”。但实际情况跟注释有点不同,如果在 runBlocking 中开一个 GlobalScope.launch,并且在里面延时很久,那么外面的线程其实是不会等待 GlobalScope 里的协程完成的。弄明白这点需要理解这个特殊的阻塞协程 runBlocking 的原理。

创建

runBlocking的创建在jvm包下的Builders.kt中,

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
    val currentThread = Thread.currentThread()
    val contextInterceptor = context[ContinuationInterceptor]
    val eventLoop: EventLoop?
    val newContext: CoroutineContext
    if (contextInterceptor == null) {
        // create or use private event loop if no dispatcher is specified
        eventLoop = ThreadLocalEventLoop.eventLoop //默认派发器
        newContext = GlobalScope.newCoroutineContext(context + eventLoop)
    } else {
        // See if context's interceptor is an event loop that we shall use (to support TestContext)
        // or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
        eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() } //继承自上下文的派发器
            ?: ThreadLocalEventLoop.currentOrNull()
        newContext = GlobalScope.newCoroutineContext(context)
    }
    val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
    return coroutine.joinBlocking() 
}

首先它会判断当前是否上下文有现成的Dispatcher,或者叫Intercepter,如果有的话就直接拿过来。没有的话就使用默认的eventloop。EventLoop是协程里对阻塞型coroutine进行调度的默认调度器。runBlocking和launch的主要区别就靠EventLoop实现。

在创建完coroutine后就进入派发流程了,这部分和Kotlin协程-一个协程的生命周期中的逻辑比较相似,下面也会讲到。

最后会调用 joinBlocking() 去执行coroutine,我们放到第三部分执行分析。

派发

EventLoop是一个特殊的调度类型。它的公用实现在 EventLoop.common.kt 中,

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@ThreadLocal
internal object ThreadLocalEventLoop {
    private val ref = CommonThreadLocal<EventLoop?>()

    internal val eventLoop: EventLoop //eventloop对象
        get() = ref.get() ?: createEventLoop().also { ref.set(it) }

    internal fun currentOrNull(): EventLoop? =
        ref.get()

    internal fun resetEventLoop() {
        ref.set(null)
    }

    internal fun setEventLoop(eventLoop: EventLoop) {
        ref.set(eventLoop)
    }
}

createEventLoop()是个expect函数,用来获取平台上的实际实现。函数声明也在这个文件中,

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
internal expect fun createEventLoop(): EventLoop

而eventloop对象,是保存在ThreadLocal中的,意味着这个对象在每个线程里都会有一个,而且互不影响。每个线程都可以起一个独立的阻塞协程队列。

在jvm平台上的eventloop对象是在jvm包下的EventLoop.kt中,它的默认实现是 BlockingEventLoop

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
internal class BlockingEventLoop(
    override val thread: Thread
) : EventLoopImplBase()

internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread())

按惯例最后会去执行派发器的dispatch()方法,因为有了之前的分析经验,这里直接到它的dispatch()函数,

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block) //重载 dispatch函数,调用入队函数

public fun enqueue(task: Runnable) {
    if (enqueueImpl(task)) { //入队
        // todo: we should unpark only when this delayed task became first in the queue
        unpark()
    } else {
        DefaultExecutor.enqueue(task)
    }
}

@Suppress("UNCHECKED_CAST")
private fun enqueueImpl(task: Runnable): Boolean { //真正入队
    _queue.loop { queue ->
        if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
        when (queue) {
            null -> if (_queue.compareAndSet(null, task)) return true //在这里入队
            is Queue<*> -> {
                when ((queue as Queue<Runnable>).addLast(task)) {
                    Queue.ADD_SUCCESS -> return true
                    Queue.ADD_CLOSED -> return false
                    Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
                }
            }
            else -> when {
                queue === CLOSED_EMPTY -> return false
                else -> {
                    // update to full-blown queue to add one more
                    val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
                    newQueue.addLast(queue as Runnable)
                    newQueue.addLast(task)
                    if (_queue.compareAndSet(queue, newQueue)) return true
                }
            }
        }
    }
}

BlockingEventLoop 的入队函数 enqueueImpl 逻辑比较简单,通过when判断queue的类型走不同的逻辑。实际上这段逻辑还不稳定,仔细分析会发现,queue 在blocking eventloop 的场景下,只会有 null一种可能。所以它的入队,实际上最后都会走这段代码。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
null -> if (_queue.compareAndSet(null, task)) return true

执行

回到上面的创建阶段,最后会执行 joinBlocking

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
   fun joinBlocking(): T {
        registerTimeLoopThread()
        try {
            eventLoop?.incrementUseCount()
            try {
                while (true) {
                    @Suppress("DEPRECATION")
                    if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
                    val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE //执行队列里的下一个任务
                    // note: process next even may loose unpark flag, so check if completed before parking
                    if (isCompleted) break
                    parkNanos(this, parkNanos)
                }
            } finally { // paranoia
                eventLoop?.decrementUseCount()
            }
        } finally { // paranoia
            unregisterTimeLoopThread()
        }
        // now return result
        val state = this.state.unboxState()
        (state as? CompletedExceptionally)?.let { throw it.cause }
        return state as T
    }

processNextEvent()会从上面的queue中取出任务并且执行。因为eventloop在jvm上的实现是BlockingEventLoop,它的父类是 EventLoopImplBase,在 EventLoop.common.kt 中,

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
override fun processNextEvent(): Long {
    // unconfined events take priority
    if (processUnconfinedEvent()) return nextTime
    // queue all delayed tasks that are due to be executed
    val delayed = _delayed.value
    if (delayed != null && !delayed.isEmpty) { //判断是否到延时时间,否则重新入队
        val now = nanoTime()
        while (true) {
            // make sure that moving from delayed to queue removes from delayed only after it is added to queue
            // to make sure that 'isEmpty' and `nextTime` that check both of them
            // do not transiently report that both delayed and queue are empty during move
            delayed.removeFirstIf {
                if (it.timeToExecute(now)) {//重新入队
                    enqueueImpl(it)
                } else
                    false
            } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
        }
    }
    // then process one event from queue
    dequeue()?.run() //出队并执行
    return nextTime
}

dequeue()的实现也相对简单,跟入队的逻辑差不多

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Suppress("UNCHECKED_CAST")
private fun dequeue(): Runnable? {
    _queue.loop { queue ->
        when (queue) {
            null -> return null
            is Queue<*> -> {
                val result = (queue as Queue<Runnable>).removeFirstOrNull()
                if (result !== Queue.REMOVE_FROZEN) return result as Runnable?
                _queue.compareAndSet(queue, queue.next())
            }
            else -> when {
                queue === CLOSED_EMPTY -> return null
                else -> if (_queue.compareAndSet(queue, null)) return queue as Runnable //出队并把当前queue设为null
            }
        }
    }
}

上面说过,在BlockingEventLoop场景下,queue的入队只会有null一种可能。而这里也是一样,只会从else进去。

虽然queue名义上是个队列,它也支持队列的逻辑,比如在 is Queue<*> 这个分支上,它的实现是个队列。但现在可以把它当做个容量为1的队列。

之后就是task.run的流程了,和之前的分析没什么区别。

BlockingEventLoop的特殊性

上面的分析可以看出一个问题,queue不是个队列,而且每次它都只会在 null->task 之间转换。也就是说,不管什么时候,queue的长度只会是1或者0.

这个问说明,runBLocking{}这种协程,它的运行逻辑是先把父协程放队列里,然后取出来执行,执行完毕再把子协程入队,再出队子协程,用同样的方式递归。虽然这种方式能保证整体是个阻塞流程,但是设计上不够优雅。猜测是为了避免协程嵌套太多,导致stack over flow的问题出现。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-04-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Android每日一讲 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
深入浅出Kotlin协程
协程(Coroutines)已经随着Kotlin1.3版本一起发布了1.0正式版,android平台可以使用如下方式引入:
wiizhang
2018/09/11
11.7K3
深入浅出Kotlin协程
Kotlin协程-一个协程的生命周期
在安卓或者kotlin平台上使用协程是很简单的一件事情。举一个最简单的例子,不依赖安卓平台的协程代码,
PhoenixZheng
2021/04/26
1K0
Kotlin协程-一个协程的生命周期
揭秘kotlin协程中的CoroutineContext
从kotlin1.1开始,协程就被添加到kotlin中作为实验性功能,直到kotlin1.3,协程在kotlin中的api已经基本稳定下来了,现在kotlin已经发布到了1.4,为协程添加了更多的功能并进一步完善了它,所以我们现在在kotlin代码中可以放心的引入kotlin协程并使用它,其实协程并不是kotlin独有的功能,它是一个广泛的概念,协作式多任务的实现,除了kotlin外,很多语言如Go、Python等都通过自己的方式实现了协程,本文阅读前希望你已经知道如何使用kotlin协程,如果不熟悉可以阅读一下官方文档:
做个快乐的码农
2022/02/07
2K0
揭秘kotlin协程中的CoroutineContext
kotlin-协程的异常处理机制分析
使用kotlin的协程一段时间了,常用的用法也已经很熟悉,但都是停留在使用的阶段,没有对代码深入了解过,还是感觉有点虚;趁着过年这段时间,针对协程的异常处理,对其相关的源码学习了一波,梳理总结一下自己的理解。
37手游安卓团队
2021/02/22
9780
协程到底是怎么切换线程的?
可以看出CoroutineScope的代码很简单,主要作用是提供CoroutineContext,协程运行的上下文 我们常见的实现有GlobalScope,LifecycleScope,ViewModelScope等
Rouse
2021/07/30
9050
协程到底是怎么切换线程的?
Kotlin协程解析系列(上):协程调度与挂起
本文是Kotlin协程解析系列文章的开篇,主要介绍Kotlin协程的创建、协程调度与协程挂起相关的内容
2020labs小助手
2022/06/07
2.2K0
Kotlin | 协程使用手册(不间断更新)
在概念上,async 就类似于 launch。它启动了一个单独的协程,这是一个轻量级的线程并与其它所有的协程一起并发的工作。不同之处在于 launch 返回一个 Job 并且不附带任何结果值,而 async 返回一个 Deferred —— 一个轻量级的非阻塞 future, 这代表了一个将会在稍后提供结果的 promise。你可以使用 .await() 在一个延期的值上得到它的最终结果, 但是 Deferred 也是一个 Job,所以如果需要的话,你可以取消它。
Petterp
2022/02/09
2.5K0
Kotlin | 协程使用手册(不间断更新)
kotlin--协程的启动和取消
launch:我们之前已经使用过了GlobalScope的launch来启动协程,它返回一个Job async:返回一个Deferred,它也是一个Job,但是可以使用await函数获得运行的结果 除了之前结构化并发中介绍的几种指定CoroutineScope的API外,我们还可以使用runBlocking函数来指定CoroutineScope,他会使用主线程来转换成协程 launch和async内如果有子协程,那么该协程会等待子协程执行结束
aruba
2021/12/06
1.1K0
kotlin--协程的启动和取消
Kotlin协程-协程的内部概念Continuation
+-------+           +-----------+ | START |----------------------->| SUSPENDED | +-------+           +-----------+                  |  ^                  V  |                +------------+ completion invoked +-----------+                | RUNNING |------------------->| COMPLETED |                +------------+          +-----------+
PhoenixZheng
2021/05/17
1.8K0
Kotlin协程-协程的内部概念Continuation
抽丝剥茧Kotlin - 协程
文章接上篇,这一篇我们好好聊一聊协程的原理,通过上一篇的学习,相信大家对于如何使用协程已经非常熟悉了。
用户1907613
2020/08/28
8550
Kotlin中的协程及在Android中的应用
Kotlin的一个协程可以理解为是运行在线程上的一个执行任务并且该任务可以在不同的线程间切换,一个线程可以同时运行多个协程。
码客说
2024/03/29
3910
《Kotin 极简教程》第9章 轻量级线程:协程(1)
在常用的并发模型中,多进程、多线程、分布式是最普遍的,不过近些年来逐渐有一些语言以first-class或者library的形式提供对基于协程的并发模型的支持。其中比较典型的有Scheme、Lua、Python、Perl、Go等以first-class的方式提供对协程的支持。
一个会写诗的程序员
2018/08/17
1.2K0
Kotlin 协程之Practice
Kotlin 练习参考https://www.kotlincn.net/docs/reference/
Yif
2019/12/26
1.2K0
Kotlin协程实现原理:ContinuationInterceptor&CoroutineDispatcher
新的一周开始了,大家继续干就完事了,同时感谢老铁们的支持! 今天我们继续来聊聊Kotlin的协程Coroutine。 如果你还没有接触过协程,推荐你先阅读这篇入门级文章What? 你还不知道Kotli
Rouse
2020/11/17
1.8K0
Kotlin协程实现原理:ContinuationInterceptor&CoroutineDispatcher
Kotlin 协程的上下文和调度器介绍-Dispatchers
协程的上下文通常是CoroutineContext类型为代表。这个类型是被定义在Kotlin的标准库中。
zinyan.com
2023/07/13
5080
Kotlin 协程的上下文和调度器介绍-Dispatchers
kotlin--协程上下文、异常处理
当我们在a协程延迟函数100ms之前开启一个子协程b,b做了200ms的事情,如果不考虑调度消耗的时间,那么a协程的生命也会延长成200ms
aruba
2021/12/06
9770
kotlin--协程上下文、异常处理
kotlin 协程入门教程
链接:https://juejin.cn/post/7370994785655767067
Rouse
2024/05/28
2470
kotlin 协程入门教程
Kotlin---协程的使用
在使用协程之前,需要保证Kotlin-Gradle-Plugin的版本高于1.3。目前最高的版本为1.3.11。否则编译会报错
None_Ling
2019/02/25
1.4K0
【Kotlin 协程】协程启动 ① ( 协程构建器 | launch 构建器 | async 构建器 | runBlocking 函数 | Deferred 类 )
协程 需要 协程构建器 来启动 , 协程构建器 就是 CoroutineScope 协程作用域的两个扩展函数 ;
韩曙亮
2023/03/30
5110
【Kotlin 协程】协程启动 ① ( 协程构建器 | launch 构建器 | async 构建器 | runBlocking 函数 | Deferred 类 )
【Kotlin 协程】Channel 通道 ③ ( CoroutineScope#produce 构造生产者协程 | CoroutineScope#actor 构造消费者协程 )
通过 CoroutineScope#produce 函数 , 可以快速构造一个 生产者协程 , 其返回值是 ReceiveChannel 实例对象 , 这样就可以在消费者协程中通过该 ReceiveChannel 实例获取并消费数据 ;
韩曙亮
2023/03/30
5370
【Kotlin 协程】Channel 通道 ③ ( CoroutineScope#produce 构造生产者协程 | CoroutineScope#actor 构造消费者协程 )
推荐阅读
相关推荐
深入浅出Kotlin协程
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验