如果使用一些消耗 CPU 资源的阻塞代码计算数字(每次计算需要 100 毫秒)那么我们可以使用 Sequence 来表示数字:
fun simple(): Sequence<Int> = sequence { // 序列构建器
for (i in 1..3) {
Thread.sleep(100) // 假装我们正在计算
yield(i) // 产生下一个值
}
}
fun main() {
simple().forEach { value -> println(value) }
}
上线的代码会阻塞主线程。 当这些值由异步代码计算时,我们可以使用 suspend 修饰符标记函数 simple, 这样它就可以在不阻塞的情况下执行其工作并将结果作为列表返回:
suspend fun simple(): List<Int> {
delay(1000) // 假装我们在这里做了一些异步的事情
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
simple().forEach { value -> println(value) }
}
使用 List 结果类型,意味着我们只能一次返回所有值。 为了表示异步计算的值流(stream),我们可以使用 Flow 类型
fun simple(): Flow<Int> = flow { // 流构建器
for (i in 1..3) {
delay(100) // 假装我们在这里做了一些有用的事情
emit(i) // 发送下一个值
}
}
fun main() = runBlocking<Unit> {
// 启动并发的协程以验证主线程并未阻塞
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// 收集这个流
simple().collect { value -> println(value) }
}
注意使用 Flow 的代码与先前示例的下述区别:
flow { ... }
构建块中的代码可以挂起。simple
不再标有 suspend
修饰符。流采用与协程同样的协作取消。像往常一样,流的收集可以在当流在一个可取消的挂起函数(例如 delay)中挂起的时候取消。 以下示例展示了当 withTimeoutOrNull 块中代码在运行的时候流是如何在超时的情况下取消并停止执行其代码的:
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // 在 250 毫秒后超时
simple().collect { value -> println(value) }
}
println("Done")
}
输出
Emitting 1
1
Emitting 2
2
Done
先前示例中的 flow { ... }
构建器是最基础的一个。还有其他构建器使流的声明更简单:
.asFlow()
扩展函数,可以将各种集合与序列转换为流。因此,从流中打印从 1 到 3 的数字的示例可以写成:
(1..3).asFlow().collect { value -> println(value) }
suspend fun performRequest(request: Int): String {
delay(1000) // 模仿长时间运行的异步工作
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // 一个请求流
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
(1..3).asFlow() // 一个请求流
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // 只获取前两个
.collect { value -> println(value) }
}
协程中的取消操作总是通过抛出异常.所以需要try catch
末端操作符是在流上用于启动流收集的挂起函数。 collect 是最基础的末端操作符,但是还有另外一些更方便使用的末端操作符:
val sum = (1..5).asFlow()
.map { it * it } // 数字 1 至 5 的平方
.reduce { a, b -> a + b } // 求和(末端操作符)
println(sum)//输出55
长时间运行的消耗 CPU 的代码也许需要在 Dispatchers.Default 上下文中执行,并且更新 UI 的代码也许需要在 Dispatchers.Main 中执行。通常,withContext 用于在 Kotlin 协程中改变代码的上下文,但是 flow {...}
构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发射 emit
如下段代码所示
fun simple(): Flow<Int> = flow {
// 在流构建器中更改消耗 CPU 代码的上下文的错误方式
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // 假装我们以消耗 CPU 的方式进行计算
emit(i) // 发射下一个值
}
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> println(value) }
}
因为 Flow was collected in XXX,but emission happened in XXX。具体怎么处理下面会说
该函数用于更改流发射的上下文
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // 假装我们以消耗 CPU 的方式进行计算
log("Emitting $i")
emit(i) // 发射下一个值
}
}.flowOn(Dispatchers.Default) // 在流构建器中改变消耗 CPU 代码上下文的正确方式
fun main() = runBlocking<Unit> {
simple().collect { value ->
log("Collected $value")
}
}
是不是和ObserveOn很像
考虑一种情况, 一个 simple 流的发射很慢,它每花费 100 毫秒才产生一个元素;而收集器也非常慢, 需要花费 300 毫秒来处理元素
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // 假装我们异步等待了 100 毫秒
emit(i) // 发射下一个值
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().collect { value ->
delay(300) // 假装我们花费 300 毫秒来处理它
println(value)
}
}
println("Collected in $time ms")//Collected in 1230 ms
}
我们可以在流上使用 buffer 操作符来并发运行这个 simple
流中发射元素的代码以及收集的代码, 而不是顺序运行它们:
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.buffer() // 缓冲发射项,无需等待
.collect { value ->
delay(300) // 假装我们花费 300 毫秒来处理它
println(value)
}
}
println("Collected in $time ms")
}
它产生了相同的数字,只是更快了,由于我们高效地创建了处理流水线, 仅仅需要等待第一个数字产生的 100 毫秒以及处理每个数字各需花费的 300 毫秒。这种方式大约花费了 1000 毫秒来运行:
当发射器和收集器都很慢的时候,合并是加快处理速度的一种方式。它通过删除发射值来实现。 另一种方式是取消缓慢的收集器,并在每次发射新值的时候重新启动它。
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // 假装我们异步等待了 100 毫秒
emit(i) // 发射下一个值
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.collectLatest { value -> // 取消并重新发射最后一个值
println("Collecting $value")
delay(300) // 假装我们花费 300 毫秒来处理它
println("Done $value")
}
}
println("Collected in $time ms")
}
delay(300)把前面两个cancel了,所以没有输出Done。如果改成delay(50)就会全部输出
val nums = (1..3).asFlow() // 数字 1..3
val strs = flowOf("one", "two", "three") // 字符串
nums.zip(strs) { a, b -> "$a -> $b" } // 组合单个字符串
.collect { println(it) } // 收集并打印
val nums = (1..3).asFlow().onEach { delay(300) } // 发射数字 1..3,间隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射一次字符串
val startTime = System.currentTimeMillis() // 记录开始的时间
nums.combine(strs) { a, b -> "$a -> $b" } // 使用“combine”组合单个字符串
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
zip:合并多个数据,但是如果一方数据更多,多出来的那部分不会被发送。 combine:合并多个数据。将两方目前最新发射的数据组合在一起
(flatMapConcat)
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // 等待 500 毫秒
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // 记录开始时间
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射一个数字
.flatMapConcat { requestFlow(it) }
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
输出
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
(flatMapMerge 并发性质)
val startTime = System.currentTimeMillis() // 记录开始时间
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射一个数字
.flatMapMerge { requestFlow(it) }
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
输出
1: First at 152 ms from start
2: First at 246 ms from start
3: First at 347 ms from start
1: Second at 652 ms from start
2: Second at 746 ms from start
3: Second at 848 ms from start
(flatMapLatest 在发出新流后立即取消先前流的收集)
val startTime = System.currentTimeMillis() // 记录开始时间
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射一个数字
.flatMapLatest { requestFlow(it) }
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
输出
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // 发射下一个值
}
}
fun main() = runBlocking<Unit> {
try {
simple().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
输出
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
当流收集完成时(普通情况或异常情况),它可能需要执行一个动作。 你可能已经注意到,它可以通过两种方式完成:命令式(finally)或声明式(onCompletion)
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} finally {
println("Done")
}
}
simple()
.onCompletion { println("Done") }
.collect { value -> println(value) }
onCompletion 的主要优点是其 lambda 表达式的可空参数 Throwable
可以用于确定流收集是正常完成还是有异常发生
fun simple(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
}
fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
.catch { cause -> println("Caught exception") }
.collect { value -> println(value) }
}
onCompletion 操作符与 catch 不同
onCompletion
操作符,并可以由 catch
操作符处理。null
异常。