
Commit 15fabf6

test: improve coverage
1 parent ff161e4 commit 15fabf6

File tree

3 files changed: +578 -0 lines changed


pkg/activator/handler/handler_test.go

Lines changed: 71 additions & 0 deletions
@@ -39,6 +39,7 @@ import (
 	"knative.dev/serving/pkg/activator"
 	activatorconfig "knative.dev/serving/pkg/activator/config"
 	activatortest "knative.dev/serving/pkg/activator/testing"
+	apiconfig "knative.dev/serving/pkg/apis/config"
 	"knative.dev/serving/pkg/apis/serving"
 	v1 "knative.dev/serving/pkg/apis/serving/v1"
 	"knative.dev/serving/pkg/queue"
@@ -47,6 +48,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 
 	"knative.dev/pkg/logging"
+	ktesting "knative.dev/pkg/logging/testing"
 )
 
 const (
@@ -495,3 +497,72 @@ func (rr *responseRecorder) Write(p []byte) (int, error) {
 func (rr *responseRecorder) WriteHeader(code int) {
 	rr.code = code
 }
+
+func TestWrapActivatorHandlerWithFullDuplex(t *testing.T) {
+	logger := ktesting.TestLogger(t)
+
+	tests := []struct {
+		name             string
+		annotation       string
+		expectFullDuplex bool
+	}{
+		{
+			name:             "full duplex enabled",
+			annotation:       "Enabled",
+			expectFullDuplex: true,
+		},
+		{
+			name:             "full duplex disabled",
+			annotation:       "Disabled",
+			expectFullDuplex: false,
+		},
+		{
+			name:             "full duplex missing annotation",
+			annotation:       "",
+			expectFullDuplex: false,
+		},
+		{
+			name:             "full duplex case insensitive",
+			annotation:       "enabled",
+			expectFullDuplex: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Create a test handler
+			testHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				// Just respond with OK
+				w.WriteHeader(http.StatusOK)
+			})
+
+			// Wrap with full duplex handler
+			wrapped := WrapActivatorHandlerWithFullDuplex(testHandler, logger)
+
+			// Create request with context
+			req := httptest.NewRequest(http.MethodGet, "/", nil)
+
+			// Set up revision context with annotation
+			rev := &v1.Revision{
+				ObjectMeta: metav1.ObjectMeta{
+					Annotations: map[string]string{},
+				},
+			}
+			if tt.annotation != "" {
+				rev.Annotations[apiconfig.AllowHTTPFullDuplexFeatureKey] = tt.annotation
+			}
+
+			ctx := WithRevisionAndID(context.Background(), rev, types.NamespacedName{})
+			req = req.WithContext(ctx)
+
+			// Execute request
+			resp := httptest.NewRecorder()
+			wrapped.ServeHTTP(resp, req)
+
+			// Verify response code
+			if resp.Code != http.StatusOK {
+				t.Errorf("Expected status %d, got %d", http.StatusOK, resp.Code)
+			}
+		})
+	}
+}
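
For readers skimming the diff, the following is a minimal, hypothetical sketch of a full-duplex wrapper, not the actual WrapActivatorHandlerWithFullDuplex from pkg/activator/handler. It assumes the annotation value is compared case-insensitively (matching the "Enabled"/"enabled" rows in the test table above) and that the switch is made with (*http.ResponseController).EnableFullDuplex, which requires Go 1.21+. The helper name withFullDuplex and the plain string parameter are invented for illustration; the real handler reads the annotation from the revision stored in the request context.

// Hypothetical sketch only; not the knative.dev/serving implementation.
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"net/http/httptest"
	"strings"
)

// withFullDuplex wraps h and switches the connection to HTTP/1 full duplex
// when the annotation value reads "enabled" (case-insensitive).
func withFullDuplex(h http.Handler, annotationValue string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if strings.EqualFold(annotationValue, "enabled") {
			// Best effort: writers such as httptest.ResponseRecorder do not
			// support full duplex and return an error here.
			if err := http.NewResponseController(w).EnableFullDuplex(); err != nil {
				log.Printf("full duplex not enabled: %v", err)
			}
		}
		h.ServeHTTP(w, r)
	})
}

func main() {
	inner := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})

	srv := httptest.NewServer(withFullDuplex(inner, "Enabled"))
	defer srv.Close()

	resp, err := http.Get(srv.URL)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	io.Copy(io.Discard, resp.Body)
	fmt.Println("status:", resp.StatusCode) // status: 200
}

With httptest.NewServer the real server response writer is used, so EnableFullDuplex succeeds; a bare httptest.ResponseRecorder, as used in the test above, would instead take the warning path, which is why the test only asserts the wrapped handler still returns 200.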

pkg/activator/net/throttler_test.go

Lines changed: 294 additions & 0 deletions
@@ -1230,3 +1230,297 @@ func TestResetTrackersRaceCondition(t *testing.T) {
 		rt.resetTrackers()
 	})
 }
+
+func TestPodTrackerStateTransitions(t *testing.T) {
+	t.Run("initial state is healthy", func(t *testing.T) {
+		tracker := newPodTracker("10.0.0.1:8012", nil)
+
+		state := podState(tracker.state.Load())
+		if state != podHealthy {
+			t.Errorf("Expected initial state to be podHealthy, got %v", state)
+		}
+	})
+
+	t.Run("tryDrain transitions from healthy to draining", func(t *testing.T) {
+		tracker := newPodTracker("10.0.0.1:8012", nil)
+
+		// Should successfully transition to draining
+		if !tracker.tryDrain() {
+			t.Error("Expected tryDrain to succeed on healthy pod")
+		}
+
+		state := podState(tracker.state.Load())
+		if state != podDraining {
+			t.Errorf("Expected state to be podDraining after tryDrain, got %v", state)
+		}
+
+		// Should not transition again
+		if tracker.tryDrain() {
+			t.Error("Expected tryDrain to fail on already draining pod")
+		}
+
+		// Verify draining start time was set
+		if tracker.drainingStartTime.Load() == 0 {
+			t.Error("Expected drainingStartTime to be set")
+		}
+	})
+
+	t.Run("pending state allows reservation", func(t *testing.T) {
+		tracker := newPodTracker("10.0.0.1:8012", nil)
+		tracker.state.Store(uint32(podPending))
+
+		// Should be able to reserve on pending pod
+		release, ok := tracker.Reserve(context.Background())
+		if !ok {
+			t.Error("Expected Reserve to succeed on pending pod")
+		}
+		if release != nil {
+			release()
+		}
+	})
+
+	t.Run("draining state blocks new reservations", func(t *testing.T) {
+		tracker := newPodTracker("10.0.0.1:8012", nil)
+		tracker.tryDrain()
+
+		// Should not be able to reserve on draining pod
+		release, ok := tracker.Reserve(context.Background())
+		if ok {
+			t.Error("Expected Reserve to fail on draining pod")
+		}
+		if release != nil {
+			release()
+		}
+	})
+
+	t.Run("removed state blocks new reservations", func(t *testing.T) {
+		tracker := newPodTracker("10.0.0.1:8012", nil)
+		tracker.state.Store(uint32(podRemoved))
+
+		// Should not be able to reserve on removed pod
+		release, ok := tracker.Reserve(context.Background())
+		if ok {
+			t.Error("Expected Reserve to fail on removed pod")
+		}
+		if release != nil {
+			release()
+		}
+	})
+}
+
+func TestPodTrackerReferenceCounting(t *testing.T) {
+	t.Run("reference counting on successful reserve", func(t *testing.T) {
+		tracker := newPodTracker("10.0.0.1:8012", nil)
+
+		// Initial ref count should be 0
+		if tracker.getRefCount() != 0 {
+			t.Errorf("Expected initial ref count to be 0, got %d", tracker.getRefCount())
+		}
+
+		// Reserve should increment ref count
+		release, ok := tracker.Reserve(context.Background())
+		if !ok {
+			t.Fatal("Expected Reserve to succeed")
+		}
+
+		if tracker.getRefCount() != 1 {
+			t.Errorf("Expected ref count to be 1 after Reserve, got %d", tracker.getRefCount())
+		}
+
+		// Release should decrement ref count
+		release()
+
+		if tracker.getRefCount() != 0 {
+			t.Errorf("Expected ref count to be 0 after release, got %d", tracker.getRefCount())
+		}
+	})
+
+	t.Run("reference counting on failed reserve", func(t *testing.T) {
+		tracker := newPodTracker("10.0.0.1:8012", nil)
+		tracker.state.Store(uint32(podDraining))
+
+		// Initial ref count should be 0
+		if tracker.getRefCount() != 0 {
+			t.Errorf("Expected initial ref count to be 0, got %d", tracker.getRefCount())
+		}
+
+		// Reserve should fail and not increment ref count
+		release, ok := tracker.Reserve(context.Background())
+		if ok {
+			t.Fatal("Expected Reserve to fail on draining pod")
+		}
+		if release != nil {
+			release()
+		}
+
+		if tracker.getRefCount() != 0 {
+			t.Errorf("Expected ref count to remain 0 after failed Reserve, got %d", tracker.getRefCount())
+		}
+	})
+
+	t.Run("multiple concurrent reservations", func(t *testing.T) {
+		tracker := newPodTracker("10.0.0.1:8012", nil)
+
+		const numReservations = 10
+		var wg sync.WaitGroup
+		releases := make([]func(), numReservations)
+
+		// Make concurrent reservations
+		for i := 0; i < numReservations; i++ {
+			wg.Add(1)
+			go func(idx int) {
+				defer wg.Done()
+				release, ok := tracker.Reserve(context.Background())
+				if ok {
+					releases[idx] = release
+				}
+			}(i)
+		}
+
+		wg.Wait()
+
+		// Check ref count
+		expectedCount := uint64(0)
+		for _, release := range releases {
+			if release != nil {
+				expectedCount++
+			}
+		}
+
+		if tracker.getRefCount() != expectedCount {
+			t.Errorf("Expected ref count to be %d, got %d", expectedCount, tracker.getRefCount())
+		}
+
+		// Release all
+		for _, release := range releases {
+			if release != nil {
+				release()
+			}
+		}
+
+		if tracker.getRefCount() != 0 {
+			t.Errorf("Expected ref count to be 0 after all releases, got %d", tracker.getRefCount())
+		}
+	})
+
+	t.Run("releaseRef with zero refcount", func(t *testing.T) {
+		tracker := newPodTracker("10.0.0.1:8012", nil)
+
+		// Should handle gracefully (not panic)
+		tracker.releaseRef()
+
+		// Ref count should remain 0
+		if tracker.getRefCount() != 0 {
+			t.Errorf("Expected ref count to remain 0, got %d", tracker.getRefCount())
+		}
+	})
+}
+
+func TestPodTrackerWeightOperations(t *testing.T) {
+	t.Run("weight increment and decrement", func(t *testing.T) {
+		tracker := newPodTracker("10.0.0.1:8012", nil)
+
+		// Initial weight should be 0
+		if tracker.getWeight() != 0 {
+			t.Errorf("Expected initial weight to be 0, got %d", tracker.getWeight())
+		}
+
+		// Increment weight
+		tracker.increaseWeight()
+		if tracker.getWeight() != 1 {
+			t.Errorf("Expected weight to be 1 after increase, got %d", tracker.getWeight())
+		}
+
+		// Decrement weight
+		tracker.decreaseWeight()
+		if tracker.getWeight() != 0 {
+			t.Errorf("Expected weight to be 0 after decrease, got %d", tracker.getWeight())
+		}
+	})
+
+	t.Run("weight underflow protection", func(t *testing.T) {
+		tracker := newPodTracker("10.0.0.1:8012", nil)
+
+		// Decrement from 0 to exercise the underflow path
+		tracker.decreaseWeight()
+
+		// Depending on the implementation the weight either stays clamped at 0
+		// or wraps to max uint32; log any other value for inspection.
+		weight := tracker.getWeight()
+		if weight != 0 && weight != ^uint32(0) {
+			t.Logf("Weight after underflow: %d", weight)
+		}
+	})
+}
+
+func TestPodTrackerWithBreaker(t *testing.T) {
+	t.Run("capacity with breaker", func(t *testing.T) {
+		breaker := queue.NewBreaker(queue.BreakerParams{
+			QueueDepth:      10,
+			MaxConcurrency:  5,
+			InitialCapacity: 5,
+		})
+		tracker := newPodTracker("10.0.0.1:8012", breaker)
+
+		if tracker.Capacity() != 5 {
+			t.Errorf("Expected capacity to be 5, got %d", tracker.Capacity())
+		}
+	})
+
+	t.Run("pending with breaker", func(t *testing.T) {
+		breaker := queue.NewBreaker(queue.BreakerParams{
+			QueueDepth:      10,
+			MaxConcurrency:  5,
+			InitialCapacity: 5,
+		})
+		tracker := newPodTracker("10.0.0.1:8012", breaker)
+
+		// Initially should have 0 pending
+		if tracker.Pending() != 0 {
+			t.Errorf("Expected pending to be 0, got %d", tracker.Pending())
+		}
+	})
+
+	t.Run("in-flight with breaker", func(t *testing.T) {
+		breaker := queue.NewBreaker(queue.BreakerParams{
+			QueueDepth:      10,
+			MaxConcurrency:  5,
+			InitialCapacity: 5,
+		})
+		tracker := newPodTracker("10.0.0.1:8012", breaker)
+
+		// Initially should have 0 in-flight
+		if tracker.InFlight() != 0 {
+			t.Errorf("Expected in-flight to be 0, got %d", tracker.InFlight())
+		}
+
+		// Reserve should increment in-flight
+		ctx := context.Background()
+		release, ok := breaker.Reserve(ctx)
+		if !ok {
+			t.Fatal("Expected Reserve to succeed")
+		}
+		defer release()
+
+		if tracker.InFlight() != 1 {
+			t.Errorf("Expected in-flight to be 1, got %d", tracker.InFlight())
+		}
+	})
+
+	t.Run("update concurrency with breaker", func(t *testing.T) {
+		breaker := queue.NewBreaker(queue.BreakerParams{
+			QueueDepth:      10,
+			MaxConcurrency:  5,
+			InitialCapacity: 5,
+		})
+		tracker := newPodTracker("10.0.0.1:8012", breaker)
+
+		// Update concurrency
+		tracker.UpdateConcurrency(10)
+
+		// Capacity should be updated
+		if tracker.Capacity() != 10 {
+			t.Errorf("Expected capacity to be 10 after update, got %d", tracker.Capacity())
+		}
+	})
+}
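
Because podTracker and its helpers are unexported, these tests are the only public view of the contract they exercise: Reserve hands back a release callback and bumps an internal reference count, reservations are refused once the pod is draining or removed, and releaseRef must not underflow. The sketch below is a simplified stand-in written for illustration only; all names (trackerSketch, stateHealthy, and so on) are invented and the real implementation in pkg/activator/net differs. It merely mirrors those asserted semantics with sync/atomic.

// Simplified stand-in, not the actual podTracker from pkg/activator/net.
package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

const (
	stateHealthy uint32 = iota
	stateDraining
)

type trackerSketch struct {
	state    atomic.Uint32
	refCount atomic.Uint64
}

// Reserve hands out a slot and a release callback unless the pod is draining.
func (t *trackerSketch) Reserve(ctx context.Context) (func(), bool) {
	if t.state.Load() == stateDraining {
		return nil, false
	}
	t.refCount.Add(1)
	return t.releaseRef, true
}

// releaseRef decrements the reference count but never wraps below zero.
func (t *trackerSketch) releaseRef() {
	for {
		cur := t.refCount.Load()
		if cur == 0 {
			return
		}
		if t.refCount.CompareAndSwap(cur, cur-1) {
			return
		}
	}
}

// tryDrain flips healthy -> draining exactly once.
func (t *trackerSketch) tryDrain() bool {
	return t.state.CompareAndSwap(stateHealthy, stateDraining)
}

func main() {
	tr := &trackerSketch{}

	release, ok := tr.Reserve(context.Background())
	fmt.Println(ok, tr.refCount.Load()) // true 1
	release()
	fmt.Println(tr.refCount.Load()) // 0

	fmt.Println(tr.tryDrain()) // true
	fmt.Println(tr.tryDrain()) // false: already draining
	_, ok = tr.Reserve(context.Background())
	fmt.Println(ok) // false: draining pods refuse new reservations
}

The CompareAndSwap loop in releaseRef is one way to get the no-underflow behavior the zero-refcount subtest checks for, and tryDrain uses the same primitive to make the healthy-to-draining transition a one-shot operation, which is why the second tryDrain call in the test above is expected to fail.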
