Skip to content
Merged
13 changes: 13 additions & 0 deletions controller/lifecycle/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/platform-mesh/golang-commons/controller/lifecycle/controllerruntime"
"github.com/platform-mesh/golang-commons/controller/lifecycle/multicluster"
"github.com/platform-mesh/golang-commons/controller/lifecycle/ratelimiter"
"github.com/platform-mesh/golang-commons/controller/lifecycle/subroutine"
"github.com/platform-mesh/golang-commons/logger"
)
Expand All @@ -16,6 +17,7 @@ type Builder struct {
withConditionManagement bool
withSpreadingReconciles bool
withReadOnly bool
rateLimiterOptions *[]ratelimiter.Option
subroutines []subroutine.Subroutine
log *logger.Logger
}
Expand Down Expand Up @@ -45,6 +47,11 @@ func (b *Builder) WithReadOnly() *Builder {
return b
}

func (b *Builder) WithStaticThenExponentialRateLimiter(opts ...ratelimiter.Option) *Builder {
b.rateLimiterOptions = &opts
return b
}

func (b *Builder) BuildControllerRuntime(cl client.Client) *controllerruntime.LifecycleManager {
lm := controllerruntime.NewLifecycleManager(b.subroutines, b.operatorName, b.controllerName, cl, b.log)
if b.withConditionManagement {
Expand All @@ -56,6 +63,9 @@ func (b *Builder) BuildControllerRuntime(cl client.Client) *controllerruntime.Li
if b.withReadOnly {
lm.WithReadOnly()
}
if b.rateLimiterOptions != nil {
lm.WithStaticThenExponentialRateLimiter((*b.rateLimiterOptions)...)
}
return lm
}

Expand All @@ -70,5 +80,8 @@ func (b *Builder) BuildMultiCluster(mgr mcmanager.Manager) *multicluster.Lifecyc
if b.withReadOnly {
lm.WithReadOnly()
}
if b.rateLimiterOptions != nil {
lm.WithStaticThenExponentialRateLimiter((*b.rateLimiterOptions)...)
}
return lm
}
57 changes: 57 additions & 0 deletions controller/lifecycle/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package builder

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"k8s.io/client-go/rest"
mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"

"github.com/platform-mesh/golang-commons/controller/lifecycle/ratelimiter"
pmtesting "github.com/platform-mesh/golang-commons/controller/testSupport"
"github.com/platform-mesh/golang-commons/logger"
)
Expand Down Expand Up @@ -58,6 +60,38 @@ func TestBuilder_WithReadOnly(t *testing.T) {
}
}

func TestBuilder_WithCustomRateLimiter(t *testing.T) {
t.Run("With options", func(t *testing.T) {
b := NewBuilder("op", "ctrl", nil, &logger.Logger{})
opts := []ratelimiter.Option{
ratelimiter.WithRequeueDelay(5 * time.Second),
ratelimiter.WithStaticWindow(1 * time.Minute),
}
b.WithStaticThenExponentialRateLimiter(opts...)
if b.rateLimiterOptions == nil {
t.Error("expected rateLimiterOptions to be non-nil")
}
if got := len(*b.rateLimiterOptions); got != 2 {
t.Errorf("expected 2 rate limiter options, got %d", got)
}
})
t.Run("Without options", func(t *testing.T) {
b := NewBuilder("op", "ctrl", nil, &logger.Logger{})
b.WithStaticThenExponentialRateLimiter()
if b.rateLimiterOptions == nil {
t.Error("expected rateLimiterOptions to be non-nil even with no options")
}
if got := len(*b.rateLimiterOptions); got != 0 {
t.Errorf("expected 0 rate limiter options, got %d", got)
}
})

t.Run("Without custom rate limiter", func(t *testing.T) {
b := NewBuilder("op", "ctrl", nil, &logger.Logger{})
assert.Nil(t, b.rateLimiterOptions)
})
}

func TestControllerRuntimeBuilder(t *testing.T) {
t.Run("Minimal setup", func(t *testing.T) {
b := NewBuilder("op", "ctrl", nil, &logger.Logger{})
Expand All @@ -77,6 +111,16 @@ func TestControllerRuntimeBuilder(t *testing.T) {
lm := b.BuildControllerRuntime(fakeClient)
assert.NotNil(t, lm)
})
t.Run("WithCustomRateLimiter", func(t *testing.T) {
b := NewBuilder("op", "ctrl", nil, &logger.Logger{}).WithStaticThenExponentialRateLimiter(
ratelimiter.WithRequeueDelay(5*time.Second),
ratelimiter.WithStaticWindow(1*time.Minute),
ratelimiter.WithExponentialInitialBackoff(5*time.Second),
)
fakeClient := pmtesting.CreateFakeClient(t, &pmtesting.TestApiObject{})
lm := b.BuildControllerRuntime(fakeClient)
assert.NotNil(t, lm)
})
}

func TestMulticontrollerRuntimeBuilder(t *testing.T) {
Expand Down Expand Up @@ -107,4 +151,17 @@ func TestMulticontrollerRuntimeBuilder(t *testing.T) {
lm := b.BuildMultiCluster(mgr)
assert.NotNil(t, lm)
})
t.Run("WithCustomRateLimiter", func(t *testing.T) {
b := NewBuilder("op", "ctrl", nil, &logger.Logger{}).WithStaticThenExponentialRateLimiter(
ratelimiter.WithRequeueDelay(5*time.Second),
ratelimiter.WithStaticWindow(1*time.Minute),
ratelimiter.WithExponentialInitialBackoff(5*time.Second),
)
cfg := &rest.Config{}
provider := pmtesting.NewFakeProvider(cfg)
mgr, err := mcmanager.New(cfg, provider, mcmanager.Options{})
assert.NoError(t, err)
lm := b.BuildMultiCluster(mgr)
assert.NotNil(t, lm)
})
}
24 changes: 23 additions & 1 deletion controller/lifecycle/controllerruntime/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controllerruntime
import (
"context"
"fmt"
"log"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
Expand All @@ -11,10 +12,13 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"k8s.io/client-go/util/workqueue"

"github.com/platform-mesh/golang-commons/controller/filter"
"github.com/platform-mesh/golang-commons/controller/lifecycle"
"github.com/platform-mesh/golang-commons/controller/lifecycle/api"
"github.com/platform-mesh/golang-commons/controller/lifecycle/conditions"
"github.com/platform-mesh/golang-commons/controller/lifecycle/ratelimiter"
"github.com/platform-mesh/golang-commons/controller/lifecycle/runtimeobject"
"github.com/platform-mesh/golang-commons/controller/lifecycle/spread"
"github.com/platform-mesh/golang-commons/controller/lifecycle/subroutine"
Expand All @@ -29,6 +33,7 @@ type LifecycleManager struct {
spreader *spread.Spreader
conditionsManager *conditions.ConditionManager
prepareContextFunc api.PrepareContextFunc
rateLimiter workqueue.TypedRateLimiter[reconcile.Request]
}

func NewLifecycleManager(subroutines []subroutine.Subroutine, operatorName string, controllerName string, client client.Client, log *logger.Logger) *LifecycleManager {
Expand Down Expand Up @@ -83,10 +88,18 @@ func (l *LifecycleManager) SetupWithManagerBuilder(mgr ctrl.Manager, maxReconcil
}

eventPredicates = append([]predicate.Predicate{filter.DebugResourcesBehaviourPredicate(debugLabelValue)}, eventPredicates...)
opts := controller.Options{
MaxConcurrentReconciles: maxReconciles,
}

if l.rateLimiter != nil {
opts.RateLimiter = l.rateLimiter
}

return ctrl.NewControllerManagedBy(mgr).
Named(reconcilerName).
For(instance).
WithOptions(controller.Options{MaxConcurrentReconciles: maxReconciles}).
WithOptions(opts).
WithEventFilter(predicate.And(eventPredicates...)), nil
}
func (l *LifecycleManager) SetupWithManager(mgr ctrl.Manager, maxReconciles int, reconcilerName string, instance runtimeobject.RuntimeObject, debugLabelValue string, r reconcile.Reconciler, log *logger.Logger, eventPredicates ...predicate.Predicate) error {
Expand Down Expand Up @@ -123,3 +136,12 @@ func (l *LifecycleManager) WithConditionManagement() *LifecycleManager {
l.conditionsManager = conditions.NewConditionManager()
return l
}

func (l *LifecycleManager) WithStaticThenExponentialRateLimiter(opts ...ratelimiter.Option) *LifecycleManager {
rateLimiter, err := ratelimiter.NewStaticThenExponentialRateLimiter[reconcile.Request](ratelimiter.NewConfig(opts...))
if err != nil {
log.Fatalf("rate limiter config error: %s",err)
}
l.rateLimiter = rateLimiter
return l
}
27 changes: 27 additions & 0 deletions controller/lifecycle/controllerruntime/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
goerrors "errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -13,6 +14,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/platform-mesh/golang-commons/controller/lifecycle/ratelimiter"
"github.com/platform-mesh/golang-commons/controller/lifecycle/runtimeobject"
"github.com/platform-mesh/golang-commons/controller/lifecycle/subroutine"
pmtesting "github.com/platform-mesh/golang-commons/controller/testSupport"
Expand Down Expand Up @@ -152,6 +154,31 @@ func TestLifecycle(t *testing.T) {
assert.True(t, true, l.ConditionsManager() != nil)
})

t.Run("WithRateLimiter", func(t *testing.T) {
fakeClient := pmtesting.CreateFakeClient(t, &pmtesting.TestApiObject{})
_, log := createLifecycleManager([]subroutine.Subroutine{}, fakeClient)

l := NewLifecycleManager([]subroutine.Subroutine{}, "test-operator", "test-controller", fakeClient, log.Logger)
expectedCfg := ratelimiter.Config{
StaticRequeueDelay: 5 * time.Second,
StaticWindow: 10 * time.Second,
ExponentialInitialBackoff: 5 * time.Second,
ExponentialMaxBackoff: time.Minute,
}
l.WithStaticThenExponentialRateLimiter(
ratelimiter.WithRequeueDelay(expectedCfg.StaticRequeueDelay),
ratelimiter.WithStaticWindow(expectedCfg.StaticWindow),
ratelimiter.WithExponentialInitialBackoff(expectedCfg.ExponentialInitialBackoff),
ratelimiter.WithExponentialMaxBackoff(expectedCfg.ExponentialMaxBackoff),
)

assert.NotNil(t, l.rateLimiter, "rate limiter should be configured")

req := controllerruntime.Request{}
delay := l.rateLimiter.When(req)
assert.Equal(t, expectedCfg.StaticRequeueDelay, delay)
})

}

type testReconciler struct {
Expand Down
19 changes: 19 additions & 0 deletions controller/lifecycle/multicluster/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package multicluster
import (
"context"
"fmt"
"log"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cluster"
Expand All @@ -13,10 +14,13 @@ import (
mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile"

"k8s.io/client-go/util/workqueue"

"github.com/platform-mesh/golang-commons/controller/filter"
"github.com/platform-mesh/golang-commons/controller/lifecycle"
"github.com/platform-mesh/golang-commons/controller/lifecycle/api"
"github.com/platform-mesh/golang-commons/controller/lifecycle/conditions"
"github.com/platform-mesh/golang-commons/controller/lifecycle/ratelimiter"
"github.com/platform-mesh/golang-commons/controller/lifecycle/runtimeobject"
"github.com/platform-mesh/golang-commons/controller/lifecycle/spread"
"github.com/platform-mesh/golang-commons/controller/lifecycle/subroutine"
Expand All @@ -35,6 +39,7 @@ type LifecycleManager struct {
spreader *spread.Spreader
conditionsManager *conditions.ConditionManager
prepareContextFunc api.PrepareContextFunc
rateLimiter workqueue.TypedRateLimiter[mcreconcile.Request]
}

func NewLifecycleManager(subroutines []subroutine.Subroutine, operatorName string, controllerName string, mgr ClusterGetter, log *logger.Logger) *LifecycleManager {
Expand Down Expand Up @@ -96,6 +101,11 @@ func (l *LifecycleManager) SetupWithManagerBuilder(mgr mcmanager.Manager, maxRec
opts := controller.TypedOptions[mcreconcile.Request]{
MaxConcurrentReconciles: maxReconciles,
}

if l.rateLimiter != nil {
opts.RateLimiter = l.rateLimiter
}

return mcbuilder.ControllerManagedBy(mgr).
Named(reconcilerName).
For(instance).
Expand Down Expand Up @@ -136,3 +146,12 @@ func (l *LifecycleManager) WithConditionManagement() api.Lifecycle {
l.conditionsManager = conditions.NewConditionManager()
return l
}

func (l *LifecycleManager) WithStaticThenExponentialRateLimiter(opts ...ratelimiter.Option) *LifecycleManager {
rateLimiter, err := ratelimiter.NewStaticThenExponentialRateLimiter[mcreconcile.Request](ratelimiter.NewConfig(opts...))
if err != nil {
log.Fatalf("rate limiter config error: %s",err)
}
l.rateLimiter = rateLimiter
return l
}
72 changes: 72 additions & 0 deletions controller/lifecycle/ratelimiter/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package ratelimiter

import (
"fmt"
"time"
)

type Config struct {
StaticRequeueDelay time.Duration
StaticWindow time.Duration
ExponentialInitialBackoff time.Duration
ExponentialMaxBackoff time.Duration
}

var defaultConfig = Config{
StaticRequeueDelay: 2 * time.Second,
StaticWindow: 60 * time.Second,
ExponentialInitialBackoff: 2 * time.Second,
ExponentialMaxBackoff: 1000 * time.Second,
}

func (c Config) validate() error {
if c.StaticRequeueDelay < 0 {
return fmt.Errorf("the static requeue delay shouldn't be negative")
}
if c.ExponentialInitialBackoff < 0 {
return fmt.Errorf("the initial exponential backoff shouldn't be negative")
}
if c.StaticRequeueDelay > c.ExponentialInitialBackoff {
return fmt.Errorf("the initial exponential backoff should be equal to or greater than the static requeue delay")
}
if c.StaticWindow < c.StaticRequeueDelay {
return fmt.Errorf("the static window duration should be equal to or greater than the static requeue delay")
}
return nil
}

type Option func(*Config)

func WithStaticWindow(d time.Duration) Option {
return func(c *Config) {
c.StaticWindow = d
}
}

func WithRequeueDelay(d time.Duration) Option {
return func(c *Config) {
c.StaticRequeueDelay = d
}
}

func WithExponentialInitialBackoff(d time.Duration) Option {
return func(c *Config) {
c.ExponentialInitialBackoff = d
}
}

func WithExponentialMaxBackoff(d time.Duration) Option {
return func(c *Config) {
c.ExponentialMaxBackoff = d
}
}

func NewConfig(options ...Option) Config {
cfg := defaultConfig

for _, option := range options {
option(&cfg)
}

return cfg
}
Loading