Skip to content

Commit 513e88c

Browse files
committed
feat(shard distributor): clean up the shard statistics
Signed-off-by: Andreas Holt <[email protected]>
1 parent f97e0cf commit 513e88c

File tree

6 files changed

+121
-0
lines changed

6 files changed

+121
-0
lines changed

common/metrics/defs.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1469,6 +1469,7 @@ const (
14691469
ShardDistributorStoreAssignShardScope
14701470
ShardDistributorStoreAssignShardsScope
14711471
ShardDistributorStoreDeleteExecutorsScope
1472+
ShardDistributorStoreDeleteShardStatsScope
14721473
ShardDistributorStoreGetHeartbeatScope
14731474
ShardDistributorStoreGetStateScope
14741475
ShardDistributorStoreRecordHeartbeatScope

service/sharddistributor/leader/process/processor.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ func (p *namespaceProcessor) runCleanupLoop(ctx context.Context) {
221221
case <-ticker.Chan():
222222
p.logger.Info("Periodic heartbeat cleanup triggered.")
223223
p.cleanupStaleExecutors(ctx)
224+
p.cleanupStaleShardStats(ctx)
224225
}
225226
}
226227
}
@@ -254,6 +255,65 @@ func (p *namespaceProcessor) cleanupStaleExecutors(ctx context.Context) {
254255
}
255256
}
256257

258+
func (p *namespaceProcessor) cleanupStaleShardStats(ctx context.Context) {
259+
namespaceState, err := p.shardStore.GetState(ctx, p.namespaceCfg.Name)
260+
if err != nil {
261+
p.logger.Error("Failed to get state for shard stats cleanup", tag.Error(err))
262+
return
263+
}
264+
265+
activeShards := make(map[string]struct{})
266+
now := p.timeSource.Now().Unix()
267+
shardStatsTTL := int64(p.cfg.HeartbeatTTL.Seconds())
268+
269+
// 1. build set of active executors
270+
271+
// add all assigned shards from executors that are ACTIVE and not stale
272+
for executorID, assignedState := range namespaceState.ShardAssignments {
273+
executor, exists := namespaceState.Executors[executorID]
274+
if !exists {
275+
continue
276+
}
277+
278+
isActive := executor.Status == types.ExecutorStatusACTIVE
279+
isNotStale := (now - executor.LastHeartbeat) <= shardStatsTTL
280+
if isActive && isNotStale {
281+
for shardID := range assignedState.AssignedShards {
282+
activeShards[shardID] = struct{}{}
283+
}
284+
}
285+
}
286+
287+
// add all shards in ReportedShards where the status is not DONE
288+
for _, heartbeatState := range namespaceState.Executors {
289+
for shardID, shardStatusReport := range heartbeatState.ReportedShards {
290+
if shardStatusReport.Status != types.ShardStatusDONE {
291+
activeShards[shardID] = struct{}{}
292+
}
293+
}
294+
}
295+
296+
// 2. build set of stale shard stats
297+
298+
// append all shard stats that are not in the active shards set
299+
var staleShardStats []string
300+
for shardID := range namespaceState.ShardStats {
301+
if _, ok := activeShards[shardID]; !ok {
302+
staleShardStats = append(staleShardStats, shardID)
303+
}
304+
}
305+
306+
if len(staleShardStats) == 0 {
307+
return
308+
}
309+
310+
p.logger.Info("Removing stale shard stats")
311+
// Use the leader guard for the delete operation.
312+
if err := p.shardStore.DeleteShardStats(ctx, p.namespaceCfg.Name, staleShardStats, p.election.Guard()); err != nil {
313+
p.logger.Error("Failed to delete stale shard stats", tag.Error(err))
314+
}
315+
}
316+
257317
// rebalanceShards is the core logic for distributing shards among active executors.
258318
func (p *namespaceProcessor) rebalanceShards(ctx context.Context) (err error) {
259319
metricsLoopScope := p.metricsClient.Scope(metrics.ShardDistributorAssignLoopScope)

service/sharddistributor/store/etcd/executorstore/etcdstore.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,41 @@ func (s *executorStoreImpl) DeleteExecutors(ctx context.Context, namespace strin
580580
return nil
581581
}
582582

583+
func (s *executorStoreImpl) DeleteShardStats(ctx context.Context, namespace string, shardIDs []string, guard store.GuardFunc) error {
584+
if len(shardIDs) == 0 {
585+
return nil
586+
}
587+
var ops []clientv3.Op
588+
for _, shardID := range shardIDs {
589+
shardStatsKey, err := etcdkeys.BuildShardKey(s.prefix, namespace, shardID, etcdkeys.ShardStatisticsKey)
590+
if err != nil {
591+
return fmt.Errorf("build shard statistics key: %w", err)
592+
}
593+
ops = append(ops, clientv3.OpDelete(shardStatsKey))
594+
}
595+
596+
nativeTxn := s.client.Txn(ctx)
597+
guardedTxn, err := guard(nativeTxn)
598+
599+
if err != nil {
600+
return fmt.Errorf("apply transaction guard: %w", err)
601+
}
602+
etcdGuardedTxn, ok := guardedTxn.(clientv3.Txn)
603+
if !ok {
604+
return fmt.Errorf("guard function returned invalid transaction type")
605+
}
606+
607+
etcdGuardedTxn = etcdGuardedTxn.Then(ops...)
608+
resp, err := etcdGuardedTxn.Commit()
609+
if err != nil {
610+
return fmt.Errorf("commit shard statistics deletion: %w", err)
611+
}
612+
if !resp.Succeeded {
613+
return fmt.Errorf("transaction failed, leadership may have changed")
614+
}
615+
return nil
616+
}
617+
583618
// GetShardOwner looks up the owner of shardID within namespace by delegating
// to s.shardCache.GetShardOwner and returns that call's result and error
// unchanged.
func (s *executorStoreImpl) GetShardOwner(ctx context.Context, namespace, shardID string) (string, error) {
584619
return s.shardCache.GetShardOwner(ctx, namespace, shardID)
585620
}

service/sharddistributor/store/store.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type Store interface {
6060
AssignShards(ctx context.Context, namespace string, request AssignShardsRequest, guard GuardFunc) error
6161
Subscribe(ctx context.Context, namespace string) (<-chan int64, error)
6262
DeleteExecutors(ctx context.Context, namespace string, executorIDs []string, guard GuardFunc) error
63+
DeleteShardStats(ctx context.Context, namespace string, shardIDs []string, guard GuardFunc) error
6364

6465
GetShardOwner(ctx context.Context, namespace, shardID string) (string, error)
6566
AssignShard(ctx context.Context, namespace, shardID, executorID string) error

service/sharddistributor/store/store_mock.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/sharddistributor/store/wrappers/metered/store_generated.go

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)