@@ -7,6 +7,7 @@ import 'dart:convert';
77import 'package:amplify_api_dart/src/graphql/web_socket/blocs/web_socket_bloc.dart' ;
88import 'package:amplify_api_dart/src/graphql/web_socket/state/web_socket_state.dart' ;
99import 'package:amplify_api_dart/src/graphql/web_socket/types/connectivity_platform.dart' ;
10+ import 'package:amplify_api_dart/src/graphql/web_socket/types/process_life_cycle.dart' ;
1011import 'package:amplify_api_dart/src/graphql/web_socket/types/web_socket_types.dart' ;
1112import 'package:amplify_core/amplify_core.dart' ;
1213import 'package:test/test.dart' ;
@@ -54,6 +55,7 @@ void main() {
5455 if (! noConnectivity) {
5556 mockNetworkStreamController = StreamController <ConnectivityStatus >();
5657 }
58+ mockProcessLifeCycleController = StreamController <ProcessStatus >();
5759 mockPollClient = MockPollClient ();
5860 service = MockWebSocketService ();
5961
@@ -66,6 +68,7 @@ void main() {
6668 connectivity: noConnectivity
6769 ? const ConnectivityPlatform ()
6870 : const MockConnectivity (),
71+ processLifeCycle: const MockProcessLifeCycle (),
6972 );
7073
7174 sendMockConnectionAck (bloc! , service! );
@@ -307,6 +310,143 @@ void main() {
307310 mockPollClient.induceTimeout = false ;
308311 });
309312
313+ test ('should reconnect when process unpauses' , () async {
314+ var dataCompleter = Completer <String >();
315+ final subscribeEvent = SubscribeEvent (
316+ subscriptionRequest,
317+ () {
318+ service! .channel.sink.add (mockDataString);
319+ },
320+ );
321+
322+ final bloc = getWebSocketBloc ();
323+
324+ expect (
325+ bloc.stream,
326+ emitsInOrder (
327+ [
328+ isA <DisconnectedState >(),
329+ isA <ConnectingState >(),
330+ isA <ConnectedState >(),
331+ isA <ReconnectingState >(),
332+ isA <ConnectingState >(),
333+ isA <ConnectedState >(),
334+ ],
335+ ),
336+ );
337+
338+ bloc.subscribe (subscribeEvent).listen (
339+ expectAsync1 (
340+ (event) {
341+ expect (event.data, json.encode (mockSubscriptionData));
342+ dataCompleter.complete (event.data);
343+ },
344+ count: 2 ,
345+ ),
346+ );
347+
348+ await dataCompleter.future;
349+ dataCompleter = Completer <String >();
350+
351+ mockProcessLifeCycleController
352+ ..add (ProcessStatus .paused)
353+ ..add (ProcessStatus .resumed);
354+
355+ await expectLater (bloc.stream, emitsThrough (isA <ConnectedState >()));
356+
357+ service! .channel.sink.add (mockDataString);
358+ await dataCompleter.future;
359+ });
360+
361+ test ('should throttle reconnect after repeated unpausing' , () async {
362+ final blocReady = Completer <void >();
363+ final subscribeEvent = SubscribeEvent (
364+ subscriptionRequest,
365+ blocReady.complete,
366+ );
367+
368+ final bloc = getWebSocketBloc ();
369+
370+ expect (
371+ bloc.stream,
372+ emitsInOrder (
373+ [
374+ isA <DisconnectedState >(),
375+ isA <ConnectingState >(),
376+ isA <ConnectedState >(),
377+ isA <ReconnectingState >(),
378+ isA <ConnectingState >(),
379+ isA <ConnectedState >(),
380+ ],
381+ ),
382+ reason:
383+ 'Bloc should debounce multiple reconnection triggers while unpausing.' ,
384+ );
385+
386+ bloc.subscribe (
387+ subscribeEvent,
388+ );
389+
390+ await blocReady.future;
391+
392+ mockProcessLifeCycleController
393+ ..add (ProcessStatus .paused)
394+ ..add (ProcessStatus .resumed)
395+ ..add (ProcessStatus .paused)
396+ ..add (ProcessStatus .resumed)
397+ ..add (ProcessStatus .paused)
398+ ..add (ProcessStatus .resumed)
399+ ..add (ProcessStatus .paused)
400+ ..add (ProcessStatus .resumed)
401+ ..add (ProcessStatus .paused)
402+ ..add (ProcessStatus .resumed);
403+ });
404+
405+ test ('should reconnect multiple times after unpausing' , () async {
406+ final blocReady = Completer <void >();
407+ final subscribeEvent = SubscribeEvent (
408+ subscriptionRequest,
409+ blocReady.complete,
410+ );
411+
412+ final bloc = getWebSocketBloc ()
413+ ..subscribe (
414+ subscribeEvent,
415+ );
416+
417+ await blocReady.future;
418+
419+ mockProcessLifeCycleController..add (ProcessStatus .paused)
420+ ..add (ProcessStatus .resumed);
421+
422+ await expectLater (bloc.stream, emitsThrough (isA <ReconnectingState >()));
423+
424+ mockProcessLifeCycleController..add (ProcessStatus .paused)
425+ ..add (ProcessStatus .resumed);
426+
427+ await expectLater (bloc.stream, emitsThrough (isA <ConnectedState >()));
428+
429+ mockProcessLifeCycleController..add (ProcessStatus .paused)
430+ ..add (ProcessStatus .resumed);
431+
432+ await expectLater (bloc.stream, emitsThrough (isA <ReconnectingState >()));
433+
434+ mockProcessLifeCycleController..add (ProcessStatus .paused)
435+ ..add (ProcessStatus .resumed);
436+
437+ await expectLater (bloc.stream, emitsThrough (isA <ConnectedState >()));
438+
439+ mockProcessLifeCycleController..add (ProcessStatus .paused)
440+ ..add (ProcessStatus .resumed);
441+
442+ await expectLater (bloc.stream, emitsThrough (isA <ReconnectingState >()));
443+
444+ mockProcessLifeCycleController..add (ProcessStatus .paused)
445+ ..add (ProcessStatus .resumed);
446+
447+ await expectLater (bloc.stream, emitsThrough (isA <ConnectedState >()));
448+ });
449+
310450 test (
311451 'subscribe() ignores a WebSocket message that comes while the bloc is disconnected' ,
312452 () async {
@@ -348,13 +488,15 @@ void main() {
348488
349489 final badService = MockWebSocketService (badInit: true );
350490 mockNetworkStreamController = StreamController <ConnectivityStatus >();
491+ mockProcessLifeCycleController = StreamController <ProcessStatus >();
351492 final bloc = WebSocketBloc (
352493 config: testApiKeyConfig,
353494 authProviderRepo: getTestAuthProviderRepo (),
354495 wsService: badService,
355496 subscriptionOptions: subscriptionOptions,
356497 pollClientOverride: mockPollClient.client,
357498 connectivity: const MockConnectivity (),
499+ processLifeCycle: const MockProcessLifeCycle (),
358500 );
359501
360502 expect (
0 commit comments