Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions pkg/githubreconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, url string) error {
resetTime := rateLimitErr.Rate.Reset.Time
clog.FromContext(ctx).With("reset_at", resetTime).
Warn("Rate limited, requeueing after rate limit reset")
return workqueue.RequeueAfter(time.Until(resetTime))
return workqueue.RetryAfter(time.Until(resetTime))
}

// Check if it's an abuse rate limit error
Expand All @@ -164,7 +164,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, url string) error {
}
clog.FromContext(ctx).With("retry_after", retryAfter).
Warn("Abuse rate limit detected, requeueing after retry period")
return workqueue.RequeueAfter(retryAfter)
return workqueue.RetryAfter(retryAfter)
}
}
return err
Expand All @@ -183,8 +183,12 @@ func (r *Reconciler) Process(ctx context.Context, req *workqueue.ProcessRequest)
err := r.Reconcile(ctx, req.Key)
if err != nil {
// Check if we can extract a requeue delay from the error
if delay, ok := workqueue.GetRequeueDelay(err); ok {
clog.InfoContextf(ctx, "Reconciliation requested requeue after %v for key: %s", delay, req.Key)
if delay, ok, isError := workqueue.GetRequeueDelay(err); ok {
if isError {
clog.WarnContextf(ctx, "Reconciliation requested requeue after %v due to error for key: %s", delay, req.Key)
} else {
clog.InfoContextf(ctx, "Reconciliation requested requeue after %v for polling key: %s", delay, req.Key)
}
return &workqueue.ProcessResponse{
RequeueAfterSeconds: int64(delay.Seconds()),
}, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/githubreconciler/tokensource.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (ts *tokenSource) Token() (*oauth2.Token, error) {
scope = ts.org + "/" + ts.repo
}
clog.ErrorContextf(ctx, "Got NotFound error from Octo STS for %q: %v", scope, err)
return nil, workqueue.RequeueAfter(10 * time.Minute)
return nil, workqueue.RetryAfter(10 * time.Minute)
}
return nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/githubreconciler/tokensource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,15 @@ func TestTokenSource_Token_NotFoundError(t *testing.T) {
}

// Check that error is a requeue error with the correct delay
delay, ok := workqueue.GetRequeueDelay(err)
delay, ok, isError := workqueue.GetRequeueDelay(err)
if !ok {
t.Errorf("error type: got non-requeue error, wanted requeue error")
} else if delay != 10*time.Minute {
t.Errorf("requeue duration: got = %v, wanted = %v", delay, 10*time.Minute)
}
if !isError {
t.Error("expected isError = true for RetryAfter, got false")
}
})
}
}
Expand Down
13 changes: 9 additions & 4 deletions pkg/workqueue/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,16 @@ func HandleAsync(ctx context.Context, wq workqueue.Interface, concurrency int, f
Priority: oip.Priority(),
}); err != nil {
// Check if this is a requeue error with custom delay
if delay, ok := workqueue.GetRequeueDelay(err); ok {
clog.InfoContextf(ctx, "Key %q requested requeue after %v", oip.Name(), delay)
if delay, ok, isError := workqueue.GetRequeueDelay(err); ok {
if isError {
clog.WarnContextf(ctx, "Key %q requested retry with backoff after %v due to error", oip.Name(), delay)
} else {
clog.InfoContextf(ctx, "Key %q requested requeue after %v for polling", oip.Name(), delay)
}
if err := oip.RequeueWithOptions(ctx, workqueue.Options{
Priority: oip.Priority(),
Delay: delay,
Priority: oip.Priority(),
Delay: delay,
PreserveAttempts: isError, // Preserve attempts for error retries, reset for polling
}); err != nil {
return fmt.Errorf("requeue(after delay request) = %w", err)
}
Expand Down
22 changes: 19 additions & 3 deletions pkg/workqueue/dispatcher/requeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,37 @@ func TestRequeueWithDelay(t *testing.T) {
wantCompleted: true,
},
{
name: "requeue with 5 second delay",
name: "requeue with 5 second delay (polling)",
callback: func(_ context.Context, _ string, _ workqueue.Options) error {
return workqueue.RequeueAfter(5 * time.Second)
},
wantRequeued: true,
wantMinDelay: 5 * time.Second,
},
{
name: "requeue with 1 minute delay",
name: "requeue with 1 minute delay (polling)",
callback: func(_ context.Context, _ string, _ workqueue.Options) error {
return workqueue.RequeueAfter(time.Minute)
},
wantRequeued: true,
wantMinDelay: time.Minute,
},
{
name: "retry after 10 second delay (error)",
callback: func(_ context.Context, _ string, _ workqueue.Options) error {
return workqueue.RetryAfter(10 * time.Second)
},
wantRequeued: true,
wantMinDelay: 10 * time.Second,
},
{
name: "retry after 2 minute delay (error)",
callback: func(_ context.Context, _ string, _ workqueue.Options) error {
return workqueue.RetryAfter(2 * time.Minute)
},
wantRequeued: true,
wantMinDelay: 2 * time.Minute,
},
{
name: "non-retriable error",
callback: func(_ context.Context, _ string, _ workqueue.Options) error {
Expand Down Expand Up @@ -140,7 +156,7 @@ func TestServiceCallbackWithDelay(t *testing.T) {
err := callback(context.Background(), "test-key", workqueue.Options{})

// Should return a requeue error
delay, ok := workqueue.GetRequeueDelay(err)
delay, ok, _ := workqueue.GetRequeueDelay(err)
if !ok {
t.Fatalf("Expected requeue error, got: %v", err)
}
Expand Down
37 changes: 28 additions & 9 deletions pkg/workqueue/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,29 +53,48 @@ func GetNonRetriableDetails(err error) *NoRetryDetails {
// requeueError is a special error type that indicates the work item should be
// requeued with a specific delay.
type requeueError struct {
delay time.Duration
delay time.Duration
isError bool
}

// Error implements the error interface.
func (e *requeueError) Error() string {
return "requeue requested"
if e.isError {
return "requeue requested (error)"
}
return "requeue requested (polling)"
}

// RequeueAfter returns an error that indicates the work item should be requeued
// after the specified delay.
// after the specified delay for normal polling scenarios.
// Use RetryAfter for error/retry scenarios.
func RequeueAfter(delay time.Duration) error {
return &requeueError{delay: delay}
return &requeueError{
delay: delay,
isError: false,
}
}

// RetryAfter returns an error that indicates the work item should be retried
// after the specified delay due to an error condition requiring retry with backoff.
// Use RequeueAfter for normal polling scenarios.
func RetryAfter(delay time.Duration) error {
return &requeueError{
delay: delay,
isError: true,
}
}

// GetRequeueDelay extracts the requeue delay from an error if it's a requeue error.
// Returns the delay and true if the error is a requeue error, or 0 and false otherwise.
func GetRequeueDelay(err error) (time.Duration, bool) {
// Returns the delay, whether it's a requeue error, and whether it's an error scenario (vs polling).
// If the error is not a requeue error, returns (0, false, false).
func GetRequeueDelay(err error) (time.Duration, bool, bool) {
if err == nil {
return 0, false
return 0, false, false
}
var re *requeueError
if errors.As(err, &re) {
return re.delay, true
return re.delay, true, re.isError
}
return 0, false
return 0, false, false
}
138 changes: 108 additions & 30 deletions pkg/workqueue/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,70 +42,148 @@ func TestRequeueAfter(t *testing.T) {
t.Fatal("Expected non-nil error")
}

gotDelay, ok := GetRequeueDelay(err)
gotDelay, ok, isError := GetRequeueDelay(err)
if !ok {
t.Fatal("GetRequeueDelay returned false")
}
if gotDelay != tt.wantDelay {
t.Errorf("Got delay %v, want %v", gotDelay, tt.wantDelay)
}
if isError {
t.Error("Expected isError = false for RequeueAfter, got true")
}
})
}
}

func TestGetRequeueDelay(t *testing.T) {
func TestRetryAfter(t *testing.T) {
tests := []struct {
name string
err error
delay time.Duration
wantDelay time.Duration
wantOk bool
}{
{
name: "requeue error",
err: RequeueAfter(10 * time.Second),
wantDelay: 10 * time.Second,
wantOk: true,
name: "5 second delay",
delay: 5 * time.Second,
wantDelay: 5 * time.Second,
},
{
name: "regular error",
err: errors.New("some error"),
wantDelay: 0,
wantOk: false,
name: "1 minute delay",
delay: time.Minute,
wantDelay: time.Minute,
},
{
name: "nil error",
err: nil,
name: "zero delay",
delay: 0,
wantDelay: 0,
wantOk: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := RetryAfter(tt.delay)
if err == nil {
t.Fatal("Expected non-nil error")
}

gotDelay, ok, isError := GetRequeueDelay(err)
if !ok {
t.Fatal("GetRequeueDelay returned false")
}
if gotDelay != tt.wantDelay {
t.Errorf("Got delay %v, want %v", gotDelay, tt.wantDelay)
}
if !isError {
t.Error("Expected isError = true for RetryAfter, got false")
}
})
}
}

func TestGetRequeueDelay(t *testing.T) {
tests := []struct {
name string
err error
wantDelay time.Duration
wantOk bool
wantIsError bool
}{
{
name: "wrapped requeue error",
err: fmt.Errorf("operation failed: %w", RequeueAfter(15*time.Second)),
wantDelay: 15 * time.Second,
wantOk: true,
name: "requeue error for polling",
err: RequeueAfter(10 * time.Second),
wantDelay: 10 * time.Second,
wantOk: true,
wantIsError: false,
},
{
name: "double wrapped requeue error",
err: fmt.Errorf("outer: %w", fmt.Errorf("inner: %w", RequeueAfter(20*time.Second))),
wantDelay: 20 * time.Second,
wantOk: true,
name: "retry after error",
err: RetryAfter(30 * time.Second),
wantDelay: 30 * time.Second,
wantOk: true,
wantIsError: true,
},
{
name: "wrapped regular error",
err: fmt.Errorf("wrapped: %w", errors.New("some error")),
wantDelay: 0,
wantOk: false,
name: "regular error",
err: errors.New("some error"),
wantDelay: 0,
wantOk: false,
wantIsError: false,
},
{
name: "nil error",
err: nil,
wantDelay: 0,
wantOk: false,
wantIsError: false,
},
{
name: "wrapped requeue error",
err: fmt.Errorf("operation failed: %w", RequeueAfter(15*time.Second)),
wantDelay: 15 * time.Second,
wantOk: true,
wantIsError: false,
},
{
name: "wrapped retry after error",
err: fmt.Errorf("rate limited: %w", RetryAfter(45*time.Second)),
wantDelay: 45 * time.Second,
wantOk: true,
wantIsError: true,
},
{
name: "double wrapped requeue error",
err: fmt.Errorf("outer: %w", fmt.Errorf("inner: %w", RequeueAfter(20*time.Second))),
wantDelay: 20 * time.Second,
wantOk: true,
wantIsError: false,
},
{
name: "double wrapped retry after error",
err: fmt.Errorf("outer: %w", fmt.Errorf("inner: %w", RetryAfter(60*time.Second))),
wantDelay: 60 * time.Second,
wantOk: true,
wantIsError: true,
},
{
name: "wrapped regular error",
err: fmt.Errorf("wrapped: %w", errors.New("some error")),
wantDelay: 0,
wantOk: false,
wantIsError: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotDelay, gotOk := GetRequeueDelay(tt.err)
gotDelay, gotOk, gotIsError := GetRequeueDelay(tt.err)
if gotOk != tt.wantOk {
t.Errorf("Got ok=%v, want %v", gotOk, tt.wantOk)
t.Errorf("ok: got = %v, wanted = %v", gotOk, tt.wantOk)
}
if gotDelay != tt.wantDelay {
t.Errorf("Got delay %v, want %v", gotDelay, tt.wantDelay)
t.Errorf("delay: got = %v, wanted = %v", gotDelay, tt.wantDelay)
}
if gotIsError != tt.wantIsError {
t.Errorf("isError: got = %v, wanted = %v", gotIsError, tt.wantIsError)
}
})
}
Expand Down
Loading