我想知道如何向Kotlin.Flow
发送/发出项,所以我的用例是:
在使用者/视图模型/演示器中,我可以订阅collect
函数:
fun observe() {
coroutineScope.launch {
// 1. Send event
reopsitory.observe().collect {
println(it)
}
}
}
但是问题在Repository
方面,对于RxJava,我们可以使用行为主体将其公开为Observable/Flowable
,并发出如下所示的新项:
behaviourSubject.onNext(true)
但是每当我建立一个新的流程:
flow {
}
我只能收集。如何向流发送值?
发布于 2019-08-04 01:12:57
如果要获取订阅/集合的最新值,则应使用ConflatedBroadcastChannel
private val channel = ConflatedBroadcastChannel<Boolean>()
这将复制BehaviourSubject
,将通道公开为流:
// Repository
fun observe() {
return channel.asFlow()
}
现在,要向公开的Flow
发送一个事件/值,只需向这个通道发送。
// Repository
fun someLogicalOp() {
channel.send(false) // This gets sent to the ViewModel/Presenter and printed.
}
控制台:
错误
如果您只希望在之后接收值,那么您应该使用BroadcastChannel
代替。
为了表明这一点:
表现为Rx的PublishedSubject
private val channel = BroadcastChannel<Boolean>(1)
fun broadcastChannelTest() {
// 1. Send event
channel.send(true)
// 2. Start collecting
channel
.asFlow()
.collect {
println(it)
}
// 3. Send another event
channel.send(false)
}
错误
只有false
被打印出来,因为第一个事件是在 collect { }
之前发送的。
表现为Rx的BehaviourSubject
private val confChannel = ConflatedBroadcastChannel<Boolean>()
fun conflatedBroadcastChannelTest() {
// 1. Send event
confChannel.send(true)
// 2. Start collecting
confChannel
.asFlow()
.collect {
println(it)
}
// 3. Send another event
confChannel.send(false)
}
真的 错误
这两个事件都是打印出来的,您总是得到最新的值(如果有)。
此外,还想提到Kotlin在DataFlow
上的团队开发(名称待定):
这似乎更适合这个用例(因为它将是一个冷流)。
发布于 2020-06-21 18:22:19
请看一看MutableStateFlow文档,因为它很快就将取代即将被废弃的ConflatedBroadcastChannel
。
要获得更好的上下文,请查看论柯特林在吉特卜上的存储库的原始问题。
发布于 2020-05-13 13:18:06
更新
Kotlin 1.4.0
现在可以与MutableSharedFlow
一起使用,它取代了对Channel
的需求。MutableSharedFlow
清理也是内置的,所以您不需要手动打开和关闭它,不像Channel
。如果需要类似主题的api for MutableSharedFlow
,请使用Flow
。
原始答案
由于您的问题有android
标记,我将添加一个Android实现,它允许您轻松地创建一个BehaviorSubject
或一个处理自己生命周期的PublishSubject
。
这在Android中是相关的,因为您不想忘记关闭通道并泄漏内存。此实现避免了通过将反应性流与创建和销毁片段/活动绑定而显式“处置”该流的需要。类似于LiveData
interface EventReceiver<Message> {
val eventFlow: Flow<Message>
}
interface EventSender<Message> {
fun postEvent(message: Message)
val initialMessage: Message?
}
class LifecycleEventSender<Message>(
lifecycle: Lifecycle,
private val coroutineScope: CoroutineScope,
private val channel: BroadcastChannel<Message>,
override val initialMessage: Message?
) : EventSender<Message>, LifecycleObserver {
init {
lifecycle.addObserver(this)
}
override fun postEvent(message: Message) {
if (!channel.isClosedForSend) {
coroutineScope.launch { channel.send(message) }
} else {
Log.e("LifecycleEventSender","Channel is closed. Cannot send message: $message")
}
}
@OnLifecycleEvent(Lifecycle.Event.ON_CREATE)
fun create() {
channel.openSubscription()
initialMessage?.let { postEvent(it) }
}
@OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
fun destroy() {
channel.close()
}
}
class ChannelEventReceiver<Message>(channel: BroadcastChannel<Message>) :
EventReceiver<Message> {
override val eventFlow: Flow<Message> = channel.asFlow()
}
abstract class EventRelay<Message>(
lifecycle: Lifecycle,
coroutineScope: CoroutineScope,
channel: BroadcastChannel<Message>,
initialMessage: Message? = null
) : EventReceiver<Message> by ChannelEventReceiver<Message>(channel),
EventSender<Message> by LifecycleEventSender<Message>(
lifecycle,
coroutineScope,
channel,
initialMessage
)
通过使用Android的Lifecycle
库,我现在可以创建一个BehaviorSubject
,在活动/片段被销毁后清理它自己。
class BehaviorSubject<String>(
lifecycle: Lifecycle,
coroutineScope: CoroutineScope,
initialMessage = "Initial Message"
) : EventRelay<String>(
lifecycle,
coroutineScope,
ConflatedBroadcastChannel(),
initialMessage
)
或者我可以使用缓冲的PublishSubject
创建BroadcastChannel
class PublishSubject<String>(
lifecycle: Lifecycle,
coroutineScope: CoroutineScope,
initialMessage = "Initial Message"
) : EventRelay<String>(
lifecycle,
coroutineScope,
BroadcastChannel(Channel.BUFFERED),
initialMessage
)
现在我可以做这样的事
class MyActivity: Activity() {
val behaviorSubject = BehaviorSubject(
this@MyActivity.lifecycle,
this@MyActivity.lifecycleScope
)
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
if (savedInstanceState == null) {
behaviorSubject.eventFlow
.onEach { stringEvent ->
Log.d("BehaviorSubjectFlow", stringEvent)
// "BehaviorSubjectFlow: Initial Message"
// "BehaviorSubjectFlow: Next Message"
}
.flowOn(Dispatchers.Main)
.launchIn(this@MyActivity.lifecycleScope)
}
}
override fun onResume() {
super.onResume()
behaviorSubject.postEvent("Next Message")
}
}
https://stackoverflow.com/questions/57345311
复制