@@ -27,11 +27,12 @@ import io.reactivex.Observable
2727import  javax.inject.Inject 
2828import  kotlin.properties.ReadWriteProperty 
2929import  kotlin.reflect.KProperty 
30+ import  kotlinx.coroutines.ExperimentalForInheritanceCoroutinesApi 
3031import  kotlinx.coroutines.channels.BufferOverflow 
31- import  kotlinx.coroutines.flow.Flow  
32+ import  kotlinx.coroutines.flow.FlowCollector  
3233import  kotlinx.coroutines.flow.MutableSharedFlow 
3334import  kotlinx.coroutines.flow.MutableStateFlow 
34- import  kotlinx.coroutines.flow.filterNotNull  
35+ import  kotlinx.coroutines.flow.SharedFlow  
3536import  kotlinx.coroutines.rx2.asObservable 
3637
3738/* *
@@ -48,8 +49,7 @@ public abstract class Interactor<P : Any, R : Router<*>>() : InteractorType, Rib
4849  private  val  useStateFlow
4950    get() =  RibEvents .useStateFlowInteractorEvent
5051
51-   @VisibleForTesting
52-   internal  val  _lifecycleFlow :  MutableSharedFlow <InteractorEvent ?> = 
52+   private  val  _lifecycleFlow :  MutableSharedFlow <InteractorEvent ?> = 
5353    if  (useStateFlow) {
5454      MutableStateFlow (null )
5555    } else  {
@@ -59,8 +59,7 @@ public abstract class Interactor<P : Any, R : Router<*>>() : InteractorType, Rib
5959        BufferOverflow .DROP_OLDEST ,
6060      )
6161    }
62-   public  open  val  lifecycleFlow:  Flow <InteractorEvent >
63-     get() =  _lifecycleFlow .filterNotNull()
62+   public  open  val  lifecycleFlow:  SharedFlow <InteractorEvent > =  NonNullSharedFlow (_lifecycleFlow )
6463
6564  @Volatile private  var  _lifecycleObservable :  Observable <InteractorEvent >?  =  null 
6665
@@ -86,16 +85,16 @@ public abstract class Interactor<P : Any, R : Router<*>>() : InteractorType, Rib
8685  final  override  fun  correspondingEvents (): CorrespondingEventsFunction <InteractorEvent > = 
8786    LIFECYCLE_MAP_FUNCTION 
8887
89-   final  override  fun  peekLifecycle (): InteractorEvent ?  =  _lifecycleFlow .replayCache.lastOrNull()
88+   final  override  fun  peekLifecycle (): InteractorEvent ?  =  lifecycleFlow .replayCache.lastOrNull()
9089
9190  @OptIn(CoreFriendModuleApi ::class )
9291  final  override  fun  requestScope (): CompletableSource  = 
93-     _lifecycleFlow .asScopeCompletable(lifecycleRange)
92+     lifecycleFlow .asScopeCompletable(lifecycleRange)
9493
9594  //  ---- InteractorType overrides ---- //
9695
9796  override  fun  isAttached (): Boolean  = 
98-     _lifecycleFlow .replayCache.lastOrNull() ==  InteractorEvent .ACTIVE 
97+     lifecycleFlow .replayCache.lastOrNull() ==  InteractorEvent .ACTIVE 
9998
10099  override  fun  handleBackPress (): Boolean  =  false 
101100
@@ -233,3 +232,20 @@ public abstract class Interactor<P : Any, R : Router<*>>() : InteractorType, Rib
233232      }
234233  }
235234}
235+ 
236+ //  See https://github.com/Kotlin/kotlinx.coroutines/issues/2514
237+ @OptIn(ExperimentalForInheritanceCoroutinesApi ::class )
238+ private  class  NonNullSharedFlow <R  :  Any >(
239+   private  val  upstream :  SharedFlow <R ?>,
240+ ) : SharedFlow<R> {
241+   override  val  replayCache:  List <R >
242+     get() =  upstream.replayCache.filterNotNull()
243+ 
244+   override  suspend  fun  collect (collector :  FlowCollector <R >): Nothing  {
245+     upstream.collect { value -> 
246+       if  (value !=  null ) {
247+         collector.emit(value)
248+       }
249+     }
250+   }
251+ }
0 commit comments