Skip to content

Commit 1fdf161

Browse files
committed
perf: have the changes functions return an iter
This should reduce memory usage a bit when reading the changes, as it can compute one-at-a-time
1 parent 7491a82 commit 1fdf161

File tree

7 files changed

+438
-151
lines changed

7 files changed

+438
-151
lines changed

internal/datastore/common/changes.go

Lines changed: 57 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package common
22

33
import (
44
"context"
5+
"iter"
56
"maps"
67
"reflect"
78
"slices"
@@ -290,71 +291,80 @@ func (ch *Changes[R, K]) AddChangedDefinition(
290291

291292
// AsRevisionChanges returns the list of changes processed so far as a datastore watch
292293
// compatible, ordered, changelist.
293-
func (ch *Changes[R, K]) AsRevisionChanges(lessThanFunc func(lhs, rhs K) bool) ([]datastore.RevisionChanges, error) {
294+
func (ch *Changes[R, K]) AsRevisionChanges(lessThanFunc func(lhs, rhs K) bool) iter.Seq2[datastore.RevisionChanges, error] {
294295
return ch.revisionChanges(lessThanFunc, *new(R), false)
295296
}
296297

297298
// FilterAndRemoveRevisionChanges filters a list of changes processed up to the bound revision from the changes list, removing them
298299
// and returning the filtered changes.
299-
func (ch *Changes[R, K]) FilterAndRemoveRevisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R) ([]datastore.RevisionChanges, error) {
300-
changes, err := ch.revisionChanges(lessThanFunc, boundRev, true)
301-
if err != nil {
302-
return nil, err
303-
}
300+
func (ch *Changes[R, K]) FilterAndRemoveRevisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R) iter.Seq2[datastore.RevisionChanges, error] {
301+
return func(yield func(datastore.RevisionChanges, error) bool) {
302+
for change, err := range ch.revisionChanges(lessThanFunc, boundRev, true) {
303+
if !yield(change, err) {
304+
break
305+
}
306+
}
304307

305-
ch.removeAllChangesBefore(boundRev)
306-
return changes, nil
308+
ch.removeAllChangesBefore(boundRev)
309+
}
307310
}
308311

309-
func (ch *Changes[R, K]) revisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R, withBound bool) ([]datastore.RevisionChanges, error) {
310-
if ch.IsEmpty() {
311-
return nil, nil
312-
}
312+
func (ch *Changes[R, K]) revisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R, withBound bool) iter.Seq2[datastore.RevisionChanges, error] {
313+
return func(yield func(datastore.RevisionChanges, error) bool) {
314+
if ch.IsEmpty() {
315+
return
316+
}
313317

314-
revisionsWithChanges := make([]K, 0, len(ch.records))
315-
for rk, cr := range ch.records {
316-
if !withBound || boundRev.GreaterThan(cr.rev) {
317-
revisionsWithChanges = append(revisionsWithChanges, rk)
318+
revisionsWithChanges := make([]K, 0, len(ch.records))
319+
for rk, cr := range ch.records {
320+
if !withBound || boundRev.GreaterThan(cr.rev) {
321+
revisionsWithChanges = append(revisionsWithChanges, rk)
322+
}
318323
}
319-
}
320324

321-
if len(revisionsWithChanges) == 0 {
322-
return nil, nil
323-
}
325+
if len(revisionsWithChanges) == 0 {
326+
return
327+
}
324328

325-
sort.Slice(revisionsWithChanges, func(i int, j int) bool {
326-
return lessThanFunc(revisionsWithChanges[i], revisionsWithChanges[j])
327-
})
329+
sort.Slice(revisionsWithChanges, func(i int, j int) bool {
330+
return lessThanFunc(revisionsWithChanges[i], revisionsWithChanges[j])
331+
})
328332

329-
changes := make([]datastore.RevisionChanges, len(revisionsWithChanges))
330-
for i, k := range revisionsWithChanges {
331-
revisionChangeRecord := ch.records[k]
332-
changes[i].Revision = revisionChangeRecord.rev
333-
for _, rel := range revisionChangeRecord.relTouches {
334-
changes[i].RelationshipChanges = append(changes[i].RelationshipChanges, tuple.Touch(rel))
335-
}
336-
for _, rel := range revisionChangeRecord.relDeletes {
337-
changes[i].RelationshipChanges = append(changes[i].RelationshipChanges, tuple.Delete(rel))
338-
}
339-
changes[i].ChangedDefinitions = slices.Collect(maps.Values(revisionChangeRecord.definitionsChanged))
340-
changes[i].DeletedNamespaces = slices.Collect(maps.Keys(revisionChangeRecord.namespacesDeleted))
341-
changes[i].DeletedCaveats = slices.Collect(maps.Keys(revisionChangeRecord.caveatsDeleted))
342-
343-
if len(revisionChangeRecord.metadatas) > 0 {
344-
metadatas := make([]*structpb.Struct, 0, len(revisionChangeRecord.metadatas))
345-
for _, metadata := range revisionChangeRecord.metadatas {
346-
structpbMetadata, err := structpb.NewStruct(metadata)
347-
if err != nil {
348-
return nil, spiceerrors.MustBugf("failed to convert metadata to structpb: %v", err)
333+
for _, k := range revisionsWithChanges {
334+
revisionChangeRecord := ch.records[k]
335+
change := datastore.RevisionChanges{
336+
Revision: revisionChangeRecord.rev,
337+
}
338+
339+
for _, rel := range revisionChangeRecord.relTouches {
340+
change.RelationshipChanges = append(change.RelationshipChanges, tuple.Touch(rel))
341+
}
342+
for _, rel := range revisionChangeRecord.relDeletes {
343+
change.RelationshipChanges = append(change.RelationshipChanges, tuple.Delete(rel))
344+
}
345+
change.ChangedDefinitions = slices.Collect(maps.Values(revisionChangeRecord.definitionsChanged))
346+
change.DeletedNamespaces = slices.Collect(maps.Keys(revisionChangeRecord.namespacesDeleted))
347+
change.DeletedCaveats = slices.Collect(maps.Keys(revisionChangeRecord.caveatsDeleted))
348+
349+
if len(revisionChangeRecord.metadatas) > 0 {
350+
metadatas := make([]*structpb.Struct, 0, len(revisionChangeRecord.metadatas))
351+
for _, metadata := range revisionChangeRecord.metadatas {
352+
structpbMetadata, err := structpb.NewStruct(metadata)
353+
if err != nil {
354+
_ = yield(datastore.RevisionChanges{}, spiceerrors.MustBugf("failed to convert metadata to structpb: %v", err))
355+
return
356+
}
357+
metadatas = append(metadatas, structpbMetadata)
349358
}
350-
metadatas = append(metadatas, structpbMetadata)
359+
360+
change.Metadatas = metadatas
351361
}
352362

353-
changes[i].Metadatas = metadatas
363+
if !yield(change, nil) {
364+
break
365+
}
354366
}
355367
}
356-
357-
return changes, nil
358368
}
359369

360370
func (ch *Changes[R, K]) removeAllChangesBefore(boundRev R) {

0 commit comments

Comments
 (0)