Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions core/clustersmngr/factory_caches_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package clustersmngr_test

import (
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -82,12 +83,25 @@ func TestClustersNamespaces(t *testing.T) {
ns := v1.Namespace{}
ns.Name = "ns1"

// Use WaitGroup to ensure all goroutines complete
var wg sync.WaitGroup
wg.Add(2)

// simulating concurrent access
go cs.Set(clusterName, []v1.Namespace{ns})
go cs.Set(clusterName, []v1.Namespace{ns})
go func() {
defer wg.Done()
cs.Set(clusterName, []v1.Namespace{ns})
}()
go func() {
defer wg.Done()
cs.Set(clusterName, []v1.Namespace{ns})
}()

cs.Set(clusterName, []v1.Namespace{ns})

// Wait for all goroutines to complete
wg.Wait()

g.Expect(cs.Get(clusterName)).To(Equal([]v1.Namespace{ns}))

cs.Clear()
Expand Down
25 changes: 17 additions & 8 deletions core/server/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server_test
import (
"context"
"errors"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -39,6 +40,9 @@ func TestSync(t *testing.T) {
})
g.Expect(err).NotTo(HaveOccurred())

// Mutex to prevent concurrent updates to the same resource
var reconcileMutex sync.Mutex

name := "myapp"
ns := newNamespace(ctx, k, g)

Expand Down Expand Up @@ -207,22 +211,24 @@ func TestSync(t *testing.T) {
msg.Namespace = tt.reconcilable.GetNamespace()
}

done := make(chan error)
defer close(done)
done := make(chan error, 1) // Buffered channel to prevent blocking

go func() {
md := metadata.Pairs(MetadataUserKey, "anne", MetadataGroupsKey, "system:masters")
outgoingCtx := metadata.NewOutgoingContext(ctx, md)
_, err := c.SyncFluxObject(outgoingCtx, msg)
select {
case <-done:
case done <- err:
// Successfully sent error
case <-ctx.Done():
// test cancelled; avoid blocking and exit goroutine
return
default:
done <- err
}
}()

ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ticker.C:
Expand All @@ -231,13 +237,13 @@ func TestSync(t *testing.T) {
Name: tt.source.GetName(),
Namespace: tt.source.GetNamespace(),
}
if err := simulateReconcile(ctx, k, sn, tt.source.AsClientObject()); err != nil {
if err := simulateReconcile(ctx, k, sn, tt.source.AsClientObject(), &reconcileMutex); err != nil {
t.Fatal(err)
}
}

an := types.NamespacedName{Name: name, Namespace: ns.Name}
if err := simulateReconcile(ctx, k, an, tt.reconcilable.AsClientObject()); err != nil {
if err := simulateReconcile(ctx, k, an, tt.reconcilable.AsClientObject(), &reconcileMutex); err != nil {
t.Fatal(err)
}

Expand All @@ -252,7 +258,10 @@ func TestSync(t *testing.T) {
}
}

func simulateReconcile(ctx context.Context, k client.Client, name types.NamespacedName, o client.Object) error {
func simulateReconcile(ctx context.Context, k client.Client, name types.NamespacedName, o client.Object, mutex *sync.Mutex) error {
mutex.Lock()
defer mutex.Unlock()

switch obj := o.(type) {
case *helmv2.HelmRelease:
if err := k.Get(ctx, name, obj); err != nil {
Expand Down