@@ -27,11 +27,12 @@ import io.reactivex.Observable
2727import javax.inject.Inject
2828import kotlin.properties.ReadWriteProperty
2929import kotlin.reflect.KProperty
30+ import kotlinx.coroutines.ExperimentalForInheritanceCoroutinesApi
3031import kotlinx.coroutines.channels.BufferOverflow
31- import kotlinx.coroutines.flow.Flow
32+ import kotlinx.coroutines.flow.FlowCollector
3233import kotlinx.coroutines.flow.MutableSharedFlow
3334import kotlinx.coroutines.flow.MutableStateFlow
34- import kotlinx.coroutines.flow.filterNotNull
35+ import kotlinx.coroutines.flow.SharedFlow
3536import kotlinx.coroutines.rx2.asObservable
3637
3738/* *
@@ -48,8 +49,7 @@ public abstract class Interactor<P : Any, R : Router<*>>() : InteractorType, Rib
4849 private val useStateFlow
4950 get() = RibEvents .useStateFlowInteractorEvent
5051
51- @VisibleForTesting
52- internal val _lifecycleFlow : MutableSharedFlow <InteractorEvent ?> =
52+ private val _lifecycleFlow : MutableSharedFlow <InteractorEvent ?> =
5353 if (useStateFlow) {
5454 MutableStateFlow (null )
5555 } else {
@@ -59,8 +59,7 @@ public abstract class Interactor<P : Any, R : Router<*>>() : InteractorType, Rib
5959 BufferOverflow .DROP_OLDEST ,
6060 )
6161 }
62- public open val lifecycleFlow: Flow <InteractorEvent >
63- get() = _lifecycleFlow .filterNotNull()
62+ public open val lifecycleFlow: SharedFlow <InteractorEvent > = NonNullSharedFlow (_lifecycleFlow )
6463
6564 @Volatile private var _lifecycleObservable : Observable <InteractorEvent >? = null
6665
@@ -86,16 +85,16 @@ public abstract class Interactor<P : Any, R : Router<*>>() : InteractorType, Rib
8685 final override fun correspondingEvents (): CorrespondingEventsFunction <InteractorEvent > =
8786 LIFECYCLE_MAP_FUNCTION
8887
89- final override fun peekLifecycle (): InteractorEvent ? = _lifecycleFlow .replayCache.lastOrNull()
88+ final override fun peekLifecycle (): InteractorEvent ? = lifecycleFlow .replayCache.lastOrNull()
9089
9190 @OptIn(CoreFriendModuleApi ::class )
9291 final override fun requestScope (): CompletableSource =
93- _lifecycleFlow .asScopeCompletable(lifecycleRange)
92+ lifecycleFlow .asScopeCompletable(lifecycleRange)
9493
9594 // ---- InteractorType overrides ---- //
9695
9796 override fun isAttached (): Boolean =
98- _lifecycleFlow .replayCache.lastOrNull() == InteractorEvent .ACTIVE
97+ lifecycleFlow .replayCache.lastOrNull() == InteractorEvent .ACTIVE
9998
10099 override fun handleBackPress (): Boolean = false
101100
@@ -233,3 +232,20 @@ public abstract class Interactor<P : Any, R : Router<*>>() : InteractorType, Rib
233232 }
234233 }
235234}
235+
236+ // See https://github.com/Kotlin/kotlinx.coroutines/issues/2514
237+ @OptIn(ExperimentalForInheritanceCoroutinesApi ::class )
238+ private class NonNullSharedFlow <R : Any >(
239+ private val upstream : SharedFlow <R ?>,
240+ ) : SharedFlow<R> {
241+ override val replayCache: List <R >
242+ get() = upstream.replayCache.filterNotNull()
243+
244+ override suspend fun collect (collector : FlowCollector <R >): Nothing {
245+ upstream.collect { value ->
246+ if (value != null ) {
247+ collector.emit(value)
248+ }
249+ }
250+ }
251+ }
0 commit comments