Skip to content

Commit 7ccebe3

Browse files
committed
feat: introduced custom rate limiter based on options pattern
On-behalf-of: SAP [email protected]
1 parent ed6ebaa commit 7ccebe3

File tree

6 files changed

+200
-1
lines changed

6 files changed

+200
-1
lines changed

controller/lifecycle/controllerruntime/lifecycle.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/platform-mesh/golang-commons/controller/lifecycle"
1616
"github.com/platform-mesh/golang-commons/controller/lifecycle/api"
1717
"github.com/platform-mesh/golang-commons/controller/lifecycle/conditions"
18+
"github.com/platform-mesh/golang-commons/controller/lifecycle/ratelimiter"
1819
"github.com/platform-mesh/golang-commons/controller/lifecycle/runtimeobject"
1920
"github.com/platform-mesh/golang-commons/controller/lifecycle/spread"
2021
"github.com/platform-mesh/golang-commons/controller/lifecycle/subroutine"
@@ -29,6 +30,7 @@ type LifecycleManager struct {
2930
spreader *spread.Spreader
3031
conditionsManager *conditions.ConditionManager
3132
prepareContextFunc api.PrepareContextFunc
33+
rateLimiterConfig ratelimiter.Config
3234
}
3335

3436
func NewLifecycleManager(subroutines []subroutine.Subroutine, operatorName string, controllerName string, client client.Client, log *logger.Logger) *LifecycleManager {
@@ -83,10 +85,15 @@ func (l *LifecycleManager) SetupWithManagerBuilder(mgr ctrl.Manager, maxReconcil
8385
}
8486

8587
eventPredicates = append([]predicate.Predicate{filter.DebugResourcesBehaviourPredicate(debugLabelValue)}, eventPredicates...)
88+
opts := controller.Options{
89+
MaxConcurrentReconciles: maxReconciles,
90+
RateLimiter: ratelimiter.NewStaticThenExponentialRateLimiter[reconcile.Request](l.rateLimiterConfig),
91+
}
92+
8693
return ctrl.NewControllerManagedBy(mgr).
8794
Named(reconcilerName).
8895
For(instance).
89-
WithOptions(controller.Options{MaxConcurrentReconciles: maxReconciles}).
96+
WithOptions(opts).
9097
WithEventFilter(predicate.And(eventPredicates...)), nil
9198
}
9299
func (l *LifecycleManager) SetupWithManager(mgr ctrl.Manager, maxReconciles int, reconcilerName string, instance runtimeobject.RuntimeObject, debugLabelValue string, r reconcile.Reconciler, log *logger.Logger, eventPredicates ...predicate.Predicate) error {
@@ -123,3 +130,9 @@ func (l *LifecycleManager) WithConditionManagement() *LifecycleManager {
123130
l.conditionsManager = conditions.NewConditionManager()
124131
return l
125132
}
133+
134+
func (l *LifecycleManager) WithRateLimiter(opts ...ratelimiter.Option) *LifecycleManager {
135+
cfg := ratelimiter.NewConfig(opts...)
136+
l.rateLimiterConfig = cfg
137+
return l
138+
}

controller/lifecycle/controllerruntime/lifecycle_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
goerrors "errors"
66
"testing"
7+
"time"
78

89
"github.com/stretchr/testify/assert"
910
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -13,6 +14,7 @@ import (
1314
"sigs.k8s.io/controller-runtime/pkg/client"
1415
"sigs.k8s.io/controller-runtime/pkg/manager"
1516

17+
"github.com/platform-mesh/golang-commons/controller/lifecycle/ratelimiter"
1618
"github.com/platform-mesh/golang-commons/controller/lifecycle/runtimeobject"
1719
"github.com/platform-mesh/golang-commons/controller/lifecycle/subroutine"
1820
pmtesting "github.com/platform-mesh/golang-commons/controller/testSupport"
@@ -152,6 +154,27 @@ func TestLifecycle(t *testing.T) {
152154
assert.True(t, true, l.ConditionsManager() != nil)
153155
})
154156

157+
t.Run("WithRateLimiter", func(t *testing.T) {
158+
fakeClient := pmtesting.CreateFakeClient(t, &pmtesting.TestApiObject{})
159+
_, log := createLifecycleManager([]subroutine.Subroutine{}, fakeClient)
160+
161+
l := NewLifecycleManager([]subroutine.Subroutine{}, "test-operator", "test-controller", fakeClient, log.Logger)
162+
expectedCfg := ratelimiter.Config{
163+
StaticRequeueDelay: 5 * time.Second,
164+
StaticWindow: 10 * time.Second,
165+
ExponentialInitialBackoff: 5 * time.Second,
166+
ExponentialMaxBackoff: time.Minute,
167+
}
168+
l.WithRateLimiter(
169+
ratelimiter.WithRequeueDelay(expectedCfg.StaticRequeueDelay),
170+
ratelimiter.WithStaticWindow(expectedCfg.StaticWindow),
171+
ratelimiter.WithExponentialInitialBackoff(expectedCfg.ExponentialInitialBackoff),
172+
ratelimiter.WithExponentialMaxBackoff(expectedCfg.ExponentialMaxBackoff),
173+
)
174+
175+
assert.Equal(t, expectedCfg, l.rateLimiterConfig)
176+
})
177+
155178
}
156179

157180
type testReconciler struct {

controller/lifecycle/multicluster/lifecycle.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/platform-mesh/golang-commons/controller/lifecycle"
1818
"github.com/platform-mesh/golang-commons/controller/lifecycle/api"
1919
"github.com/platform-mesh/golang-commons/controller/lifecycle/conditions"
20+
"github.com/platform-mesh/golang-commons/controller/lifecycle/ratelimiter"
2021
"github.com/platform-mesh/golang-commons/controller/lifecycle/runtimeobject"
2122
"github.com/platform-mesh/golang-commons/controller/lifecycle/spread"
2223
"github.com/platform-mesh/golang-commons/controller/lifecycle/subroutine"
@@ -35,6 +36,7 @@ type LifecycleManager struct {
3536
spreader *spread.Spreader
3637
conditionsManager *conditions.ConditionManager
3738
prepareContextFunc api.PrepareContextFunc
39+
rateLimiterConfig ratelimiter.Config
3840
}
3941

4042
func NewLifecycleManager(subroutines []subroutine.Subroutine, operatorName string, controllerName string, mgr ClusterGetter, log *logger.Logger) *LifecycleManager {
@@ -95,7 +97,9 @@ func (l *LifecycleManager) SetupWithManagerBuilder(mgr mcmanager.Manager, maxRec
9597
eventPredicates = append([]predicate.Predicate{filter.DebugResourcesBehaviourPredicate(debugLabelValue)}, eventPredicates...)
9698
opts := controller.TypedOptions[mcreconcile.Request]{
9799
MaxConcurrentReconciles: maxReconciles,
100+
RateLimiter: ratelimiter.NewStaticThenExponentialRateLimiter[mcreconcile.Request](l.rateLimiterConfig),
98101
}
102+
99103
return mcbuilder.ControllerManagedBy(mgr).
100104
Named(reconcilerName).
101105
For(instance).
@@ -136,3 +140,9 @@ func (l *LifecycleManager) WithConditionManagement() api.Lifecycle {
136140
l.conditionsManager = conditions.NewConditionManager()
137141
return l
138142
}
143+
144+
func (l *LifecycleManager) WithRateLimiter(opts ...ratelimiter.Option) *LifecycleManager {
145+
cfg := ratelimiter.NewConfig(opts...)
146+
l.rateLimiterConfig = cfg
147+
return l
148+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package ratelimiter
2+
3+
import (
4+
"time"
5+
)
6+
7+
type Config struct {
8+
StaticRequeueDelay time.Duration
9+
StaticWindow time.Duration
10+
ExponentialInitialBackoff time.Duration
11+
ExponentialMaxBackoff time.Duration
12+
}
13+
14+
var defaultConfig = Config{
15+
StaticRequeueDelay: 5 * time.Second,
16+
StaticWindow: 1 * time.Minute,
17+
ExponentialInitialBackoff: 5 * time.Second,
18+
ExponentialMaxBackoff: 2 * time.Minute,
19+
}
20+
21+
type Option func(*Config)
22+
23+
func WithStaticWindow(d time.Duration) Option {
24+
return func(c *Config) {
25+
c.StaticWindow = d
26+
}
27+
}
28+
29+
func WithRequeueDelay(d time.Duration) Option {
30+
return func(c *Config) {
31+
c.StaticRequeueDelay = d
32+
}
33+
}
34+
35+
func WithExponentialInitialBackoff(d time.Duration) Option {
36+
return func(c *Config) {
37+
c.ExponentialInitialBackoff = d
38+
}
39+
}
40+
41+
func WithExponentialMaxBackoff(d time.Duration) Option {
42+
return func(c *Config) {
43+
c.ExponentialMaxBackoff = d
44+
}
45+
}
46+
47+
func NewConfig(options ...Option) Config {
48+
cfg := defaultConfig
49+
50+
for _, option := range options {
51+
option(&cfg)
52+
}
53+
54+
return cfg
55+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package ratelimiter
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
"k8s.io/client-go/util/workqueue"
8+
"k8s.io/utils/clock"
9+
)
10+
11+
type StaticThenExponentialRateLimiter[T comparable] struct {
12+
failuresLock sync.Mutex
13+
firstAttempt map[T]time.Time
14+
15+
staticDelay time.Duration
16+
staticWindow time.Duration
17+
18+
exponential workqueue.TypedRateLimiter[T]
19+
clock clock.Clock
20+
}
21+
22+
func NewStaticThenExponentialRateLimiter[T comparable](cfg Config) *StaticThenExponentialRateLimiter[T] {
23+
return &StaticThenExponentialRateLimiter[T]{
24+
staticDelay: cfg.StaticRequeueDelay,
25+
staticWindow: cfg.StaticWindow,
26+
exponential: workqueue.NewTypedItemExponentialFailureRateLimiter[T](
27+
cfg.ExponentialInitialBackoff,
28+
cfg.ExponentialMaxBackoff,
29+
),
30+
firstAttempt: make(map[T]time.Time),
31+
clock: clock.RealClock{},
32+
}
33+
}
34+
35+
func (r *StaticThenExponentialRateLimiter[T]) When(item T) time.Duration {
36+
r.failuresLock.Lock()
37+
defer r.failuresLock.Unlock()
38+
39+
now := r.clock.Now()
40+
41+
first, exists := r.firstAttempt[item]
42+
if !exists {
43+
first = now
44+
r.firstAttempt[item] = first
45+
}
46+
47+
timeSinceFirst := now.Sub(first)
48+
if timeSinceFirst <= r.staticWindow {
49+
return r.staticDelay
50+
}
51+
52+
return r.exponential.When(item)
53+
}
54+
55+
func (r *StaticThenExponentialRateLimiter[T]) Forget(item T) {
56+
r.failuresLock.Lock()
57+
defer r.failuresLock.Unlock()
58+
59+
delete(r.firstAttempt, item)
60+
r.exponential.Forget(item)
61+
}
62+
63+
func (r *StaticThenExponentialRateLimiter[T]) NumRequeues(item T) int {
64+
return r.exponential.NumRequeues(item)
65+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package ratelimiter
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/require"
8+
"k8s.io/apimachinery/pkg/types"
9+
clocktesting "k8s.io/utils/clock/testing"
10+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
11+
)
12+
13+
func TestStaticThenExponentialRateLimiter_Forget(t *testing.T) {
14+
cfg := Config{
15+
StaticRequeueDelay: 1 * time.Second,
16+
StaticWindow: 5 * time.Second,
17+
ExponentialInitialBackoff: 2 * time.Second,
18+
ExponentialMaxBackoff: 1 * time.Minute,
19+
}
20+
limiter := NewStaticThenExponentialRateLimiter[reconcile.Request](cfg)
21+
fakeClock := clocktesting.NewFakeClock(time.Now())
22+
limiter.clock = fakeClock
23+
24+
item := reconcile.Request{NamespacedName: types.NamespacedName{Name: "name", Namespace: "namespace"}}
25+
require.Equal(t, cfg.StaticRequeueDelay, limiter.When(item))
26+
fakeClock.Step(10 * time.Second)
27+
_ = limiter.When(item)
28+
29+
limiter.Forget(item)
30+
require.Equal(t, 0, limiter.NumRequeues(item))
31+
delay := limiter.When(item)
32+
require.Equal(t, cfg.StaticRequeueDelay, delay)
33+
}

0 commit comments

Comments
 (0)