-
Notifications
You must be signed in to change notification settings - Fork 221
fixes exec queue add and pop #7591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feat/supernova-async-exec
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR fixes issues with the execution queue's Add and Pop operations, addressing concurrency problems and improving error handling in the async execution system.
Key Changes
- Fixed the Pop() method condition from
> 1to>= 1to correctly handle single-item queues - Updated AddOrReplace notification logic to only notify when the first item is added to an empty queue
- Added ValidateQueueIntegrity() method for periodic queue health checks
- Improved error handling with proper error propagation and retry logic with exponential backoff
- Enhanced error handling in baseSync.go using errors.Is() instead of direct comparison
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| process/asyncExecution/queue/blocksQueue.go | Fixed Pop() condition bug, restructured notification logic, added ValidateQueueIntegrity() method, and enhanced logging |
| process/asyncExecution/headersExecutor.go | Added retry mechanism with max attempts and exponential backoff, periodic validation ticker, nil checks for headers/bodies, and fixed error propagation |
| process/sync/baseSync.go | Updated error comparison to use errors.Is() and ensured deferred error handling works correctly |
| process/asyncExecution/queue/errors.go | Removed unused ErrMissingHeaderNonce and added ErrQueueIntegrityViolation |
| process/asyncExecution/errors.go | Added new error types for nil handlers and execution results |
| process/interface.go | Added ValidateQueueIntegrity() to BlocksQueue interface |
| process/asyncExecution/interface.go | Added ValidateQueueIntegrity() to BlocksQueue interface |
| testscommon/processMocks/blocksQueueMock.go | Added ValidateQueueIntegrityCalled field and method to mock |
| process/asyncExecution/queue/blocksQueue_test.go | Added comprehensive test coverage for ValidateQueueIntegrity() and concurrent operations |
| process/asyncExecution/headersExecutor_test.go | Fixed test expectations to properly validate error propagation |
Comments suppressed due to low confidence (1)
process/asyncExecution/headersExecutor.go:143
- This select statement with a default case creates a busy-wait loop when the validation ticker doesn't fire. The default case will be executed continuously, causing high CPU usage even when the queue is empty and Pop() is blocking. Consider removing the default case or adding a small sleep in the default branch to avoid busy-waiting. The ticker and Pop() blocking should provide sufficient responsiveness without the default case.
select {
case <-ctx.Done():
return
case <-validationTicker.C:
// Periodic queue validation
err := he.blocksQueue.ValidateQueueIntegrity()
if err != nil {
log.Error("headersExecutor.start: queue integrity validation failed", "err", err)
}
default:
he.mutPaused.RLock()
isPaused := he.isPaused
he.mutPaused.RUnlock()
if isPaused {
time.Sleep(timeToSleep)
continue
}
// blocking operation
headerBodyPair, ok := he.blocksQueue.Pop()
if !ok {
log.Debug("headersExecutor.start: not ok fetching from queue")
// close event
return
}
if check.IfNil(headerBodyPair.Header) || check.IfNil(headerBodyPair.Body) {
log.Debug("headersExecutor.start: popped nil header or body, continuing...")
continue
}
err := he.process(headerBodyPair)
if err != nil {
he.handleProcessError(ctx, headerBodyPair)
}
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if err != nil { | ||
| log.Warn("headersExecutor.process process block failed", | ||
| "nonce", pair.Header.GetNonce(), | ||
| "hash", pair.Header.GetPrevHash(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| "hash", pair.Header.GetPrevHash(), | |
| "prevHash", pair.Header.GetPrevHash(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| // Validate input parameters | ||
| if check.IfNil(pair.Header) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't think its really needed, there are already some nil checks before process call
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 7 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if check.IfNil(executionResult) { | ||
| log.Warn("headersExecutor.process - nil execution result received", | ||
| "nonce", pair.Header.GetNonce()) | ||
| return ErrNilExecutionResult | ||
| } |
Copilot
AI
Jan 8, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new nil execution result check added in the process method (lines 203-207) lacks test coverage. There is no test case that verifies the behavior when ProcessBlockProposal returns a nil execution result with no error, which would trigger the ErrNilExecutionResult error path.
| retryCount := 0 | ||
| backoffTime := timeToSleepOnError | ||
|
|
||
| for retryCount < maxRetryAttempts { | ||
| pairFromQueue, ok := he.blocksQueue.Peek() | ||
| if ok && pairFromQueue.Header.GetNonce() == pair.Header.GetNonce() { | ||
| // continue the processing (pop the next header from queue) | ||
| return | ||
| } | ||
|
|
||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| default: | ||
| // retry with the same pair | ||
| err := he.process(pair) | ||
| if err == nil { | ||
| log.Debug("headersExecutor.handleProcessError - retry succeeded", | ||
| "nonce", pair.Header.GetNonce(), | ||
| "retry_count", retryCount) | ||
| return | ||
| } | ||
| time.Sleep(timeToSleepOnError) | ||
| retryCount++ | ||
| log.Warn("headersExecutor.handleProcessError - retry failed", | ||
| "nonce", pair.Header.GetNonce(), | ||
| "retry_count", retryCount, | ||
| "max_retries", maxRetryAttempts, | ||
| "err", err) | ||
|
|
||
| // Exponential backoff with maximum limit | ||
| time.Sleep(backoffTime) | ||
| backoffTime = backoffTime * 2 | ||
| if backoffTime > maxBackoffTime { | ||
| backoffTime = maxBackoffTime | ||
| } | ||
| } | ||
| } | ||
|
|
||
| log.Error("headersExecutor.handleProcessError - max retries exceeded, skipping block", | ||
| "nonce", pair.Header.GetNonce(), | ||
| "max_retries", maxRetryAttempts) |
Copilot
AI
Jan 8, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new retry logic with exponential backoff and max retry limits in handleProcessError lacks comprehensive test coverage. There are no tests verifying that the retry count increments correctly, exponential backoff works as expected, or that max retries are enforced and properly logged.
process/asyncExecution/errors.go
Outdated
| // ErrNilHeaderHandler signals that a nil header handler has been provided | ||
| var ErrNilHeaderHandler = errors.New("nil header handler") | ||
|
|
||
| // ErrNilBlockBody signals that a nil block body has been provided | ||
| var ErrNilBlockBody = errors.New("nil block body") | ||
|
|
Copilot
AI
Jan 8, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error variables ErrNilHeaderHandler and ErrNilBlockBody are defined but never used in the codebase. The code uses common.ErrNilHeaderHandler and data.ErrNilBlockBody instead (as seen in blocksQueue.go lines 42 and 45). These unused error definitions should be removed to avoid confusion.
| // ErrNilHeaderHandler signals that a nil header handler has been provided | |
| var ErrNilHeaderHandler = errors.New("nil header handler") | |
| // ErrNilBlockBody signals that a nil block body has been provided | |
| var ErrNilBlockBody = errors.New("nil block body") |
| // Clear any stale notification from the channel when we take the fast path | ||
| select { | ||
| case <-bq.notifyCh: | ||
| default: | ||
| } |
Copilot
AI
Jan 8, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clearing the notification channel on the fast path (lines 146-149) could lead to a lost wakeup problem. If AddOrReplace sends a notification while Pop is processing (after checking the length but before clearing the channel), that notification will be consumed and discarded, potentially causing another waiting Pop call to miss its wakeup. Consider only clearing the channel when the queue becomes empty after popping.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| validationTicker := time.NewTicker(validationInterval) | ||
| defer validationTicker.Stop() | ||
|
|
||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| case <-validationTicker.C: | ||
| // Periodic queue validation | ||
| err := he.blocksQueue.ValidateQueueIntegrity() | ||
| if err != nil { | ||
| log.Error("headersExecutor.start: queue integrity validation failed", "err", err) | ||
| } |
Copilot
AI
Jan 8, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The periodic validation in the select statement may not execute as expected. The default case will be taken immediately when no other channels are ready, causing the loop to continuously process without giving the validation ticker a chance to fire. Consider removing the default case and using a non-blocking approach or restructuring the select to prioritize validation checks.
The base branch was changed.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## feat/supernova-async-exec #7591 +/- ##
==========================================================
Coverage 77.69% 77.69%
==========================================================
Files 876 876
Lines 121012 121077 +65
==========================================================
+ Hits 94015 94066 +51
- Misses 20790 20805 +15
+ Partials 6207 6206 -1 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Reasoning behind the pull request
Proposed changes
Testing procedure
Pre-requisites
Based on the Contributing Guidelines the PR author and the reviewers must check the following requirements are met:
featbranch created?featbranch merging, do all satellite projects have a proper tag insidego.mod?