Skip to content

Commit f1ddc03

Browse files
authored
feat: use informer cache for Get method in Kubernetes backend (#525)
Signed-off-by: yeonsoo <[email protected]> Signed-off-by: Yeonsoo Kim <[email protected]>
1 parent 48e6ed5 commit f1ddc03

File tree

15 files changed

+362
-53
lines changed

15 files changed

+362
-53
lines changed

agent/agent.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri
186186
informer.WithDeleteHandler[*v1alpha1.Application](a.addAppDeletionToQueue),
187187
informer.WithFilters[*v1alpha1.Application](a.DefaultAppFilterChain()),
188188
informer.WithNamespaceScope[*v1alpha1.Application](a.namespace),
189+
informer.WithGroupResource[*v1alpha1.Application]("argoproj.io", "applications"),
189190
}
190191

191192
appProjectManagerOption := []appproject.AppProjectManagerOption{
@@ -227,6 +228,7 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri
227228
informer.WithAddHandler[*v1alpha1.AppProject](a.addAppProjectCreationToQueue),
228229
informer.WithUpdateHandler[*v1alpha1.AppProject](a.addAppProjectUpdateToQueue),
229230
informer.WithDeleteHandler[*v1alpha1.AppProject](a.addAppProjectDeletionToQueue),
231+
informer.WithGroupResource[*v1alpha1.AppProject]("argoproj.io", "appprojects"),
230232
}
231233

232234
projInformer, err := informer.NewInformer(ctx, projInformerOptions...)
@@ -263,6 +265,7 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri
263265
informer.WithUpdateHandler[*corev1.Secret](a.handleRepositoryUpdate),
264266
informer.WithDeleteHandler[*corev1.Secret](a.handleRepositoryDeletion),
265267
informer.WithFilters(kuberepository.DefaultFilterChain(a.namespace)),
268+
informer.WithGroupResource[*corev1.Secret]("", "secrets"),
266269
}
267270

268271
repoInformer, err := informer.NewInformer(ctx, repoInformerOptions...)

internal/argocd/cluster/manager.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ func NewManager(ctx context.Context, namespace, redisAddress, redisPassword stri
108108
informer.WithUpdateHandler(m.onClusterUpdated),
109109
informer.WithDeleteHandler(m.onClusterDeleted),
110110
informer.WithFilters(m.filters),
111+
informer.WithGroupResource[*v1.Secret]("", "secrets"),
111112
)
112113
if err != nil {
113114
return nil, err

internal/backend/interface.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ import (
2525
corev1 "k8s.io/api/core/v1"
2626
)
2727

28+
type ContextKey string
29+
30+
const ForUpdateContextKey ContextKey = "forUpdate"
31+
2832
type ApplicationSelector struct {
2933

3034
// Labels is not currently implemented.

internal/backend/kubernetes/application/kubernetes.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
appclientset "github.com/argoproj/argo-cd/v3/pkg/client/clientset/versioned"
3131
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3232
"k8s.io/apimachinery/pkg/types"
33+
"k8s.io/client-go/tools/cache"
3334
)
3435

3536
var _ backend.Application = &KubernetesBackend{}
@@ -43,18 +44,24 @@ type KubernetesBackend struct {
4344
appClient appclientset.Interface
4445
// appInformer is used to watch for change events for Argo CD Application resources on the cluster
4546
appInformer informer.InformerInterface
47+
// appLister is used to list Argo CD Application resources from the cache
48+
appLister cache.GenericLister
4649
// namespace is not currently read, is not guaranteed to be non-empty, and is not guaranteed to contain the source of Argo CD Application CRs in all cases
4750
namespace string
4851
usePatch bool
4952
}
5053

5154
func NewKubernetesBackend(appClient appclientset.Interface, namespace string, appInformer informer.InformerInterface, usePatch bool) *KubernetesBackend {
52-
return &KubernetesBackend{
55+
be := &KubernetesBackend{
5356
appClient: appClient,
5457
appInformer: appInformer,
5558
usePatch: usePatch,
5659
namespace: namespace,
5760
}
61+
if specificInformer, ok := appInformer.(*informer.Informer[*v1alpha1.Application]); ok {
62+
be.appLister = specificInformer.Lister()
63+
}
64+
return be
5865
}
5966

6067
func (be *KubernetesBackend) List(ctx context.Context, selector backend.ApplicationSelector) ([]v1alpha1.Application, error) {
@@ -82,6 +89,23 @@ func (be *KubernetesBackend) Create(ctx context.Context, app *v1alpha1.Applicati
8289
}
8390

8491
func (be *KubernetesBackend) Get(ctx context.Context, name string, namespace string) (*v1alpha1.Application, error) {
92+
forUpdate, _ := ctx.Value(backend.ForUpdateContextKey).(bool)
93+
94+
if !forUpdate && be.appLister != nil && be.appInformer != nil && be.appInformer.HasSynced() {
95+
namespaceLister := be.appLister.ByNamespace(namespace)
96+
if namespaceLister != nil {
97+
obj, err := namespaceLister.Get(name)
98+
if err != nil {
99+
return be.appClient.ArgoprojV1alpha1().Applications(namespace).Get(ctx, name, v1.GetOptions{})
100+
}
101+
app, ok := obj.(*v1alpha1.Application)
102+
if !ok {
103+
return nil, fmt.Errorf("object is not an Application: %T", obj)
104+
}
105+
return app.DeepCopy(), nil
106+
}
107+
}
108+
85109
return be.appClient.ArgoprojV1alpha1().Applications(namespace).Get(ctx, name, v1.GetOptions{})
86110
}
87111

internal/backend/kubernetes/application/kubernetes_test.go

Lines changed: 110 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,18 @@ import (
2121
"testing"
2222

2323
"github.com/argoproj-labs/argocd-agent/internal/backend"
24+
"github.com/argoproj-labs/argocd-agent/internal/informer"
2425
"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
2526
fakeappclient "github.com/argoproj/argo-cd/v3/pkg/client/clientset/versioned/fake"
2627
"github.com/stretchr/testify/assert"
2728
"github.com/stretchr/testify/require"
2829
"github.com/wI2L/jsondiff"
30+
corev1 "k8s.io/api/core/v1"
2931
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32+
"k8s.io/apimachinery/pkg/labels"
3033
"k8s.io/apimachinery/pkg/runtime"
34+
"k8s.io/apimachinery/pkg/watch"
35+
"k8s.io/client-go/tools/cache"
3136
)
3237

3338
func Test_NewKubernetes(t *testing.T) {
@@ -110,22 +115,123 @@ func Test_Create(t *testing.T) {
110115

111116
func Test_Get(t *testing.T) {
112117
apps := mkApps()
118+
ctx := context.TODO()
113119
t.Run("Get existing app", func(t *testing.T) {
114120
fakeAppC := fakeappclient.NewSimpleClientset(apps...)
115-
k := NewKubernetesBackend(fakeAppC, "", nil, true)
116-
app, err := k.Get(context.TODO(), "app", "ns1")
121+
122+
inf, err := informer.NewInformer[*v1alpha1.Application](
123+
ctx,
124+
informer.WithListHandler[*v1alpha1.Application](func(ctx context.Context, options v1.ListOptions) (runtime.Object, error) {
125+
return fakeAppC.ArgoprojV1alpha1().Applications("").List(ctx, options)
126+
}),
127+
informer.WithWatchHandler[*v1alpha1.Application](func(ctx context.Context, options v1.ListOptions) (watch.Interface, error) {
128+
return fakeAppC.ArgoprojV1alpha1().Applications("").Watch(ctx, options)
129+
}),
130+
informer.WithGroupResource[*v1alpha1.Application]("argoproj.io", "applications"),
131+
)
132+
require.NoError(t, err)
133+
134+
go inf.Start(ctx)
135+
require.NoError(t, inf.WaitForSync(ctx))
136+
137+
// Create the backend with the informer
138+
backend := NewKubernetesBackend(fakeAppC, "", inf, true)
139+
140+
app, err := backend.Get(ctx, "app", "ns1")
117141
assert.NoError(t, err)
118142
assert.NotNil(t, app)
143+
assert.Equal(t, "app", app.Name)
144+
assert.Equal(t, "ns1", app.Namespace)
145+
119146
})
120147
t.Run("Get non-existing app", func(t *testing.T) {
121148
fakeAppC := fakeappclient.NewSimpleClientset(apps...)
122-
k := NewKubernetesBackend(fakeAppC, "", nil, true)
123-
app, err := k.Get(context.TODO(), "foo", "ns1")
149+
inf, err := informer.NewInformer[*v1alpha1.Application](
150+
ctx,
151+
informer.WithListHandler[*v1alpha1.Application](func(ctx context.Context, options v1.ListOptions) (runtime.Object, error) {
152+
return fakeAppC.ArgoprojV1alpha1().Applications("").List(ctx, options)
153+
}),
154+
informer.WithWatchHandler[*v1alpha1.Application](func(ctx context.Context, options v1.ListOptions) (watch.Interface, error) {
155+
return fakeAppC.ArgoprojV1alpha1().Applications("").Watch(ctx, options)
156+
}),
157+
informer.WithGroupResource[*v1alpha1.Application]("argoproj.io", "applications"),
158+
)
159+
require.NoError(t, err)
160+
go inf.Start(ctx)
161+
require.NoError(t, inf.WaitForSync(ctx))
162+
163+
backend := NewKubernetesBackend(fakeAppC, "", inf, true)
164+
165+
app, err := backend.Get(ctx, "nonexistent", "ns1")
124166
assert.ErrorContains(t, err, "not found")
125167
assert.Equal(t, &v1alpha1.Application{}, app)
168+
169+
})
170+
171+
t.Run("Get returns type assertion error for invalid object", func(t *testing.T) {
172+
fakeAppC := fakeappclient.NewSimpleClientset()
173+
174+
mockInf := &mockInformerWithInvalidType{}
175+
176+
backend := &KubernetesBackend{
177+
appClient: fakeAppC,
178+
appInformer: mockInf,
179+
appLister: mockInf.Lister(),
180+
}
181+
182+
app, err := backend.Get(ctx, "test", "ns1")
183+
require.Error(t, err)
184+
require.Nil(t, app)
185+
assert.Contains(t, err.Error(), "object is not an Application")
126186
})
127187
}
128188

189+
type mockInformerWithInvalidType struct{}
190+
191+
func (m *mockInformerWithInvalidType) Start(ctx context.Context) error {
192+
return nil
193+
}
194+
195+
func (m *mockInformerWithInvalidType) WaitForSync(ctx context.Context) error {
196+
return nil
197+
}
198+
199+
func (m *mockInformerWithInvalidType) HasSynced() bool {
200+
return true
201+
}
202+
203+
func (m *mockInformerWithInvalidType) Stop() error {
204+
return nil
205+
}
206+
207+
func (m *mockInformerWithInvalidType) Lister() cache.GenericLister {
208+
return &mockListerWithInvalidType{}
209+
}
210+
211+
type mockListerWithInvalidType struct{}
212+
213+
func (m *mockListerWithInvalidType) List(selector labels.Selector) ([]runtime.Object, error) {
214+
return nil, nil
215+
}
216+
217+
func (m *mockListerWithInvalidType) Get(name string) (runtime.Object, error) {
218+
return &corev1.ConfigMap{}, nil
219+
}
220+
221+
func (m *mockListerWithInvalidType) ByNamespace(namespace string) cache.GenericNamespaceLister {
222+
return &mockNamespaceListerWithInvalidType{}
223+
}
224+
225+
type mockNamespaceListerWithInvalidType struct{}
226+
227+
func (m *mockNamespaceListerWithInvalidType) List(selector labels.Selector) ([]runtime.Object, error) {
228+
return nil, nil
229+
}
230+
231+
func (m *mockNamespaceListerWithInvalidType) Get(name string) (runtime.Object, error) {
232+
return &corev1.ConfigMap{}, nil
233+
}
234+
129235
func Test_Delete(t *testing.T) {
130236
apps := mkApps()
131237
t.Run("Delete existing app", func(t *testing.T) {

internal/informer/informer.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/sirupsen/logrus"
2929
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3030
"k8s.io/apimachinery/pkg/runtime"
31+
"k8s.io/apimachinery/pkg/runtime/schema"
3132
"k8s.io/apimachinery/pkg/watch"
3233
"k8s.io/client-go/tools/cache"
3334
)
@@ -57,6 +58,9 @@ type Informer[T runtime.Object] struct {
5758

5859
resType reflect.Type
5960

61+
// groupResource is the group and resource of the watched objects.
62+
groupResource schema.GroupResource
63+
6064
// logger is this informer's logger.
6165
logger *logrus.Entry
6266

@@ -109,6 +113,7 @@ func NewInformer[T runtime.Object](ctx context.Context, opts ...InformerOption[T
109113
i := &Informer[T]{}
110114
var r T
111115
i.resType = reflect.TypeOf(r)
116+
112117
i.logger = logrus.NewEntry(logrus.StandardLogger()).WithFields(logrus.Fields{
113118
"type": i.resType,
114119
"module": "Informer",
@@ -287,3 +292,8 @@ func (i *Informer[T]) WaitForSync(ctx context.Context) error {
287292
}
288293
return nil
289294
}
295+
296+
// Lister returns a GenericLister that can be used to list and get cached resources.
297+
func (i *Informer[T]) Lister() cache.GenericLister {
298+
return cache.NewGenericLister(i.informer.GetIndexer(), i.groupResource)
299+
}

internal/informer/informer_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/stretchr/testify/assert"
2828
"github.com/stretchr/testify/require"
2929
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30+
"k8s.io/apimachinery/pkg/labels"
3031
"k8s.io/apimachinery/pkg/runtime"
3132
"k8s.io/apimachinery/pkg/watch"
3233
)
@@ -233,6 +234,39 @@ func Test_InformerScope(t *testing.T) {
233234

234235
}
235236

237+
func Test_Lister(t *testing.T) {
238+
t.Run("Lister returns GenericLister", func(t *testing.T) {
239+
i := newInformer(t, "", apps[0], apps[1])
240+
go i.Start(context.TODO())
241+
require.NoError(t, i.WaitForSync(context.TODO()))
242+
defer i.Stop()
243+
244+
lister := i.Lister()
245+
require.NotNil(t, lister)
246+
247+
obj, err := lister.ByNamespace("argocd").Get("test1")
248+
require.NoError(t, err)
249+
require.NotNil(t, obj)
250+
251+
app, ok := obj.(*v1alpha1.Application)
252+
require.True(t, ok)
253+
assert.Equal(t, "test1", app.Name)
254+
assert.Equal(t, "argocd", app.Namespace)
255+
})
256+
257+
t.Run("Lister can list objects", func(t *testing.T) {
258+
i := newInformer(t, "", apps[0], apps[1])
259+
go i.Start(context.TODO())
260+
require.NoError(t, i.WaitForSync(context.TODO()))
261+
defer i.Stop()
262+
263+
lister := i.Lister()
264+
objs, err := lister.List(labels.Everything())
265+
require.NoError(t, err)
266+
assert.Len(t, objs, 2)
267+
})
268+
}
269+
236270
func init() {
237271
logrus.SetLevel(logrus.TraceLevel)
238272
}

internal/informer/options.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/prometheus/client_golang/prometheus"
2424
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2525
"k8s.io/apimachinery/pkg/runtime"
26+
"k8s.io/apimachinery/pkg/runtime/schema"
2627
"k8s.io/apimachinery/pkg/watch"
2728
)
2829

@@ -105,3 +106,14 @@ func WithResyncPeriod[T runtime.Object](d time.Duration) InformerOption[T] {
105106
return nil
106107
}
107108
}
109+
110+
// WithGroupResource sets the group and resource for the informer's lister.
111+
func WithGroupResource[T runtime.Object](group, resource string) InformerOption[T] {
112+
return func(i *Informer[T]) error {
113+
i.groupResource = schema.GroupResource{
114+
Group: group,
115+
Resource: resource,
116+
}
117+
return nil
118+
}
119+
}

internal/manager/application/application.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -555,8 +555,14 @@ func (m *ApplicationManager) Delete(ctx context.Context, namespace string, incom
555555
// be returned.
556556
func (m *ApplicationManager) update(ctx context.Context, upsert bool, incoming *v1alpha1.Application, updateFn updateTransformer, patchFn patchTransformer) (*v1alpha1.Application, error) {
557557
var updated *v1alpha1.Application
558+
559+
if ctx == nil {
560+
ctx = context.Background()
561+
}
562+
ctxForUpdate := context.WithValue(ctx, backend.ForUpdateContextKey, true)
563+
558564
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
559-
existing, ierr := m.applicationBackend.Get(ctx, incoming.Name, incoming.Namespace)
565+
existing, ierr := m.applicationBackend.Get(ctxForUpdate, incoming.Name, incoming.Namespace)
560566
if ierr != nil {
561567
if errors.IsNotFound(ierr) && upsert {
562568
updated, ierr = m.Create(ctx, incoming)

internal/manager/application/application_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,21 @@ func fakeInformer(t *testing.T, namespace string, objects ...runtime.Object) (*f
5656
return appC.ArgoprojV1alpha1().Applications(namespace).Watch(ctx, opts)
5757
}),
5858
informer.WithNamespaceScope[*v1alpha1.Application](namespace),
59+
informer.WithGroupResource[*v1alpha1.Application]("argoproj.io", "applications"),
5960
)
6061
require.NoError(t, err)
62+
63+
go func() {
64+
err = informer.Start(context.Background())
65+
if err != nil {
66+
t.Fatalf("failed to start informer: %v", err)
67+
}
68+
}()
69+
70+
if err = informer.WaitForSync(context.Background()); err != nil {
71+
t.Fatalf("failed to wait for informer sync: %v", err)
72+
}
73+
6174
return appC, informer
6275
}
6376

@@ -210,6 +223,7 @@ func Test_ManagerUpdateManaged(t *testing.T) {
210223
require.NoError(t, err)
211224

212225
updated, err := mgr.UpdateManagedApp(context.Background(), incoming)
226+
213227
require.NoError(t, err)
214228
require.NotNil(t, updated)
215229

0 commit comments

Comments
 (0)