3434public class EventSync
3535{
3636 public static final long BLOCK_SEARCH_INTERVAL = 100000L ;
37+ public static final long POLYGON_BLOCK_SEARCH_INTERVAL = 10000L ;
38+
39+ private static final String TAG = "EVENT_SYNC" ;
40+ private static final boolean EVENT_SYNC_DEBUGGING = false ;
3741
3842 private final Token token ;
3943
@@ -71,8 +75,17 @@ public SyncDef getSyncDef(Realm realm)
7175 case DOWNWARD_SYNC_START : //Start event sync, optimistically try the whole current event range from 1 -> LATEST
7276 eventReadStartBlock = BigInteger .ONE ;
7377 eventReadEndBlock = BigInteger .valueOf (-1L );
74- //write the start point here
75- writeStartSyncBlock (realm , currentBlock .longValue ());
78+ if (EthereumNetworkBase .isEventBlockLimitEnforced (token .tokenInfo .chainId ))
79+ {
80+ syncState = EventSyncState .UPWARD_SYNC ;
81+ eventReadStartBlock = currentBlock .subtract (EthereumNetworkBase .getMaxEventFetch (token .tokenInfo .chainId ).multiply (BigInteger .valueOf (3 )));
82+ EVENT_DEBUG ("Init Sync for restricted block RPC" );
83+ }
84+ else
85+ {
86+ //write the start point here
87+ writeStartSyncBlock (realm , currentBlock .longValue ());
88+ }
7689 break ;
7790 case DOWNWARD_SYNC : //we needed to slow down the sync
7891 eventReadStartBlock = lastBlockRead .subtract (BigInteger .valueOf (readBlockSize ));
@@ -85,19 +98,85 @@ public SyncDef getSyncDef(Realm realm)
8598 break ;
8699 case UPWARD_SYNC_MAX : //we are syncing from the point we started the downward sync
87100 upwardSync = true ;
101+ if (EthereumNetworkBase .isEventBlockLimitEnforced (token .tokenInfo .chainId ) && upwardSyncStateLost (lastBlockRead , currentBlock ))
102+ {
103+ syncState = EventSyncState .UPWARD_SYNC ;
104+ EVENT_DEBUG ("Switch back to sync scan" );
105+ }
106+
88107 eventReadStartBlock = lastBlockRead ;
89108 eventReadEndBlock = BigInteger .valueOf (-1L );
90109 break ;
91110 case UPWARD_SYNC : //we encountered upward sync issues
92111 upwardSync = true ;
93112 eventReadStartBlock = lastBlockRead ;
94- eventReadEndBlock = lastBlockRead .add (BigInteger .valueOf (readBlockSize ));
113+ if (upwardSyncComplete (eventReadStartBlock , currentBlock )) //detect completion of upward sync and switch to sync_max
114+ {
115+ eventReadEndBlock = BigInteger .valueOf (-1L );
116+ syncState = EventSyncState .UPWARD_SYNC_MAX ;
117+ EVENT_DEBUG ("Sync complete" );
118+ }
119+ else
120+ {
121+ eventReadEndBlock = lastBlockRead .add (BigInteger .valueOf (readBlockSize ));
122+ }
95123 break ;
96124 }
97125
126+ // Finally adjust the event end read if required. This is placed outside the switch because it should affect
127+ // a few different paths
128+ eventReadEndBlock = adjustForLimitedBlockSize (eventReadStartBlock , eventReadEndBlock , currentBlock );
129+
130+ // detect edge condition - it's highly unlikely but acts as a stopper in case of unexpected results
131+ // This edge condition is where the start block read is greater than the current block.
132+ if (eventReadStartBlock .compareTo (currentBlock ) >= 0 )
133+ {
134+ eventReadStartBlock = currentBlock .subtract (BigInteger .ONE );
135+ eventReadEndBlock = BigInteger .valueOf (-1L );
136+ syncState = EventSyncState .UPWARD_SYNC_MAX ;
137+ }
138+
98139 return new SyncDef (eventReadStartBlock , eventReadEndBlock , syncState , upwardSync );
99140 }
100141
142+ private void EVENT_DEBUG (String message )
143+ {
144+ if (EVENT_SYNC_DEBUGGING )
145+ {
146+ Timber .tag (TAG ).i (token .tokenInfo .chainId + " " + token .tokenInfo .address + ": " + message );
147+ }
148+ }
149+
150+ private boolean upwardSyncStateLost (BigInteger lastBlockRead , BigInteger currentBlock )
151+ {
152+ return currentBlock .subtract (lastBlockRead ).compareTo (EthereumNetworkBase .getMaxEventFetch (token .tokenInfo .chainId )) >= 0 ;
153+ }
154+
155+ private boolean upwardSyncComplete (BigInteger eventReadStartBlock , BigInteger currentBlock )
156+ {
157+ BigInteger maxBlockRead = EthereumNetworkBase .getMaxEventFetch (token .tokenInfo .chainId ).subtract (BigInteger .ONE );
158+ BigInteger diff = currentBlock .subtract (eventReadStartBlock );
159+
160+ return diff .compareTo (maxBlockRead ) < 0 ;
161+ }
162+
163+ private BigInteger adjustForLimitedBlockSize (BigInteger eventReadStartBlock , BigInteger eventReadEndBlock , BigInteger currentBlock )
164+ {
165+ if (EthereumNetworkBase .isEventBlockLimitEnforced (token .tokenInfo .chainId ))
166+ {
167+ BigInteger maxBlockRead = EthereumNetworkBase .getMaxEventFetch (token .tokenInfo .chainId );
168+
169+ long diff = currentBlock .subtract (eventReadStartBlock ).longValue ();
170+
171+ if (diff >= maxBlockRead .longValue ())
172+ {
173+ return eventReadStartBlock .add (maxBlockRead ).subtract (BigInteger .ONE );
174+ }
175+ }
176+
177+ return eventReadEndBlock ;
178+ }
179+
101180 public boolean handleEthLogError (Response .Error error , DefaultBlockParameter startBlock , DefaultBlockParameter endBlock , SyncDef sync , Realm realm )
102181 {
103182 if (error .getCode () == -32005 )
@@ -177,6 +256,11 @@ private long reduceBlockSearch(long currentBlock, BigInteger startBlock)
177256
178257 private long getCurrentEventBlockSize (Realm instance )
179258 {
259+ if (EthereumNetworkBase .getMaxEventFetch (token .tokenInfo .chainId ).equals (BigInteger .valueOf (3500L )))
260+ {
261+ return EthereumNetworkBase .getMaxEventFetch (token .tokenInfo .chainId ).longValue ();
262+ }
263+
180264 RealmAuxData rd = instance .where (RealmAuxData .class )
181265 .equalTo ("instanceKey" , TokensRealmSource .databaseKey (token .tokenInfo .chainId , token .getAddress ()))
182266 .findFirst ();
@@ -205,8 +289,9 @@ protected EventSyncState getCurrentTokenSyncState(Realm instance)
205289 else
206290 {
207291 int state = rd .getTokenId ().intValue ();
208- if (state >= EventSyncState .DOWNWARD_SYNC_START .ordinal () || state < EventSyncState .TOP_LIMIT .ordinal ())
292+ if (state >= EventSyncState .DOWNWARD_SYNC_START .ordinal () && state < EventSyncState .TOP_LIMIT .ordinal ())
209293 {
294+ EVENT_DEBUG ("Read State: " + EventSyncState .values ()[state ]);
210295 return EventSyncState .values ()[state ];
211296 }
212297 else
@@ -248,6 +333,7 @@ protected long getLastEventRead(Realm instance)
248333 }
249334 else
250335 {
336+ EVENT_DEBUG ("ReadEventSync: " + rd .getResultTime ());
251337 return rd .getResultTime ();
252338 }
253339 }
@@ -333,14 +419,16 @@ private void updateEventReads(Realm realm, long lastRead, long readInterval, Eve
333419 rd .setResultReceivedTime (readInterval );
334420 rd .setTokenId (String .valueOf (state .ordinal ()));
335421
422+ EVENT_DEBUG ("WriteState: " + state + " " + lastRead );
423+
336424 r .insertOrUpdate (rd );
337425 });
338426 }
339427
340428 // If we're syncing downwards, work out what event block size we should read next
341429 private long calcNewIntervalSize (SyncDef sync , int evReads )
342430 {
343- if (sync .upwardSync ) return BLOCK_SEARCH_INTERVAL ;
431+ if (sync .upwardSync ) return EthereumNetworkBase . getMaxEventFetch ( token . tokenInfo . chainId ). longValue () ;
344432 long endBlock = sync .eventReadEndBlock .longValue () == -1 ? TransactionsService .getCurrentBlock (token .tokenInfo .chainId ).longValue ()
345433 : sync .eventReadEndBlock .longValue ();
346434 long currentReadSize = endBlock - sync .eventReadStartBlock .longValue ();
@@ -357,14 +445,16 @@ else if (evReads < 1000)
357445 }
358446 else if ((maxLogReads - evReads ) > maxLogReads *0.25 )
359447 {
360- currentReadSize += BLOCK_SEARCH_INTERVAL ;
448+ currentReadSize += EthereumNetworkBase . getMaxEventFetch ( token . tokenInfo . chainId ). longValue () ;
361449 }
362450
363451 return currentReadSize ;
364452 }
365453
366454 /***
367455 * Event Handling
456+ *
457+ * TODO: batch up catch-up calls
368458 */
369459
370460 public Pair <Integer , Pair <HashSet <BigInteger >, HashSet <BigInteger >>> processTransferEvents (Web3j web3j , Event transferEvent , DefaultBlockParameter startBlock ,
0 commit comments