Skip to content

Commit d5a13d9

Browse files
committed
feat(shard distributor): retain shard stats while shards are within heartbeat TTL
Signed-off-by: Andreas Holt <[email protected]>
1 parent 0332fe5 commit d5a13d9

File tree

2 files changed

+37
-4
lines changed

2 files changed

+37
-4
lines changed

service/sharddistributor/leader/process/processor.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -297,10 +297,19 @@ func (p *namespaceProcessor) cleanupStaleShardStats(ctx context.Context) {
297297

298298
// collect shard stats that are neither in the active shards set nor updated/moved within shardStatsTTL
299299
var staleShardStats []string
300-
for shardID := range namespaceState.ShardStats {
301-
if _, ok := activeShards[shardID]; !ok {
302-
staleShardStats = append(staleShardStats, shardID)
300+
for shardID, stats := range namespaceState.ShardStats {
301+
if _, ok := activeShards[shardID]; ok {
302+
continue
303+
}
304+
recentUpdate := stats.LastUpdateTime > 0 && (now-stats.LastUpdateTime) <= shardStatsTTL
305+
recentMove := stats.LastMoveTime > 0 && (now-stats.LastMoveTime) <= shardStatsTTL
306+
if recentUpdate || recentMove {
307+
// Preserve stats that have been updated or moved recently to allow cooldown/load history to
308+
// survive executor churn. These shards are likely awaiting reassignment,
309+
// so we don't want to delete them.
310+
continue
303311
}
312+
staleShardStats = append(staleShardStats, shardID)
304313
}
305314

306315
if len(staleShardStats) == 0 {

service/sharddistributor/leader/process/processor_test.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ func TestCleanupStaleShardStats(t *testing.T) {
215215
shardStats := map[string]store.ShardStatistics{
216216
"shard-1": {SmoothedLoad: 1.0, LastUpdateTime: now.Unix(), LastMoveTime: now.Unix()},
217217
"shard-2": {SmoothedLoad: 2.0, LastUpdateTime: now.Unix(), LastMoveTime: now.Unix()},
218-
"shard-3": {SmoothedLoad: 3.0, LastUpdateTime: now.Unix(), LastMoveTime: now.Unix()},
218+
"shard-3": {SmoothedLoad: 3.0, LastUpdateTime: now.Add(-2 * time.Second).Unix(), LastMoveTime: now.Add(-2 * time.Second).Unix()},
219219
}
220220

221221
namespaceState := &store.NamespaceState{
@@ -232,6 +232,30 @@ func TestCleanupStaleShardStats(t *testing.T) {
232232
processor.cleanupStaleShardStats(context.Background())
233233
})
234234

235+
t.Run("recent shard stats are preserved", func(t *testing.T) {
236+
mocks := setupProcessorTest(t, config.NamespaceTypeFixed)
237+
defer mocks.ctrl.Finish()
238+
processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor)
239+
240+
now := mocks.timeSource.Now()
241+
242+
expiredExecutor := now.Add(-2 * time.Second).Unix()
243+
state := &store.NamespaceState{
244+
Executors: map[string]store.HeartbeatState{
245+
"exec-stale": {LastHeartbeat: expiredExecutor},
246+
},
247+
ShardAssignments: map[string]store.AssignedState{},
248+
ShardStats: map[string]store.ShardStatistics{
249+
"shard-1": {SmoothedLoad: 5.0, LastUpdateTime: now.Unix(), LastMoveTime: now.Unix()},
250+
},
251+
}
252+
253+
mocks.store.EXPECT().GetState(gomock.Any(), mocks.cfg.Name).Return(state, nil)
254+
processor.cleanupStaleShardStats(context.Background())
255+
256+
// No delete expected since stats are recent; only GetState is registered, so any other store call fails the test.
257+
})
258+
235259
}
236260

237261
func TestRebalance_StoreErrors(t *testing.T) {

0 commit comments

Comments
 (0)