1+ /*
2+ * Copyright 2019-2020 David Karnok
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
16+
17+ package hu.akarnokd.kotlin.flow
18+
19+ import kotlinx.coroutines.FlowPreview
20+ import kotlinx.coroutines.coroutineScope
21+ import kotlinx.coroutines.currentCoroutineContext
22+ import kotlinx.coroutines.flow.AbstractFlow
23+ import kotlinx.coroutines.flow.Flow
24+ import kotlinx.coroutines.flow.FlowCollector
25+ import kotlinx.coroutines.flow.launchIn
26+ import kotlinx.coroutines.launch
27+ import java.util.concurrent.CancellationException
28+ import java.util.concurrent.ConcurrentLinkedQueue
29+ import java.util.concurrent.atomic.AtomicInteger
30+ import java.util.concurrent.atomic.AtomicReference
31+
32+ /* *
33+ * A subject implementation that dispatches signals to multiple
34+ * consumers or buffers them until such consumers arrive.
35+ *
36+ * @param <T> the element type of the [Flow]
37+ * @param bufferSize the number of items to buffer until consumers arrive
38+ */
39+ @FlowPreview
40+ class MulticastSubject <T >(private val bufferSize : Int = 32 ) : AbstractFlow<T>(), SubjectAPI<T> {
41+
42+ val queue = ConcurrentLinkedQueue <T >()
43+
44+ val availableQueue = AtomicInteger (bufferSize)
45+
46+ val consumers = AtomicReference (EMPTY as Array <ResumableCollector <T >>)
47+
48+ val producerAwait = Resumable ()
49+
50+ val wip = AtomicInteger ()
51+
52+ @Volatile
53+ var error: Throwable ? = null
54+
55+ override suspend fun emit (value : T ) {
56+ while (availableQueue.get() == 0 ) {
57+ producerAwait.await()
58+ }
59+ queue.offer(value)
60+ availableQueue.decrementAndGet();
61+ drain()
62+ }
63+
64+ override suspend fun emitError (ex : Throwable ) {
65+ error = ex
66+ drain()
67+ }
68+
69+ override suspend fun complete () {
70+ error = DONE
71+ drain()
72+ }
73+
74+ override fun hasCollectors (): Boolean {
75+ return consumers.get().isNotEmpty();
76+ }
77+
78+ override fun collectorCount (): Int {
79+ return consumers.get().size;
80+ }
81+
82+ override suspend fun collectSafely (collector : FlowCollector <T >) {
83+ val c = ResumableCollector <T >()
84+ if (add(c)) {
85+ c.readyConsumer()
86+ drain()
87+ c.drain(collector) { remove(it) }
88+ } else {
89+ val ex = error
90+ if (ex != null && ex != DONE ) {
91+ throw ex
92+ }
93+ }
94+ }
95+
96+ private fun add (collector : ResumableCollector <T >) : Boolean {
97+ while (true ) {
98+ val a = consumers.get()
99+ if (a == TERMINATED ) {
100+ return false
101+ }
102+ val b = Array <ResumableCollector <T >>(a.size + 1 ) { idx ->
103+ if (idx < a.size) a[idx] else collector
104+ }
105+ if (consumers.compareAndSet(a, b)) {
106+ return true
107+ }
108+ }
109+ }
110+ private fun remove (collector : ResumableCollector <T >) {
111+ while (true ) {
112+ val a = consumers.get()
113+ val n = a.size
114+ if (n == 0 ) {
115+ return
116+ }
117+ var j = - 1
118+ for (i in 0 until n) {
119+ if (a[i] == collector) {
120+ j = i
121+ break
122+ }
123+ }
124+ if (j < 0 ) {
125+ return ;
126+ }
127+ var b = EMPTY as Array <ResumableCollector <T >? >
128+ if (n != 1 ) {
129+ b = Array <ResumableCollector <T >? > (n - 1 ) { null }
130+ System .arraycopy(a, 0 , b, 0 , j)
131+ System .arraycopy(a, j + 1 , b, j, n - j - 1 )
132+ }
133+ if (consumers.compareAndSet(a, b as Array <ResumableCollector <T >>)) {
134+ return ;
135+ }
136+ }
137+ }
138+
139+ private suspend fun drain () {
140+ if (wip.getAndIncrement() != 0 ) {
141+ return ;
142+ }
143+
144+ while (true ) {
145+ val collectors = consumers.get()
146+ if (collectors.isNotEmpty()) {
147+ val ex = error;
148+ val v = queue.poll();
149+
150+ if (v == null && ex != null ) {
151+ finish(ex)
152+ }
153+ else if (v != null ) {
154+ var k = 0 ;
155+ for (collector in consumers.get()) {
156+ try {
157+ // println("MulticastSubject -> [$k]: $v")
158+ collector.next(v)
159+ } catch (ex: CancellationException ) {
160+ remove(collector);
161+ }
162+ k++
163+ }
164+ availableQueue.getAndIncrement()
165+ producerAwait.resume()
166+ continue
167+ }
168+ } else {
169+ val ex = error;
170+ if (ex != null && queue.isEmpty()) {
171+ finish(ex)
172+ }
173+ }
174+ if (wip.decrementAndGet() == 0 ) {
175+ break
176+ }
177+ }
178+ }
179+
180+ private suspend fun finish (ex : Throwable ) {
181+ if (ex == DONE ) {
182+ for (collector in consumers.getAndSet(TERMINATED as Array <ResumableCollector <T >>)) {
183+ try {
184+ collector.complete()
185+ } catch (_: CancellationException ) {
186+ // ignored
187+ }
188+ }
189+ } else {
190+ for (collector in consumers.getAndSet(TERMINATED as Array <ResumableCollector <T >>)) {
191+ try {
192+ collector.error(ex)
193+ } catch (_: CancellationException ) {
194+ // ignored
195+ }
196+ }
197+ }
198+ }
199+
200+ companion object {
201+ val DONE : Throwable = Throwable (" Subject Completed" )
202+
203+ val EMPTY = arrayOf<ResumableCollector <Any >>();
204+
205+ val TERMINATED = arrayOf<ResumableCollector <Any >>();
206+ }
207+ }
0 commit comments