@@ -185,6 +185,55 @@ func TestCleanupStaleExecutors(t *testing.T) {
185185 processor .cleanupStaleExecutors (context .Background ())
186186}
187187
188+ func TestCleanupStaleShardStats (t * testing.T ) {
189+ t .Run ("stale shard stats are deleted" , func (t * testing.T ) {
190+ mocks := setupProcessorTest (t , config .NamespaceTypeFixed )
191+ defer mocks .ctrl .Finish ()
192+ processor := mocks .factory .CreateProcessor (mocks .cfg , mocks .store , mocks .election ).(* namespaceProcessor )
193+
194+ now := mocks .timeSource .Now ()
195+
196+ heartbeats := map [string ]store.HeartbeatState {
197+ "exec-active" : {LastHeartbeat : now .Unix (), Status : types .ExecutorStatusACTIVE },
198+ "exec-stale" : {LastHeartbeat : now .Add (- 2 * time .Second ).Unix ()},
199+ }
200+
201+ assignments := map [string ]store.AssignedState {
202+ "exec-active" : {
203+ AssignedShards : map [string ]* types.ShardAssignment {
204+ "shard-1" : {Status : types .AssignmentStatusREADY },
205+ "shard-2" : {Status : types .AssignmentStatusREADY },
206+ },
207+ },
208+ "exec-stale" : {
209+ AssignedShards : map [string ]* types.ShardAssignment {
210+ "shard-3" : {Status : types .AssignmentStatusREADY },
211+ },
212+ },
213+ }
214+
215+ shardStats := map [string ]store.ShardStatistics {
216+ "shard-1" : {SmoothedLoad : 1.0 , LastUpdateTime : now .Unix (), LastMoveTime : now .Unix ()},
217+ "shard-2" : {SmoothedLoad : 2.0 , LastUpdateTime : now .Unix (), LastMoveTime : now .Unix ()},
218+ "shard-3" : {SmoothedLoad : 3.0 , LastUpdateTime : now .Unix (), LastMoveTime : now .Unix ()},
219+ }
220+
221+ namespaceState := & store.NamespaceState {
222+ Executors : heartbeats ,
223+ ShardAssignments : assignments ,
224+ ShardStats : shardStats ,
225+ }
226+
227+ gomock .InOrder (
228+ mocks .store .EXPECT ().GetState (gomock .Any (), mocks .cfg .Name ).Return (namespaceState , nil ),
229+ mocks .election .EXPECT ().Guard ().Return (store .NopGuard ()),
230+ mocks .store .EXPECT ().DeleteShardStats (gomock .Any (), mocks .cfg .Name , []string {"shard-3" }, gomock .Any ()).Return (nil ),
231+ )
232+ processor .cleanupStaleShardStats (context .Background ())
233+ })
234+
235+ }
236+
188237func TestRebalance_StoreErrors (t * testing.T ) {
189238 mocks := setupProcessorTest (t , config .NamespaceTypeFixed )
190239 defer mocks .ctrl .Finish ()
0 commit comments