最近了解了一下 Koltin Flow 相关的一些内容。在这里做一些简单的总结。关于 Flow 的知识点有如下一些:
Kotlin 里 Flow 的基本用法是使用一个 flow
方法创建 Flow 对象:
flow {}
需要更新值的时候,在代码块内使用 emit
方法发射值。需要监听值的时候,使用 collect
方法。
flow
方法会创建一个 SafeFlow
对象。
SafeFlow
的继承关系如上图。我们看下 SafeFlow
是如何收集值的
// SafeFlow collect
public final override suspend fun collect(collector: FlowCollector<T>) {
val safeCollector = SafeCollector(collector, coroutineContext)
try {
collectSafely(safeCollector)
} finally {
safeCollector.releaseIntercepted()
}
}
这里创建一个 SafeCollector,并执行 collectSafely
:
// SafeFlow collectSafely
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
这里很简单,就是执行了一下 SafeCollector 的 block,也就是我们使用 flow
时候的代码块,它其实是 FlowCollector
的一个扩展方法。所以这里调用 emit
的时候调用的也就是 SafeCollector
的 emit
。所以 flow
的代码块里的逻辑是需要 collect
之后才会去执行的,我们管这种叫做 冷流
。
再看下这里是怎么发射值的:
// SafeCollector emit
fun emit(uCont: Continuation<Unit>, value: T): Any? {
val currentContext = uCont.context
currentContext.ensureActive()
// This check is triggered once per flow on happy path.
val previousContext = lastEmissionContext
if (previousContext !== currentContext) {
checkContext(currentContext, previousContext, value)
}
completion = uCont
return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
}
// checkContext
private fun checkContext(currentContext: CoroutineContext,previousContext: CoroutineContext?,value: T) {
if (previousContext is DownstreamExceptionElement) {
exceptionTransparencyViolated(previousContext, value)
}
checkContext(currentContext)
lastEmissionContext = currentContext
}
这里会检查一下协程上下文,然后执行 emitFun
上下文检查会检查如下内容,这部分在相关源码的注释里面有写,读者感兴趣可以去翻阅一下。
DownstreamExceptionElement
。如果下游存在异常,就会包装成 DownstreamExceptionElement
,这样上游 emit 的时候就会知道,这时候就会抛出异常。我们看一段源码注释里的示例val flow = flow {
emit(1)
try {
emit(2)
} catch (e:Exception) {
emit(3)
}
}
flow.collect {
if (it == 2) {
throw CancellationException("cancel")
} else {
println("collect $it")
}
}
这段代码的输出为
collect 1
cancel
这里当collect到2的时候下游抛出了一个异常,从预期上来讲,数据流出现异常应该终止,如果没有上面的这个机制,实际上的输出会变成
collect 1
collect 3
如果不对上下文进行检查,那么产生的结果和我们的预期是不符的。
emit
不是线程安全的,所以我们不允许使用的时候出现下面这种情况:coroutineScope {
launch {
emit(1)
}
launch {
emit(2)
}
}
这种情况会在 checkContext
里面调用 transitiveCoroutineParent
进行判断:
// SafeCollector.common.kt
internal tailrec fun Job?.transitiveCoroutineParent(collectJob: Job?): Job? {
if (this === null) return null
if (this === collectJob) return this
if (this !is ScopeCoroutine<*>) return this
return parent.transitiveCoroutineParent(collectJob)
}
如果 emit 所在上下文和 collect 所在上下文不一致,就会报错。
在日常开发中,普通的 Flow API 只能有一个 collect 的地方,并不能满足我们的需求,所以 Kotlin 还提供了更丰富的 APIStateFlow
和 SharedFlow
。
顾名思义,StateFlow
就是维护状态的 Flow, 它的使用非常类似 LiveData
:
val state = MutableStateFlow<Int>(0)//必须要初始值
// 更新值
state.value = 100
// 监听
state.collect{} // in coroutine suspend method
MutableStatFlow
支持多个观察者对其进行 collect
,并且 MutableStateFlow
初始化的时候必须有值。这个和 Livedata
非常接近,也会存在 LiveData
的粘性事件的问题。
SharedFlow
的使用类似 StateFlow
:
val stream = MutableSharedFlow()
// 更新
scope.launch {
stream.emit(100)
}
//监听
scope.launch {
stream.collect {}
}
和 StateFlow
相比,SharedFlow
只有 emit
方法,并且构造方法里面有 3 个可选的参数。
replay 和 extraBufferCapacity 默认都为 0, replay 为重播给收集方的数量。例如数据流依次为 1, 2, 3, 4, 5,replay 是2,那么这时候收集方会收到 4, 5 的值。extraBufferCapacity 为额外的缓冲队列的容量。onBufferOverrflow 则为背压(collect比emit耗时,emit的数据源太多处理不过来)的情况下的处理策略,包括
从这里可以看出, SharedFlow
非常适合作为事件流的处理。当参数都为默认值的时候,重播数量为0,那么只会在 emit
最新值的时候才会收到通知。也不会存在粘性事件的问题。
初步认识上面的知识点后,我们可以利用 Flow 来改进之前的 Android 代码:
LiveData
或者直接使用 StateFlow
,需要注意的是,状态
是一直存在的 UI 状态,无论页面重建或者位置改变,状态都是存在的。这里的状态是允许重复响应的,例如一个 TextView 显示的值是 200,那么即使 页面发生了重建,这个值仍然显示的是 200。
SharedFlow
替代 LiveData
进行处理。场景的逻辑状态有例如 加载成功后弹一个 Toast,图片裁剪后触发上传操作等。这类逻辑如果使用 LiveData
或者 StateFlow
,当页面重建后,之前的值都会被监听到,反复弹 Toast,这是一件非常麻烦的事情, 如果使用 SharedFlow
,则不会遇到。