Skip to content

Commit 520ee46

Browse files
committed
Refactoring: Unified approach to chunk and backup handling
1 parent f80cd1b commit 520ee46

File tree

1 file changed

+38
-28
lines changed

1 file changed

+38
-28
lines changed

cmd/pbm-agent/delete.go

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -266,35 +266,20 @@ func (a *Agent) Cleanup(ctx context.Context, d *ctrl.CleanupCmd, opid ctrl.OPID,
266266
return
267267
}
268268

269-
eg := errgroup.Group{}
270-
eg.SetLimit(runtime.NumCPU())
271-
272269
cr, err := backup.MakeCleanupInfo(ctx, a.leadConn, d.OlderThan, d.Profile)
273270
if err != nil {
274271
l.Error("make cleanup report: " + err.Error())
275272
return
276273
}
277274

278-
for _, c := range cr.Chunks {
279-
eg.Go(func() error {
280-
err := oplog.DeleteChunkData(ctx, a.leadConn, stg, c)
281-
return errors.Wrapf(err, "delete chunk %q", c.FName)
282-
})
283-
}
284-
if err := eg.Wait(); err != nil {
275+
eg := &errgroup.Group{}
276+
eg.SetLimit(runtime.NumCPU())
277+
278+
if err := a.deleteChunks(ctx, eg, stg, cr.Chunks); err != nil {
285279
l.Error(err.Error())
286280
}
287281

288-
for i := range cr.Backups {
289-
bcp := &cr.Backups[i]
290-
291-
eg.Go(func() error {
292-
l.Info("deleting backup %q (profile: %q)", bcp.Name, d.Profile)
293-
err := backup.DeleteBackupData(ctx, a.leadConn, stg, bcp.Name)
294-
return errors.Wrapf(err, "delete backup %q", bcp.Name)
295-
})
296-
}
297-
if err := eg.Wait(); err != nil {
282+
if err := a.deleteBackups(ctx, eg, stg, cr.Backups); err != nil {
298283
l.Error(err.Error())
299284
}
300285
}
@@ -316,15 +301,40 @@ func (a *Agent) deletePITRImpl(ctx context.Context, ts primitive.Timestamp) erro
316301
return errors.Wrap(err, "get storage")
317302
}
318303

319-
return a.deleteChunks(ctx, stg, r.Chunks)
304+
eg := &errgroup.Group{}
305+
eg.SetLimit(runtime.NumCPU())
306+
return a.deleteChunks(ctx, eg, stg, r.Chunks)
320307
}
321308

322-
func (a *Agent) deleteChunks(ctx context.Context, stg storage.Storage, chunks []oplog.OplogChunk) error {
323-
for _, chnk := range chunks {
324-
err := oplog.DeleteChunkData(ctx, a.leadConn, stg, chnk)
325-
if err != nil {
326-
return err
327-
}
309+
func (a *Agent) deleteChunks(
310+
ctx context.Context,
311+
eg *errgroup.Group,
312+
stg storage.Storage,
313+
chunks []oplog.OplogChunk,
314+
) error {
315+
for _, c := range chunks {
316+
eg.Go(func() error {
317+
err := oplog.DeleteChunkData(ctx, a.leadConn, stg, c)
318+
return errors.Wrapf(err, "delete chunk %q", c.FName)
319+
})
320+
}
321+
return eg.Wait()
322+
}
323+
324+
func (a *Agent) deleteBackups(
325+
ctx context.Context,
326+
eg *errgroup.Group,
327+
stg storage.Storage,
328+
backups []backup.BackupMeta,
329+
) error {
330+
l := log.LogEventFromContext(ctx)
331+
332+
for _, b := range backups {
333+
eg.Go(func() error {
334+
l.Info("deleting backup %q (profile: %q)", b.Name, b.Store.Name)
335+
err := backup.DeleteBackupData(ctx, a.leadConn, stg, b.Name)
336+
return errors.Wrapf(err, "delete backup %q", b.Name)
337+
})
328338
}
329-
return nil
339+
return eg.Wait()
330340
}

0 commit comments

Comments
 (0)