From f25ab7dc69a6be059342380eb5d1e6605ebcf482 Mon Sep 17 00:00:00 2001 From: Federico Alterio Date: Sun, 24 Aug 2025 14:38:09 +0200 Subject: [PATCH 1/5] AsyncObseravableBase: normalized synchronous exceptions thrown by observer methods --- .../AsyncObservableBase.cs | 7 +-- .../AsyncObserverEnsureAsyncHelpers.cs | 44 +++++++++++++++++++ 2 files changed, 48 insertions(+), 3 deletions(-) create mode 100644 AsyncRx.NET/System.Reactive.Async/Internal/AsyncObserverEnsureAsyncHelpers.cs diff --git a/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs b/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs index 08d0a62a14..dd0766e9ab 100644 --- a/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs +++ b/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. +using System.Reactive.Internal; using System.Threading.Tasks; namespace System.Reactive @@ -124,7 +125,7 @@ protected override async ValueTask OnCompletedAsyncCore() return; } - _task = _observer.OnCompletedAsync(); + _task = _observer.OnCompletedAsync_EnsureAsync(); } try @@ -146,7 +147,7 @@ protected override async ValueTask OnErrorAsyncCore(Exception error) return; } - _task = _observer.OnErrorAsync(error); + _task = _observer.OnErrorAsync_EnsureAsync(error); } try @@ -168,7 +169,7 @@ protected override async ValueTask OnNextAsyncCore(T value) return; } - _task = _observer.OnNextAsync(value); + _task = _observer.OnNextAsync_EnsureAsync(value); } try diff --git a/AsyncRx.NET/System.Reactive.Async/Internal/AsyncObserverEnsureAsyncHelpers.cs b/AsyncRx.NET/System.Reactive.Async/Internal/AsyncObserverEnsureAsyncHelpers.cs new file mode 100644 index 0000000000..62a5d82fc6 --- /dev/null +++ b/AsyncRx.NET/System.Reactive.Async/Internal/AsyncObserverEnsureAsyncHelpers.cs @@ -0,0 +1,44 @@ +using System.Threading.Tasks; + +namespace System.Reactive.Internal; + +// Helpers methods that ensure that calls to IAsyncObserver methods don't throw synchronously. +// Those methods will always return a ValueTask, and any exception will be propagated through that ValueTask. +internal static class AsyncObserverEnsureAsyncHelpers +{ + public static ValueTask OnNextAsync_EnsureAsync(this IAsyncObserver source, T value) + { + try + { + return source.OnNextAsync(value); + } + catch (Exception e) + { + return new ValueTask(Task.FromException(e)); + } + } + + public static ValueTask OnErrorAsync_EnsureAsync(this IAsyncObserver source, Exception error) + { + try + { + return source.OnErrorAsync(error); + } + catch (Exception e) + { + return new ValueTask(Task.FromException(e)); + } + } + + public static ValueTask OnCompletedAsync_EnsureAsync(this IAsyncObserver source) + { + try + { + return source.OnCompletedAsync(); + } + catch (Exception e) + { + return new ValueTask(Task.FromException(e)); + } + } +} From 19298617749cb0b6822cd1188e35b2d50d52cd86 Mon Sep 17 00:00:00 2001 From: Federico Alterio <48481385+fedeAlterio@users.noreply.github.com> Date: Sun, 19 Oct 2025 10:52:39 +0200 Subject: [PATCH 2/5] Fix reentrant DisposeAsync --- .../AsyncObservableBase.cs | 122 +++++++----------- 1 file changed, 47 insertions(+), 75 deletions(-) diff --git a/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs b/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs index dd0766e9ab..99c0a4c028 100644 --- a/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs +++ b/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs @@ -2,7 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. -using System.Reactive.Internal; +using System.Threading; using System.Threading.Tasks; namespace System.Reactive @@ -28,10 +28,11 @@ public async ValueTask SubscribeAsync(IAsyncObserver observ private sealed class AutoDetachAsyncObserver : AsyncObserverBase, IAsyncDisposable { private readonly IAsyncObserver _observer; + private TaskCompletionSource _pendingOnSomethingCallsTcs; + private readonly AsyncLocal _reentrancyFlag = new(); // If any On* method, calls OnDisposeAsync, this will be true private readonly object _gate = new(); private IAsyncDisposable _subscription; - private ValueTask _task; private bool _disposing; public AutoDetachAsyncObserver(IAsyncObserver observer) @@ -63,61 +64,12 @@ public async ValueTask AssignAsync(IAsyncDisposable subscription) public async ValueTask DisposeAsync() { - ValueTask task; - var subscription = default(IAsyncDisposable); - - lock (_gate) - { - // - // NB: The postcondition of awaiting the first DisposeAsync call to complete is that all message - // processing has ceased, i.e. no further On*AsyncCore calls will be made. This is achieved - // here by setting _disposing to true, which is checked by the On*AsyncCore calls upon - // entry, and by awaiting the task of any in-flight On*AsyncCore calls. - // - // Timing of the disposal of the subscription is less deterministic due to the intersection - // with the AssignAsync code path. However, the auto-detach observer can only be returned - // from the SubscribeAsync call *after* a call to AssignAsync has been made and awaited, so - // either AssignAsync triggers the disposal and an already disposed instance is returned, or - // the user calling DisposeAsync will either encounter a busy observer which will be stopped - // in its tracks (as described above) or it will trigger a disposal of the subscription. In - // both these cases the result of awaiting DisposeAsync guarantees no further message flow. - // - - if (!_disposing) - { - _disposing = true; - - task = _task; - subscription = _subscription; - } - } - - try - { - // - // BUGBUG: This causes grief when an outgoing On*Async call reenters the DisposeAsync method and - // results in the task returned from the On*Async call to be awaited to serialize the - // call to subscription.DisposeAsync after it's done. We need to either detect reentrancy - // and queue up the call to DisposeAsync or follow an when we trigger the disposal without - // awaiting outstanding work (thus allowing for concurrency). - // - // if (task != null) - // { - // await task.ConfigureAwait(false); - // } - // - } - finally - { - if (subscription != null) - { - await subscription.DisposeAsync().ConfigureAwait(false); - } - } + await FinishAsync().ConfigureAwait(false); } protected override async ValueTask OnCompletedAsyncCore() { + ValueTask task; lock (_gate) { if (_disposing) @@ -125,12 +77,12 @@ protected override async ValueTask OnCompletedAsyncCore() return; } - _task = _observer.OnCompletedAsync_EnsureAsync(); + task = WithReentrancyFlagOn(static (@this, _) => @this._observer.OnCompletedAsync(), (object)null); } try { - await _task.ConfigureAwait(false); + await task.ConfigureAwait(false); } finally { @@ -140,6 +92,7 @@ protected override async ValueTask OnCompletedAsyncCore() protected override async ValueTask OnErrorAsyncCore(Exception error) { + ValueTask task; lock (_gate) { if (_disposing) @@ -147,12 +100,12 @@ protected override async ValueTask OnErrorAsyncCore(Exception error) return; } - _task = _observer.OnErrorAsync_EnsureAsync(error); + task = WithReentrancyFlagOn(static (@this, error) => @this._observer.OnErrorAsync(error), error); } try { - await _task.ConfigureAwait(false); + await task; } finally { @@ -162,6 +115,7 @@ protected override async ValueTask OnErrorAsyncCore(Exception error) protected override async ValueTask OnNextAsyncCore(T value) { + ValueTask task; lock (_gate) { if (_disposing) @@ -169,36 +123,37 @@ protected override async ValueTask OnNextAsyncCore(T value) return; } - _task = _observer.OnNextAsync_EnsureAsync(value); + task = WithReentrancyFlagOn(static (@this, value) => @this._observer.OnNextAsync(value), value); } - try - { - await _task.ConfigureAwait(false); - } - finally - { - lock (_gate) - { - _task = default; - } - } + await task.ConfigureAwait(false); } private async ValueTask FinishAsync() { - var subscription = default(IAsyncDisposable); + // On synchronous Rx, if Dispose is called while we're in the middle of an OnNext/OnError/OnCompleted, + // we immediately execute the Dispose() method. + // So it's possible that the On* method finishes after the Dispose() method has completed. + // What it's impossible is that another On* method STARTS AFTER Dispose() has completed. + + Task onSomethingCall; + IAsyncDisposable subscription; lock (_gate) { - if (!_disposing) + if (_disposing) { - _disposing = true; - - subscription = _subscription; + return; } - _task = default; + _disposing = true; + subscription = _subscription; + onSomethingCall = _reentrancyFlag.Value ? null : _pendingOnSomethingCallsTcs?.Task; + } + + if (onSomethingCall != null) + { + await _pendingOnSomethingCallsTcs.Task.ConfigureAwait(false); } if (subscription != null) @@ -206,6 +161,23 @@ private async ValueTask FinishAsync() await subscription.DisposeAsync().ConfigureAwait(false); } } + + private async ValueTask WithReentrancyFlagOn(Func asyncAction, TState state) + { + var runningMethod = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _pendingOnSomethingCallsTcs = runningMethod; + _reentrancyFlag.Value = true; + try + { + await asyncAction(this, state).ConfigureAwait(false); + } + finally + { + _reentrancyFlag.Value = false; + _pendingOnSomethingCallsTcs = null; + runningMethod.SetResult(null!); + } + } } } } From 89dbc87c9958c2b32ccbeb9e9f49d6687766554d Mon Sep 17 00:00:00 2001 From: Federico Alterio <48481385+fedeAlterio@users.noreply.github.com> Date: Sun, 19 Oct 2025 10:54:27 +0200 Subject: [PATCH 3/5] removed unused class --- .../AsyncObserverEnsureAsyncHelpers.cs | 44 ------------------- 1 file changed, 44 deletions(-) delete mode 100644 AsyncRx.NET/System.Reactive.Async/Internal/AsyncObserverEnsureAsyncHelpers.cs diff --git a/AsyncRx.NET/System.Reactive.Async/Internal/AsyncObserverEnsureAsyncHelpers.cs b/AsyncRx.NET/System.Reactive.Async/Internal/AsyncObserverEnsureAsyncHelpers.cs deleted file mode 100644 index 62a5d82fc6..0000000000 --- a/AsyncRx.NET/System.Reactive.Async/Internal/AsyncObserverEnsureAsyncHelpers.cs +++ /dev/null @@ -1,44 +0,0 @@ -using System.Threading.Tasks; - -namespace System.Reactive.Internal; - -// Helpers methods that ensure that calls to IAsyncObserver methods don't throw synchronously. -// Those methods will always return a ValueTask, and any exception will be propagated through that ValueTask. -internal static class AsyncObserverEnsureAsyncHelpers -{ - public static ValueTask OnNextAsync_EnsureAsync(this IAsyncObserver source, T value) - { - try - { - return source.OnNextAsync(value); - } - catch (Exception e) - { - return new ValueTask(Task.FromException(e)); - } - } - - public static ValueTask OnErrorAsync_EnsureAsync(this IAsyncObserver source, Exception error) - { - try - { - return source.OnErrorAsync(error); - } - catch (Exception e) - { - return new ValueTask(Task.FromException(e)); - } - } - - public static ValueTask OnCompletedAsync_EnsureAsync(this IAsyncObserver source) - { - try - { - return source.OnCompletedAsync(); - } - catch (Exception e) - { - return new ValueTask(Task.FromException(e)); - } - } -} From 092ed23ec492cfc6dfa81cb5781e536e2c0ceab2 Mon Sep 17 00:00:00 2001 From: Federico Alterio <48481385+fedeAlterio@users.noreply.github.com> Date: Sun, 19 Oct 2025 11:03:39 +0200 Subject: [PATCH 4/5] added a ConfigureAwait(false) --- AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs b/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs index 99c0a4c028..2a2aa3c1dd 100644 --- a/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs +++ b/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs @@ -105,7 +105,7 @@ protected override async ValueTask OnErrorAsyncCore(Exception error) try { - await task; + await task.ConfigureAwait(false); } finally { From 585acc0343ffeab1c4da2ce99eed87e13be70e1c Mon Sep 17 00:00:00 2001 From: Federico Alterio <48481385+fedeAlterio@users.noreply.github.com> Date: Sun, 19 Oct 2025 11:44:36 +0200 Subject: [PATCH 5/5] used local variable instead class field --- AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs b/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs index 2a2aa3c1dd..f75c896951 100644 --- a/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs +++ b/AsyncRx.NET/System.Reactive.Async/AsyncObservableBase.cs @@ -153,7 +153,7 @@ private async ValueTask FinishAsync() if (onSomethingCall != null) { - await _pendingOnSomethingCallsTcs.Task.ConfigureAwait(false); + await onSomethingCall.ConfigureAwait(false); } if (subscription != null)