Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class _BackpressureStreamSink<S, T> extends ForwardingSink<S, T> {
}

@override
void onError(Object e, StackTrace st) => sink.addError(e, st);
void onError(Object e, StackTrace st) => sink.addErrorSync(e, st);

@override
void onDone() {
Expand All @@ -93,7 +93,7 @@ class _BackpressureStreamSink<S, T> extends ForwardingSink<S, T> {
queue.clear();

_windowSubscription?.cancel();
sink.close();
sink.closeSync();
}

@override
Expand All @@ -108,7 +108,7 @@ class _BackpressureStreamSink<S, T> extends ForwardingSink<S, T> {
@override
void onResume() => _windowSubscription?.resume();

void maybeCreateWindow(S event, EventSink<T> sink) {
void maybeCreateWindow(S event, EnhancedEventSink<T> sink) {
switch (_strategy) {
// for example throttle
case WindowStrategy.eventAfterLastWindow:
Expand Down Expand Up @@ -142,29 +142,30 @@ class _BackpressureStreamSink<S, T> extends ForwardingSink<S, T> {
}
}

void maybeCloseWindow(EventSink<T> sink) {
void maybeCloseWindow(EnhancedEventSink<T> sink) {
if (_closeWindowWhen != null && _closeWindowWhen!(unmodifiableQueue)) {
resolveWindowEnd(sink);
}
}

StreamSubscription<dynamic> singleWindow(S event, EventSink<T> sink) =>
StreamSubscription<dynamic> singleWindow(
S event, EnhancedEventSink<T> sink) =>
buildStream(event, sink).take(1).listen(
null,
onError: sink.addError,
onError: sink.addErrorSync,
onDone: () => resolveWindowEnd(sink, _mainClosed),
);

// opens a new Window which is kept open until the main Stream
// closes.
StreamSubscription<dynamic> multiWindow(S event, EventSink<T> sink) =>
StreamSubscription<dynamic> multiWindow(S event, EnhancedEventSink<T> sink) =>
buildStream(event, sink).listen(
(dynamic _) => resolveWindowEnd(sink),
onError: sink.addError,
onError: sink.addErrorSync,
onDone: () => resolveWindowEnd(sink),
);

Stream<void> buildStream(S event, EventSink<T> sink) {
Stream<void> buildStream(S event, EnhancedEventSink<T> sink) {
Stream<void> stream;

_windowSubscription?.cancel();
Expand All @@ -174,27 +175,28 @@ class _BackpressureStreamSink<S, T> extends ForwardingSink<S, T> {
return stream;
}

void resolveWindowStart(S event, EventSink<T> sink) {
void resolveWindowStart(S event, EnhancedEventSink<T> sink) {
if (_onWindowStart != null) {
sink.add(_onWindowStart!(event));
sink.addSync(_onWindowStart!(event));
}
}

void resolveWindowEnd(EventSink<T> sink, [bool isControllerClosing = false]) {
void resolveWindowEnd(EnhancedEventSink<T> sink,
[bool isControllerClosing = false]) {
if (isControllerClosing &&
_strategy == WindowStrategy.eventAfterLastWindow) {
if (_dispatchOnClose &&
_hasData &&
queue.length > 1 &&
_onWindowEnd != null) {
sink.add(_onWindowEnd!(unmodifiableQueue));
sink.addSync(_onWindowEnd!(unmodifiableQueue));
}

queue.clear();
_windowSubscription?.cancel();
_windowSubscription = null;

sink.close();
sink.closeSync();
return;
}

Expand All @@ -211,7 +213,7 @@ class _BackpressureStreamSink<S, T> extends ForwardingSink<S, T> {

if (_hasData && (queue.isNotEmpty || !_ignoreEmptyWindows)) {
if (_onWindowEnd != null) {
sink.add(_onWindowEnd!(unmodifiableQueue));
sink.addSync(_onWindowEnd!(unmodifiableQueue));
}

// prepare the buffer for the next window.
Expand Down
8 changes: 4 additions & 4 deletions packages/rxdart/lib/src/transformers/delay.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,25 @@ class _DelayStreamSink<S> extends ForwardingSink<S, S> {
final subscription = Rx.timer<void>(null, _duration).listen((_) {
_subscriptions.removeFirst();

sink.add(data);
sink.addSync(data);

if (_inputClosed && _subscriptions.isEmpty) {
sink.close();
sink.closeSync();
}
});

_subscriptions.addLast(subscription);
}

@override
void onError(Object error, StackTrace st) => sink.addError(error, st);
void onError(Object error, StackTrace st) => sink.addErrorSync(error, st);

@override
void onDone() {
_inputClosed = true;

if (_subscriptions.isEmpty) {
sink.close();
sink.closeSync();
}
}

Expand Down
13 changes: 7 additions & 6 deletions packages/rxdart/lib/src/transformers/delay_when.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,30 @@ class _DelayWhenStreamSink<T> extends ForwardingSink<T, T> {

@override
void onData(T data) {
final subscription =
itemDelaySelector(data).take(1).listen(null, onError: sink.addError);
final subscription = itemDelaySelector(data)
.take(1)
.listen(null, onError: sink.addErrorSync);

subscription.onDone(() {
subscriptions.remove(subscription);

sink.add(data);
sink.addSync(data);
if (subscriptions.isEmpty && closed) {
sink.close();
sink.closeSync();
}
});

subscriptions.add(subscription);
}

@override
void onError(Object error, StackTrace st) => sink.addError(error, st);
void onError(Object error, StackTrace st) => sink.addErrorSync(error, st);

@override
void onDone() {
closed = true;
if (subscriptions.isEmpty) {
sink.close();
sink.closeSync();
}
}

Expand Down
24 changes: 12 additions & 12 deletions packages/rxdart/lib/src/transformers/do.dart
Original file line number Diff line number Diff line change
Expand Up @@ -30,44 +30,44 @@ class _DoStreamSink<S> extends ForwardingSink<S, S> {
try {
_onData?.call(data);
} catch (e, s) {
sink.addError(e, s);
sink.addErrorSync(e, s);
}
try {
_onEach?.call(StreamNotification.data(data));
} catch (e, s) {
sink.addError(e, s);
sink.addErrorSync(e, s);
}
sink.add(data);
sink.addSync(data);
}

@override
void onError(Object e, StackTrace st) {
try {
_onError?.call(e, st);
} catch (e, s) {
sink.addError(e, s);
sink.addErrorSync(e, s);
}
try {
_onEach?.call(StreamNotification.error(e, st));
} catch (e, s) {
sink.addError(e, s);
sink.addErrorSync(e, s);
}
sink.addError(e, st);
sink.addErrorSync(e, st);
}

@override
void onDone() {
try {
_onDone?.call();
} catch (e, s) {
sink.addError(e, s);
sink.addErrorSync(e, s);
}
try {
_onEach?.call(StreamNotification.done());
} catch (e, s) {
sink.addError(e, s);
sink.addErrorSync(e, s);
}
sink.close();
sink.closeSync();
}

@override
Expand All @@ -78,7 +78,7 @@ class _DoStreamSink<S> extends ForwardingSink<S, S> {
try {
_onListen?.call();
} catch (e, s) {
sink.addError(e, s);
sink.addErrorSync(e, s);
}
}

Expand All @@ -87,7 +87,7 @@ class _DoStreamSink<S> extends ForwardingSink<S, S> {
try {
_onPause?.call();
} catch (e, s) {
sink.addError(e, s);
sink.addErrorSync(e, s);
}
}

Expand All @@ -96,7 +96,7 @@ class _DoStreamSink<S> extends ForwardingSink<S, S> {
try {
_onResume?.call();
} catch (e, s) {
sink.addError(e, s);
sink.addErrorSync(e, s);
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions packages/rxdart/lib/src/transformers/exhaust_map.dart
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,31 @@ class _ExhaustMapStreamSink<S, T> extends ForwardingSink<S, T> {
try {
mappedStream = _mapper(data);
} catch (e, s) {
sink.addError(e, s);
sink.addErrorSync(e, s);
return;
}

_mapperSubscription = mappedStream.listen(
sink.add,
onError: sink.addError,
sink.addSync,
onError: sink.addErrorSync,
onDone: () {
_mapperSubscription = null;

if (_inputClosed) {
sink.close();
sink.closeSync();
}
},
);
}

@override
void onError(Object e, StackTrace st) => sink.addError(e, st);
void onError(Object e, StackTrace st) => sink.addErrorSync(e, st);

@override
void onDone() {
_inputClosed = true;

_mapperSubscription ?? sink.close();
_mapperSubscription ?? sink.closeSync();
}

@override
Expand Down
11 changes: 6 additions & 5 deletions packages/rxdart/lib/src/transformers/flat_map.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,32 +29,33 @@ class _FlatMapStreamSink<S, T> extends ForwardingSink<S, T> {
try {
mappedStream = _mapper(data);
} catch (e, s) {
sink.addError(e, s);
sink.addErrorSync(e, s);
return;
}

final subscription = mappedStream.listen(sink.add, onError: sink.addError);
final subscription =
mappedStream.listen(sink.addSync, onError: sink.addErrorSync);
subscription.onDone(() {
_subscriptions.remove(subscription);

if (queue.isNotEmpty) {
listenInner(queue.removeFirst());
} else if (_inputClosed && _subscriptions.isEmpty) {
sink.close();
sink.closeSync();
}
});
_subscriptions.add(subscription);
}

@override
void onError(Object e, StackTrace st) => sink.addError(e, st);
void onError(Object e, StackTrace st) => sink.addErrorSync(e, st);

@override
void onDone() {
_inputClosed = true;

if (_subscriptions.isEmpty) {
sink.close();
sink.closeSync();
}
}

Expand Down
8 changes: 4 additions & 4 deletions packages/rxdart/lib/src/transformers/group_by.dart
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class _GroupByStreamSink<T, K> extends ForwardingSink<T, GroupedStream<T, K>> {
);
}

sink.add(groupByStream);
sink.addSync(groupByStream);
return groupedController;
}

Expand All @@ -47,20 +47,20 @@ class _GroupByStreamSink<T, K> extends ForwardingSink<T, GroupedStream<T, K>> {
try {
key = grouper(data);
} catch (e, s) {
sink.addError(e, s);
sink.addErrorSync(e, s);
return;
}

groups.putIfAbsent(key, () => _controllerBuilder(key)).add(data);
}

@override
void onError(e, st) => sink.addError(e, st);
void onError(e, st) => sink.addErrorSync(e, st);

@override
void onDone() {
_closeAll();
sink.close();
sink.closeSync();
}

@override
Expand Down
Loading
Loading