Skip to content

Commit 7fbb0b6

Browse files
committed
perf: have the changes functions return an iter
This should reduce memory usage a bit when reading the changes, as it can compute one-at-a-time
1 parent 48ae087 commit 7fbb0b6

File tree

7 files changed

+158
-121
lines changed

7 files changed

+158
-121
lines changed

internal/datastore/common/changes.go

Lines changed: 57 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package common
22

33
import (
44
"context"
5+
"iter"
56
"maps"
67
"reflect"
78
"slices"
@@ -290,71 +291,80 @@ func (ch *Changes[R, K]) AddChangedDefinition(
290291

291292
// AsRevisionChanges returns the list of changes processed so far as a datastore watch
292293
// compatible, ordered, changelist.
293-
func (ch *Changes[R, K]) AsRevisionChanges(lessThanFunc func(lhs, rhs K) bool) ([]datastore.RevisionChanges, error) {
294+
func (ch *Changes[R, K]) AsRevisionChanges(lessThanFunc func(lhs, rhs K) bool) iter.Seq2[datastore.RevisionChanges, error] {
294295
return ch.revisionChanges(lessThanFunc, *new(R), false)
295296
}
296297

297298
// FilterAndRemoveRevisionChanges filters a list of changes processed up to the bound revision from the changes list, removing them
298299
// and returning the filtered changes.
299-
func (ch *Changes[R, K]) FilterAndRemoveRevisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R) ([]datastore.RevisionChanges, error) {
300-
changes, err := ch.revisionChanges(lessThanFunc, boundRev, true)
301-
if err != nil {
302-
return nil, err
303-
}
300+
func (ch *Changes[R, K]) FilterAndRemoveRevisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R) iter.Seq2[datastore.RevisionChanges, error] {
301+
return func(yield func(datastore.RevisionChanges, error) bool) {
302+
for change, err := range ch.revisionChanges(lessThanFunc, boundRev, true) {
303+
if !yield(change, err) {
304+
break
305+
}
306+
}
304307

305-
ch.removeAllChangesBefore(boundRev)
306-
return changes, nil
308+
ch.removeAllChangesBefore(boundRev)
309+
}
307310
}
308311

309-
func (ch *Changes[R, K]) revisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R, withBound bool) ([]datastore.RevisionChanges, error) {
310-
if ch.IsEmpty() {
311-
return nil, nil
312-
}
312+
func (ch *Changes[R, K]) revisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R, withBound bool) iter.Seq2[datastore.RevisionChanges, error] {
313+
return func(yield func(datastore.RevisionChanges, error) bool) {
314+
if ch.IsEmpty() {
315+
return
316+
}
313317

314-
revisionsWithChanges := make([]K, 0, len(ch.records))
315-
for rk, cr := range ch.records {
316-
if !withBound || boundRev.GreaterThan(cr.rev) {
317-
revisionsWithChanges = append(revisionsWithChanges, rk)
318+
revisionsWithChanges := make([]K, 0, len(ch.records))
319+
for rk, cr := range ch.records {
320+
if !withBound || boundRev.GreaterThan(cr.rev) {
321+
revisionsWithChanges = append(revisionsWithChanges, rk)
322+
}
318323
}
319-
}
320324

321-
if len(revisionsWithChanges) == 0 {
322-
return nil, nil
323-
}
325+
if len(revisionsWithChanges) == 0 {
326+
return
327+
}
324328

325-
sort.Slice(revisionsWithChanges, func(i int, j int) bool {
326-
return lessThanFunc(revisionsWithChanges[i], revisionsWithChanges[j])
327-
})
329+
sort.Slice(revisionsWithChanges, func(i int, j int) bool {
330+
return lessThanFunc(revisionsWithChanges[i], revisionsWithChanges[j])
331+
})
328332

329-
changes := make([]datastore.RevisionChanges, len(revisionsWithChanges))
330-
for i, k := range revisionsWithChanges {
331-
revisionChangeRecord := ch.records[k]
332-
changes[i].Revision = revisionChangeRecord.rev
333-
for _, rel := range revisionChangeRecord.relTouches {
334-
changes[i].RelationshipChanges = append(changes[i].RelationshipChanges, tuple.Touch(rel))
335-
}
336-
for _, rel := range revisionChangeRecord.relDeletes {
337-
changes[i].RelationshipChanges = append(changes[i].RelationshipChanges, tuple.Delete(rel))
338-
}
339-
changes[i].ChangedDefinitions = slices.Collect(maps.Values(revisionChangeRecord.definitionsChanged))
340-
changes[i].DeletedNamespaces = slices.Collect(maps.Keys(revisionChangeRecord.namespacesDeleted))
341-
changes[i].DeletedCaveats = slices.Collect(maps.Keys(revisionChangeRecord.caveatsDeleted))
342-
343-
if len(revisionChangeRecord.metadatas) > 0 {
344-
metadatas := make([]*structpb.Struct, 0, len(revisionChangeRecord.metadatas))
345-
for _, metadata := range revisionChangeRecord.metadatas {
346-
structpbMetadata, err := structpb.NewStruct(metadata)
347-
if err != nil {
348-
return nil, spiceerrors.MustBugf("failed to convert metadata to structpb: %v", err)
333+
for _, k := range revisionsWithChanges {
334+
revisionChangeRecord := ch.records[k]
335+
change := datastore.RevisionChanges{
336+
Revision: revisionChangeRecord.rev,
337+
}
338+
339+
for _, rel := range revisionChangeRecord.relTouches {
340+
change.RelationshipChanges = append(change.RelationshipChanges, tuple.Touch(rel))
341+
}
342+
for _, rel := range revisionChangeRecord.relDeletes {
343+
change.RelationshipChanges = append(change.RelationshipChanges, tuple.Delete(rel))
344+
}
345+
change.ChangedDefinitions = slices.Collect(maps.Values(revisionChangeRecord.definitionsChanged))
346+
change.DeletedNamespaces = slices.Collect(maps.Keys(revisionChangeRecord.namespacesDeleted))
347+
change.DeletedCaveats = slices.Collect(maps.Keys(revisionChangeRecord.caveatsDeleted))
348+
349+
if len(revisionChangeRecord.metadatas) > 0 {
350+
metadatas := make([]*structpb.Struct, 0, len(revisionChangeRecord.metadatas))
351+
for _, metadata := range revisionChangeRecord.metadatas {
352+
structpbMetadata, err := structpb.NewStruct(metadata)
353+
if err != nil {
354+
_ = yield(datastore.RevisionChanges{}, spiceerrors.MustBugf("failed to convert metadata to structpb: %v", err))
355+
return
356+
}
357+
metadatas = append(metadatas, structpbMetadata)
349358
}
350-
metadatas = append(metadatas, structpbMetadata)
359+
360+
change.Metadatas = metadatas
351361
}
352362

353-
changes[i].Metadatas = metadatas
363+
if !yield(change, nil) {
364+
break
365+
}
354366
}
355367
}
356-
357-
return changes, nil
358368
}
359369

360370
func (ch *Changes[R, K]) removeAllChangesBefore(boundRev R) {

internal/datastore/common/changes_test.go

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package common
22

33
import (
44
"fmt"
5+
"iter"
56
"slices"
67
"sort"
78
"strings"
@@ -334,7 +335,7 @@ func TestChanges(t *testing.T) {
334335
}
335336
}
336337

337-
actual, err := ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc)
338+
actual, err := collectChanges(ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc))
338339
require.NoError(err)
339340

340341
require.Equal(
@@ -370,7 +371,7 @@ func TestChanges(t *testing.T) {
370371
}
371372
}
372373

373-
actual, err := ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc)
374+
actual, err := collectChanges(ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc))
374375
require.NoError(err)
375376

376377
require.Equal(
@@ -399,7 +400,7 @@ func TestAddMetadata(t *testing.T) {
399400
require.NoError(t, err)
400401
require.False(t, ch.IsEmpty())
401402

402-
results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2)
403+
results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2))
403404
require.NoError(t, err)
404405
require.Equal(t, 1, len(results))
405406
require.True(t, ch.IsEmpty())
@@ -439,7 +440,7 @@ func TestAddRevisionMetadataComprehensive(t *testing.T) {
439440
err = ch.AddRevisionMetadata(ctx, rev1, map[string]any{"operation": "create", "user_id": "123"})
440441
require.NoError(t, err)
441442

442-
results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2)
443+
results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2))
443444
require.NoError(t, err)
444445
require.Equal(t, 1, len(results))
445446
require.Equal(t, 1, len(results[0].Metadatas))
@@ -462,7 +463,7 @@ func TestAddRevisionMetadataComprehensive(t *testing.T) {
462463
err = ch.AddRevisionMetadata(ctx, rev1, metadata3)
463464
require.NoError(t, err)
464465

465-
results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2)
466+
results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2))
466467
require.NoError(t, err)
467468
require.Equal(t, 1, len(results))
468469
require.Equal(t, 3, len(results[0].Metadatas))
@@ -502,7 +503,7 @@ func TestAddRevisionMetadataComprehensive(t *testing.T) {
502503
})
503504
require.NoError(t, err)
504505

505-
results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2)
506+
results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2))
506507
require.NoError(t, err)
507508
require.Equal(t, 1, len(results))
508509
require.Equal(t, 1, len(results[0].Metadatas))
@@ -523,7 +524,7 @@ func TestAddRevisionMetadataComprehensive(t *testing.T) {
523524
err = ch.AddRevisionMetadata(ctx, rev2, metadata2)
524525
require.NoError(t, err)
525526

526-
results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev3)
527+
results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev3))
527528
require.NoError(t, err)
528529
require.Equal(t, 2, len(results))
529530

@@ -546,7 +547,7 @@ func TestAddRevisionMetadataComprehensive(t *testing.T) {
546547
err = ch.AddRevisionMetadata(ctx, rev1, metadata2)
547548
require.NoError(t, err)
548549

549-
results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2)
550+
results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2))
550551
require.NoError(t, err)
551552
require.Equal(t, 1, len(results))
552553
require.Equal(t, 2, len(results[0].Metadatas))
@@ -575,7 +576,7 @@ func TestAddRevisionMetadataComprehensive(t *testing.T) {
575576
err = ch.AddRevisionMetadata(ctx, rev1, metadata5)
576577
require.NoError(t, err)
577578

578-
results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2)
579+
results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2))
579580
require.NoError(t, err)
580581
require.Equal(t, 1, len(results))
581582
require.Equal(t, 4, len(results[0].Metadatas))
@@ -600,7 +601,7 @@ func TestAddRevisionMetadataComprehensive(t *testing.T) {
600601
err = ch.AddRevisionMetadata(ctx, rev1, metadata1)
601602
require.NoError(t, err)
602603

603-
results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2)
604+
results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2))
604605
require.NoError(t, err)
605606
require.Equal(t, 1, len(results))
606607
require.Equal(t, 3, len(results[0].Metadatas))
@@ -638,7 +639,7 @@ func TestFilterAndRemoveRevisionChanges(t *testing.T) {
638639

639640
require.False(t, ch.IsEmpty())
640641

641-
results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev3)
642+
results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev3))
642643
require.NoError(t, err)
643644
require.Equal(t, 2, len(results))
644645
require.False(t, ch.IsEmpty())
@@ -658,7 +659,7 @@ func TestFilterAndRemoveRevisionChanges(t *testing.T) {
658659
},
659660
}, results)
660661

661-
remaining, err := ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc)
662+
remaining, err := collectChanges(ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc))
662663
require.Equal(t, 1, len(remaining))
663664
require.NoError(t, err)
664665

@@ -671,12 +672,12 @@ func TestFilterAndRemoveRevisionChanges(t *testing.T) {
671672
},
672673
}, remaining)
673674

674-
results, err = ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, revOneMillion)
675+
results, err = collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, revOneMillion))
675676
require.NoError(t, err)
676677
require.Equal(t, 1, len(results))
677678
require.True(t, ch.IsEmpty())
678679

679-
results, err = ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, revOneMillionOne)
680+
results, err = collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, revOneMillionOne))
680681
require.NoError(t, err)
681682
require.Equal(t, 0, len(results))
682683
require.True(t, ch.IsEmpty())
@@ -700,7 +701,7 @@ func TestHLCOrdering(t *testing.T) {
700701
err = ch.AddRelationshipChange(ctx, rev0, tuple.MustParse("document:foo#viewer@user:tom"), tuple.UpdateOperationTouch)
701702
require.NoError(t, err)
702703

703-
remaining, err := ch.AsRevisionChanges(revisions.HLCKeyLessThanFunc)
704+
remaining, err := collectChanges(ch.AsRevisionChanges(revisions.HLCKeyLessThanFunc))
704705
require.NoError(t, err)
705706
require.Equal(t, 2, len(remaining))
706707

@@ -744,7 +745,7 @@ func TestHLCSameRevision(t *testing.T) {
744745
err = ch.AddRelationshipChange(ctx, rev0again, tuple.MustParse("document:foo#viewer@user:sarah"), tuple.UpdateOperationTouch)
745746
require.NoError(t, err)
746747

747-
remaining, err := ch.AsRevisionChanges(revisions.HLCKeyLessThanFunc)
748+
remaining, err := collectChanges(ch.AsRevisionChanges(revisions.HLCKeyLessThanFunc))
748749
require.NoError(t, err)
749750
require.Equal(t, 1, len(remaining))
750751

@@ -968,3 +969,14 @@ func canonicalize(in []datastore.RevisionChanges) []datastore.RevisionChanges {
968969

969970
return out
970971
}
972+
973+
func collectChanges(changes iter.Seq2[datastore.RevisionChanges, error]) ([]datastore.RevisionChanges, error) {
974+
out := make([]datastore.RevisionChanges, 0, 10)
975+
for change, err := range changes {
976+
if err != nil {
977+
return nil, err
978+
}
979+
out = append(out, change)
980+
}
981+
return out, nil
982+
}

internal/datastore/crdb/watch.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"encoding/json"
77
"errors"
88
"fmt"
9+
"iter"
910
"strconv"
1011
"strings"
1112
"time"
@@ -225,7 +226,7 @@ func (cds *crdbDatastore) watch(
225226

226227
// changeTracker takes care of accumulating received from CockroachDB until a checkpoint is emitted
227228
type changeTracker[R datastore.Revision, K comparable] interface {
228-
FilterAndRemoveRevisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R) ([]datastore.RevisionChanges, error)
229+
FilterAndRemoveRevisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R) iter.Seq2[datastore.RevisionChanges, error]
229230
AddRelationshipChange(ctx context.Context, rev R, rel tuple.Relationship, op tuple.UpdateOperation) error
230231
AddChangedDefinition(ctx context.Context, rev R, def datastore.SchemaDefinition) error
231232
AddDeletedNamespace(ctx context.Context, rev R, namespaceName string) error
@@ -244,9 +245,10 @@ type streamingChangeProvider struct {
244245
sendError sendErrorFunc
245246
}
246247

247-
func (s streamingChangeProvider) FilterAndRemoveRevisionChanges(_ func(lhs revisions.HLCRevision, rhs revisions.HLCRevision) bool, _ revisions.HLCRevision) ([]datastore.RevisionChanges, error) {
248-
// we do not accumulate in this implementation, but stream right away
249-
return nil, nil
248+
func (s streamingChangeProvider) FilterAndRemoveRevisionChanges(_ func(lhs revisions.HLCRevision, rhs revisions.HLCRevision) bool, _ revisions.HLCRevision) iter.Seq2[datastore.RevisionChanges, error] {
249+
return func(yield func(datastore.RevisionChanges, error) bool) {
250+
// Nothing to do here, as changes are sent immediately.
251+
}
250252
}
251253

252254
func (s streamingChangeProvider) AddRelationshipChange(ctx context.Context, rev revisions.HLCRevision, rel tuple.Relationship, op tuple.UpdateOperation) error {
@@ -371,13 +373,13 @@ func (cds *crdbDatastore) processChanges(ctx context.Context, changes pgx.Rows,
371373
return
372374
}
373375

374-
filtered, err := tracked.FilterAndRemoveRevisionChanges(revisions.HLCKeyLessThanFunc, rev)
375-
if err != nil {
376-
sendError(err)
377-
return
378-
}
376+
filtered := tracked.FilterAndRemoveRevisionChanges(revisions.HLCKeyLessThanFunc, rev)
377+
for revChange, err := range filtered {
378+
if err != nil {
379+
sendError(err)
380+
return
381+
}
379382

380-
for _, revChange := range filtered {
381383
revChange := revChange
382384

383385
// TODO(jschorr): Change this to a new event type if/when we decide to report these

internal/datastore/memdb/memdb.go

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -286,24 +286,26 @@ func (mdb *memdbDatastore) ReadWriteTx(
286286
}
287287
}
288288

289-
var rc datastore.RevisionChanges
290-
changes, err := tracked.AsRevisionChanges(revisions.TimestampIDKeyLessThanFunc)
291-
if err != nil {
292-
return datastore.NoRevision, err
293-
}
289+
changes := tracked.AsRevisionChanges(revisions.TimestampIDKeyLessThanFunc)
290+
isFirstChange := true
291+
for rc, err := range changes {
292+
if err != nil {
293+
return datastore.NoRevision, err
294+
}
294295

295-
if len(changes) > 1 {
296-
return datastore.NoRevision, spiceerrors.MustBugf("unexpected MemDB transaction with multiple revision changes")
297-
} else if len(changes) == 1 {
298-
rc = changes[0]
299-
}
296+
if !isFirstChange {
297+
return datastore.NoRevision, spiceerrors.MustBugf("unexpected MemDB transaction with multiple revision changes")
298+
}
300299

301-
change := &changelog{
302-
revisionNanos: newRevision.TimestampNanoSec(),
303-
changes: rc,
304-
}
305-
if err := tx.Insert(tableChangelog, change); err != nil {
306-
return datastore.NoRevision, fmt.Errorf("error writing changelog: %w", err)
300+
change := &changelog{
301+
revisionNanos: newRevision.TimestampNanoSec(),
302+
changes: rc,
303+
}
304+
if err := tx.Insert(tableChangelog, change); err != nil {
305+
return datastore.NoRevision, fmt.Errorf("error writing changelog: %w", err)
306+
}
307+
308+
isFirstChange = false
307309
}
308310

309311
tx.Commit()

0 commit comments

Comments
 (0)