33import io .kurrent .dbclient .*;
44import com .fasterxml .jackson .core .JsonProcessingException ;
55import com .fasterxml .jackson .databind .ObjectMapper ;
6+ import org .reactivestreams .*;
7+ import org .reactivestreams .Subscription ;
68
79import java .util .concurrent .ExecutionException ;
810
11+ @ SuppressWarnings ("ALL" )
912public class ReadingEvents {
1013 private static void readFromStream (KurrentDBClient client ) throws ExecutionException , InterruptedException , JsonProcessingException {
1114 // region read-from-stream
@@ -16,13 +19,41 @@ private static void readFromStream(KurrentDBClient client) throws ExecutionExcep
1619 ReadResult result = client .readStream ("some-stream" , options )
1720 .get ();
1821
22+
23+ // or using read reactive
24+ Publisher <ReadMessage > publisher = client .readStreamReactive ("some-stream" , options );
1925 // endregion read-from-stream
2026
2127 // region iterate-stream
2228 for (ResolvedEvent resolvedEvent : result .getEvents ()) {
2329 RecordedEvent recordedEvent = resolvedEvent .getOriginalEvent ();
2430 System .out .println (new ObjectMapper ().writeValueAsString (recordedEvent .getEventData ()));
2531 }
32+
33+ // or using read reactive
34+ publisher .subscribe (new Subscriber <ReadMessage >() {
35+ @ Override
36+ public void onSubscribe (Subscription subscription ) {
37+ }
38+
39+ @ Override
40+ public void onNext (ReadMessage readMessage ) {
41+ RecordedEvent recordedEvent = readMessage .getEvent ().getOriginalEvent ();
42+ try {
43+ System .out .println (new ObjectMapper ().writeValueAsString (recordedEvent .getEventData ()));
44+ } catch (JsonProcessingException e ) {
45+ throw new RuntimeException (e );
46+ }
47+ }
48+
49+ @ Override
50+ public void onError (Throwable throwable ) {
51+ }
52+
53+ @ Override
54+ public void onComplete () {
55+ }
56+ });
2657 // endregion iterate-stream
2758 }
2859
@@ -36,13 +67,40 @@ private static void readFromStreamPosition(KurrentDBClient client) throws Execut
3667 ReadResult result = client .readStream ("some-stream" , options )
3768 .get ();
3869
70+ // or using read reactive
71+ Publisher <ReadMessage > publisher = client .readStreamReactive ("some-stream" , options );
3972 // endregion read-from-stream-position
4073
4174 // region iterate-stream
4275 for (ResolvedEvent resolvedEvent : result .getEvents ()) {
4376 RecordedEvent recordedEvent = resolvedEvent .getOriginalEvent ();
4477 System .out .println (new ObjectMapper ().writeValueAsString (recordedEvent .getEventData ()));
4578 }
79+
80+ // or using read reactive
81+ publisher .subscribe (new Subscriber <ReadMessage >() {
82+ @ Override
83+ public void onSubscribe (Subscription subscription ) {
84+ }
85+
86+ @ Override
87+ public void onNext (ReadMessage readMessage ) {
88+ RecordedEvent recordedEvent = readMessage .getEvent ().getOriginalEvent ();
89+ try {
90+ System .out .println (new ObjectMapper ().writeValueAsString (recordedEvent .getEventData ()));
91+ } catch (JsonProcessingException e ) {
92+ throw new RuntimeException (e );
93+ }
94+ }
95+
96+ @ Override
97+ public void onError (Throwable throwable ) {
98+ }
99+
100+ @ Override
101+ public void onComplete () {
102+ }
103+ });
46104 // endregion iterate-stream
47105 }
48106
@@ -55,6 +113,9 @@ private static void readStreamOverridingUserCredentials(KurrentDBClient client)
55113
56114 ReadResult result = client .readStream ("some-stream" , options )
57115 .get ();
116+
117+ // Or using reactive stream
118+ Publisher <ReadMessage > publisher = client .readStreamReactive ("some-stream" , options );
58119 // endregion overriding-user-credentials
59120 }
60121
@@ -81,6 +142,39 @@ private static void readFromStreamPositionCheck(KurrentDBClient client) throws J
81142 RecordedEvent recordedEvent = resolvedEvent .getOriginalEvent ();
82143 System .out .println (new ObjectMapper ().writeValueAsString (recordedEvent .getEventData ()));
83144 }
145+
146+ // or using read reactive
147+ Publisher <ReadMessage > publisher = client .readStreamReactive ("some-stream" , options );
148+
149+ publisher .subscribe (new Subscriber <ReadMessage >() {
150+ @ Override
151+ public void onSubscribe (Subscription subscription ) {
152+ }
153+
154+ @ Override
155+ public void onNext (ReadMessage readMessage ) {
156+ RecordedEvent recordedEvent = readMessage .getEvent ().getOriginalEvent ();
157+ try {
158+ System .out .println (new ObjectMapper ().writeValueAsString (recordedEvent .getEventData ()));
159+ } catch (JsonProcessingException e ) {
160+ throw new RuntimeException (e );
161+ }
162+ }
163+
164+ @ Override
165+ public void onError (Throwable throwable ) {
166+ Throwable innerException = throwable .getCause ();
167+
168+ if (innerException instanceof StreamNotFoundException ) {
169+ return ;
170+ }
171+ // Handle other errors
172+ }
173+
174+ @ Override
175+ public void onComplete () {
176+ }
177+ });
84178 // endregion checking-for-stream-presence
85179 }
86180
@@ -97,6 +191,33 @@ private static void readFromStreamBackwards(KurrentDBClient client) throws JsonP
97191 RecordedEvent recordedEvent = resolvedEvent .getOriginalEvent ();
98192 System .out .println (new ObjectMapper ().writeValueAsString (recordedEvent .getEventData ()));
99193 }
194+
195+ // or using read reactive
196+ Publisher <ReadMessage > publisher = client .readStreamReactive ("some-stream" , options );
197+
198+ publisher .subscribe (new Subscriber <ReadMessage >() {
199+ @ Override
200+ public void onSubscribe (Subscription subscription ) {
201+ }
202+
203+ @ Override
204+ public void onNext (ReadMessage readMessage ) {
205+ RecordedEvent recordedEvent = readMessage .getEvent ().getOriginalEvent ();
206+ try {
207+ System .out .println (new ObjectMapper ().writeValueAsString (recordedEvent .getEventData ()));
208+ } catch (JsonProcessingException e ) {
209+ throw new RuntimeException (e );
210+ }
211+ }
212+
213+ @ Override
214+ public void onError (Throwable throwable ) {
215+ }
216+
217+ @ Override
218+ public void onComplete () {
219+ }
220+ });
100221 // endregion reading-backwards
101222 }
102223
@@ -109,13 +230,40 @@ private static void readFromAllStream(KurrentDBClient client) throws JsonProcess
109230 ReadResult result = client .readAll (options )
110231 .get ();
111232
233+ // or using read reactive
234+ Publisher <ReadMessage > publisher = client .readAllReactive (options );
112235 // endregion read-from-all-stream
113236
114237 // region read-from-all-stream-iterate
115238 for (ResolvedEvent resolvedEvent : result .getEvents ()) {
116239 RecordedEvent recordedEvent = resolvedEvent .getOriginalEvent ();
117240 System .out .println (new ObjectMapper ().writeValueAsString (recordedEvent .getEventData ()));
118241 }
242+
243+ // or using read reactive
244+ publisher .subscribe (new Subscriber <ReadMessage >() {
245+ @ Override
246+ public void onSubscribe (Subscription subscription ) {
247+ }
248+
249+ @ Override
250+ public void onNext (ReadMessage readMessage ) {
251+ RecordedEvent recordedEvent = readMessage .getEvent ().getOriginalEvent ();
252+ try {
253+ System .out .println (new ObjectMapper ().writeValueAsString (recordedEvent .getEventData ()));
254+ } catch (JsonProcessingException e ) {
255+ throw new RuntimeException (e );
256+ }
257+ }
258+
259+ @ Override
260+ public void onError (Throwable throwable ) {
261+ }
262+
263+ @ Override
264+ public void onComplete () {
265+ }
266+ });
119267 // endregion read-from-all-stream-iterate
120268 }
121269
@@ -128,6 +276,9 @@ private static void readAllOverridingUserCredentials(KurrentDBClient client) thr
128276
129277 ReadResult result = client .readAll (options )
130278 .get ();
279+
280+ // or using read reactive
281+ Publisher <ReadMessage > publisher = client .readAllReactive (options );
131282 // endregion read-all-overriding-user-credentials
132283 }
133284
@@ -147,6 +298,38 @@ private static void ignoreSystemEvents(KurrentDBClient client) throws JsonProces
147298 }
148299 System .out .println (new ObjectMapper ().writeValueAsString (recordedEvent .getEventData ()));
149300 }
301+
302+ // or using read reactive
303+ Publisher <ReadMessage > publisher = client .readAllReactive (options );
304+
305+ publisher .subscribe (new Subscriber <ReadMessage >() {
306+ @ Override
307+ public void onSubscribe (Subscription subscription ) {
308+ }
309+
310+ @ Override
311+ public void onNext (ReadMessage readMessage ) {
312+ RecordedEvent recordedEvent = readMessage .getEvent ().getOriginalEvent ();
313+
314+ if (recordedEvent .getEventType ().startsWith ("$" )) {
315+ return ;
316+ }
317+
318+ try {
319+ System .out .println (new ObjectMapper ().writeValueAsString (recordedEvent .getEventData ()));
320+ } catch (JsonProcessingException e ) {
321+ throw new RuntimeException (e );
322+ }
323+ }
324+
325+ @ Override
326+ public void onError (Throwable throwable ) {
327+ }
328+
329+ @ Override
330+ public void onComplete () {
331+ }
332+ });
150333 // endregion ignore-system-events
151334 }
152335
@@ -159,13 +342,40 @@ private static void readFromAllStreamBackwards(KurrentDBClient client) throws Js
159342 ReadResult result = client .readAll (options )
160343 .get ();
161344
345+ // or using read reactive
346+ Publisher <ReadMessage > publisher = client .readAllReactive (options );
162347 // endregion read-from-all-stream-backwards
163348
164349 // region read-from-all-stream-iterate
165350 for (ResolvedEvent resolvedEvent : result .getEvents ()) {
166351 RecordedEvent recordedEvent = resolvedEvent .getOriginalEvent ();
167352 System .out .println (new ObjectMapper ().writeValueAsString (recordedEvent .getEventData ()));
168353 }
354+
355+ // or using read reactive
356+ publisher .subscribe (new Subscriber <ReadMessage >() {
357+ @ Override
358+ public void onSubscribe (Subscription subscription ) {
359+ }
360+
361+ @ Override
362+ public void onNext (ReadMessage readMessage ) {
363+ RecordedEvent recordedEvent = readMessage .getEvent ().getOriginalEvent ();
364+ try {
365+ System .out .println (new ObjectMapper ().writeValueAsString (recordedEvent .getEventData ()));
366+ } catch (JsonProcessingException e ) {
367+ throw new RuntimeException (e );
368+ }
369+ }
370+
371+ @ Override
372+ public void onError (Throwable throwable ) {
373+ }
374+
375+ @ Override
376+ public void onComplete () {
377+ }
378+ });
169379 // endregion read-from-all-stream-iterate
170380 }
171381
@@ -184,6 +394,36 @@ private static void filteringOutSystemEvents(KurrentDBClient client) throws Json
184394 }
185395 System .out .println (new ObjectMapper ().writeValueAsString (recordedEvent .getEventData ()));
186396 }
397+
398+ // or using read reactive
399+ Publisher <ReadMessage > publisher = client .readAllReactive (options );
400+
401+ publisher .subscribe (new Subscriber <ReadMessage >() {
402+ @ Override
403+ public void onSubscribe (Subscription subscription ) {
404+ }
405+
406+ @ Override
407+ public void onNext (ReadMessage readMessage ) {
408+ RecordedEvent recordedEvent = readMessage .getEvent ().getOriginalEvent ();
409+ if (!recordedEvent .getEventType ().startsWith ("$" )) {
410+ return ;
411+ }
412+ try {
413+ System .out .println (new ObjectMapper ().writeValueAsString (recordedEvent .getEventData ()));
414+ } catch (JsonProcessingException e ) {
415+ throw new RuntimeException (e );
416+ }
417+ }
418+
419+ @ Override
420+ public void onError (Throwable throwable ) {
421+ }
422+
423+ @ Override
424+ public void onComplete () {
425+ }
426+ });
187427 }
188428
189429 private static void readFromStreamResolvingLinkTos (KurrentDBClient client ) throws JsonProcessingException , ExecutionException , InterruptedException {
@@ -196,6 +436,9 @@ private static void readFromStreamResolvingLinkTos(KurrentDBClient client) throw
196436 ReadResult result = client .readAll (options )
197437 .get ();
198438
439+ // or using read reactive
440+ Publisher <ReadMessage > publisher = client .readAllReactive (options );
441+
199442 // endregion read-from-all-stream-resolving-link-Tos
200443 for (ResolvedEvent resolvedEvent : result .getEvents ()) {
201444 RecordedEvent recordedEvent = resolvedEvent .getOriginalEvent ();
0 commit comments