Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 57 additions & 47 deletions internal/datastore/common/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"context"
"iter"
"maps"
"reflect"
"slices"
Expand Down Expand Up @@ -290,71 +291,80 @@

// AsRevisionChanges returns the list of changes processed so far as a datastore watch
// compatible, ordered, changelist.
func (ch *Changes[R, K]) AsRevisionChanges(lessThanFunc func(lhs, rhs K) bool) ([]datastore.RevisionChanges, error) {
func (ch *Changes[R, K]) AsRevisionChanges(lessThanFunc func(lhs, rhs K) bool) iter.Seq2[datastore.RevisionChanges, error] {
return ch.revisionChanges(lessThanFunc, *new(R), false)
}

// FilterAndRemoveRevisionChanges filters a list of changes processed up to the bound revision from the changes list, removing them
// and returning the filtered changes.
func (ch *Changes[R, K]) FilterAndRemoveRevisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R) ([]datastore.RevisionChanges, error) {
changes, err := ch.revisionChanges(lessThanFunc, boundRev, true)
if err != nil {
return nil, err
}
func (ch *Changes[R, K]) FilterAndRemoveRevisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R) iter.Seq2[datastore.RevisionChanges, error] {
return func(yield func(datastore.RevisionChanges, error) bool) {
for change, err := range ch.revisionChanges(lessThanFunc, boundRev, true) {
if !yield(change, err) {
break
}
}

ch.removeAllChangesBefore(boundRev)
return changes, nil
ch.removeAllChangesBefore(boundRev)
}
}

func (ch *Changes[R, K]) revisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R, withBound bool) ([]datastore.RevisionChanges, error) {
if ch.IsEmpty() {
return nil, nil
}
func (ch *Changes[R, K]) revisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R, withBound bool) iter.Seq2[datastore.RevisionChanges, error] {
return func(yield func(datastore.RevisionChanges, error) bool) {
if ch.IsEmpty() {
return
}

revisionsWithChanges := make([]K, 0, len(ch.records))
for rk, cr := range ch.records {
if !withBound || boundRev.GreaterThan(cr.rev) {
revisionsWithChanges = append(revisionsWithChanges, rk)
revisionsWithChanges := make([]K, 0, len(ch.records))
for rk, cr := range ch.records {
if !withBound || boundRev.GreaterThan(cr.rev) {
revisionsWithChanges = append(revisionsWithChanges, rk)
}
}
}

if len(revisionsWithChanges) == 0 {
return nil, nil
}
if len(revisionsWithChanges) == 0 {
return
}

sort.Slice(revisionsWithChanges, func(i int, j int) bool {
return lessThanFunc(revisionsWithChanges[i], revisionsWithChanges[j])
})
sort.Slice(revisionsWithChanges, func(i int, j int) bool {
return lessThanFunc(revisionsWithChanges[i], revisionsWithChanges[j])
})

changes := make([]datastore.RevisionChanges, len(revisionsWithChanges))
for i, k := range revisionsWithChanges {
revisionChangeRecord := ch.records[k]
changes[i].Revision = revisionChangeRecord.rev
for _, rel := range revisionChangeRecord.relTouches {
changes[i].RelationshipChanges = append(changes[i].RelationshipChanges, tuple.Touch(rel))
}
for _, rel := range revisionChangeRecord.relDeletes {
changes[i].RelationshipChanges = append(changes[i].RelationshipChanges, tuple.Delete(rel))
}
changes[i].ChangedDefinitions = slices.Collect(maps.Values(revisionChangeRecord.definitionsChanged))
changes[i].DeletedNamespaces = slices.Collect(maps.Keys(revisionChangeRecord.namespacesDeleted))
changes[i].DeletedCaveats = slices.Collect(maps.Keys(revisionChangeRecord.caveatsDeleted))

if len(revisionChangeRecord.metadatas) > 0 {
metadatas := make([]*structpb.Struct, 0, len(revisionChangeRecord.metadatas))
for _, metadata := range revisionChangeRecord.metadatas {
structpbMetadata, err := structpb.NewStruct(metadata)
if err != nil {
return nil, spiceerrors.MustBugf("failed to convert metadata to structpb: %v", err)
for _, k := range revisionsWithChanges {
revisionChangeRecord := ch.records[k]
change := datastore.RevisionChanges{
Revision: revisionChangeRecord.rev,
}

for _, rel := range revisionChangeRecord.relTouches {
change.RelationshipChanges = append(change.RelationshipChanges, tuple.Touch(rel))
}
for _, rel := range revisionChangeRecord.relDeletes {
change.RelationshipChanges = append(change.RelationshipChanges, tuple.Delete(rel))
}
change.ChangedDefinitions = slices.Collect(maps.Values(revisionChangeRecord.definitionsChanged))
change.DeletedNamespaces = slices.Collect(maps.Keys(revisionChangeRecord.namespacesDeleted))
change.DeletedCaveats = slices.Collect(maps.Keys(revisionChangeRecord.caveatsDeleted))

if len(revisionChangeRecord.metadatas) > 0 {
metadatas := make([]*structpb.Struct, 0, len(revisionChangeRecord.metadatas))
for _, metadata := range revisionChangeRecord.metadatas {
structpbMetadata, err := structpb.NewStruct(metadata)
if err != nil {
_ = yield(datastore.RevisionChanges{}, spiceerrors.MustBugf("failed to convert metadata to structpb: %v", err))
return
}

Check warning on line 356 in internal/datastore/common/changes.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/common/changes.go#L354-L356

Added lines #L354 - L356 were not covered by tests
metadatas = append(metadatas, structpbMetadata)
}
metadatas = append(metadatas, structpbMetadata)

change.Metadatas = metadatas
}

changes[i].Metadatas = metadatas
if !yield(change, nil) {
break
}
}
}

return changes, nil
}

func (ch *Changes[R, K]) removeAllChangesBefore(boundRev R) {
Expand Down
44 changes: 28 additions & 16 deletions internal/datastore/common/changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package common

import (
"fmt"
"iter"
"slices"
"sort"
"strings"
Expand Down Expand Up @@ -334,7 +335,7 @@ func TestChanges(t *testing.T) {
}
}

actual, err := ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc)
actual, err := collectChanges(ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc))
require.NoError(err)

require.Equal(
Expand Down Expand Up @@ -370,7 +371,7 @@ func TestChanges(t *testing.T) {
}
}

actual, err := ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc)
actual, err := collectChanges(ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc))
require.NoError(err)

require.Equal(
Expand Down Expand Up @@ -399,7 +400,7 @@ func TestAddMetadata(t *testing.T) {
require.NoError(t, err)
require.False(t, ch.IsEmpty())

results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2)
results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2))
require.NoError(t, err)
require.Equal(t, 1, len(results))
require.True(t, ch.IsEmpty())
Expand Down Expand Up @@ -439,7 +440,7 @@ func TestAddRevisionMetadataComprehensive(t *testing.T) {
err = ch.AddRevisionMetadata(ctx, rev1, map[string]any{"operation": "create", "user_id": "123"})
require.NoError(t, err)

results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2)
results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2))
require.NoError(t, err)
require.Equal(t, 1, len(results))
require.Equal(t, 1, len(results[0].Metadatas))
Expand All @@ -462,7 +463,7 @@ func TestAddRevisionMetadataComprehensive(t *testing.T) {
err = ch.AddRevisionMetadata(ctx, rev1, metadata3)
require.NoError(t, err)

results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2)
results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2))
require.NoError(t, err)
require.Equal(t, 1, len(results))
require.Equal(t, 3, len(results[0].Metadatas))
Expand Down Expand Up @@ -502,7 +503,7 @@ func TestAddRevisionMetadataComprehensive(t *testing.T) {
})
require.NoError(t, err)

results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2)
results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2))
require.NoError(t, err)
require.Equal(t, 1, len(results))
require.Equal(t, 1, len(results[0].Metadatas))
Expand All @@ -523,7 +524,7 @@ func TestAddRevisionMetadataComprehensive(t *testing.T) {
err = ch.AddRevisionMetadata(ctx, rev2, metadata2)
require.NoError(t, err)

results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev3)
results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev3))
require.NoError(t, err)
require.Equal(t, 2, len(results))

Expand All @@ -546,7 +547,7 @@ func TestAddRevisionMetadataComprehensive(t *testing.T) {
err = ch.AddRevisionMetadata(ctx, rev1, metadata2)
require.NoError(t, err)

results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2)
results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2))
require.NoError(t, err)
require.Equal(t, 1, len(results))
require.Equal(t, 2, len(results[0].Metadatas))
Expand Down Expand Up @@ -575,7 +576,7 @@ func TestAddRevisionMetadataComprehensive(t *testing.T) {
err = ch.AddRevisionMetadata(ctx, rev1, metadata5)
require.NoError(t, err)

results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2)
results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2))
require.NoError(t, err)
require.Equal(t, 1, len(results))
require.Equal(t, 4, len(results[0].Metadatas))
Expand All @@ -600,7 +601,7 @@ func TestAddRevisionMetadataComprehensive(t *testing.T) {
err = ch.AddRevisionMetadata(ctx, rev1, metadata1)
require.NoError(t, err)

results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2)
results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2))
require.NoError(t, err)
require.Equal(t, 1, len(results))
require.Equal(t, 3, len(results[0].Metadatas))
Expand Down Expand Up @@ -638,7 +639,7 @@ func TestFilterAndRemoveRevisionChanges(t *testing.T) {

require.False(t, ch.IsEmpty())

results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev3)
results, err := collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev3))
require.NoError(t, err)
require.Equal(t, 2, len(results))
require.False(t, ch.IsEmpty())
Expand All @@ -658,7 +659,7 @@ func TestFilterAndRemoveRevisionChanges(t *testing.T) {
},
}, results)

remaining, err := ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc)
remaining, err := collectChanges(ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc))
require.Equal(t, 1, len(remaining))
require.NoError(t, err)

Expand All @@ -671,12 +672,12 @@ func TestFilterAndRemoveRevisionChanges(t *testing.T) {
},
}, remaining)

results, err = ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, revOneMillion)
results, err = collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, revOneMillion))
require.NoError(t, err)
require.Equal(t, 1, len(results))
require.True(t, ch.IsEmpty())

results, err = ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, revOneMillionOne)
results, err = collectChanges(ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, revOneMillionOne))
require.NoError(t, err)
require.Equal(t, 0, len(results))
require.True(t, ch.IsEmpty())
Expand All @@ -700,7 +701,7 @@ func TestHLCOrdering(t *testing.T) {
err = ch.AddRelationshipChange(ctx, rev0, tuple.MustParse("document:foo#viewer@user:tom"), tuple.UpdateOperationTouch)
require.NoError(t, err)

remaining, err := ch.AsRevisionChanges(revisions.HLCKeyLessThanFunc)
remaining, err := collectChanges(ch.AsRevisionChanges(revisions.HLCKeyLessThanFunc))
require.NoError(t, err)
require.Equal(t, 2, len(remaining))

Expand Down Expand Up @@ -744,7 +745,7 @@ func TestHLCSameRevision(t *testing.T) {
err = ch.AddRelationshipChange(ctx, rev0again, tuple.MustParse("document:foo#viewer@user:sarah"), tuple.UpdateOperationTouch)
require.NoError(t, err)

remaining, err := ch.AsRevisionChanges(revisions.HLCKeyLessThanFunc)
remaining, err := collectChanges(ch.AsRevisionChanges(revisions.HLCKeyLessThanFunc))
require.NoError(t, err)
require.Equal(t, 1, len(remaining))

Expand Down Expand Up @@ -968,3 +969,14 @@ func canonicalize(in []datastore.RevisionChanges) []datastore.RevisionChanges {

return out
}

func collectChanges(changes iter.Seq2[datastore.RevisionChanges, error]) ([]datastore.RevisionChanges, error) {
out := make([]datastore.RevisionChanges, 0, 10)
for change, err := range changes {
if err != nil {
return nil, err
}
out = append(out, change)
}
return out, nil
}
22 changes: 12 additions & 10 deletions internal/datastore/crdb/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"encoding/json"
"errors"
"fmt"
"iter"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -225,7 +226,7 @@

// changeTracker takes care of accumulating received from CockroachDB until a checkpoint is emitted
type changeTracker[R datastore.Revision, K comparable] interface {
FilterAndRemoveRevisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R) ([]datastore.RevisionChanges, error)
FilterAndRemoveRevisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R) iter.Seq2[datastore.RevisionChanges, error]
AddRelationshipChange(ctx context.Context, rev R, rel tuple.Relationship, op tuple.UpdateOperation) error
AddChangedDefinition(ctx context.Context, rev R, def datastore.SchemaDefinition) error
AddDeletedNamespace(ctx context.Context, rev R, namespaceName string) error
Expand All @@ -244,9 +245,10 @@
sendError sendErrorFunc
}

func (s streamingChangeProvider) FilterAndRemoveRevisionChanges(_ func(lhs revisions.HLCRevision, rhs revisions.HLCRevision) bool, _ revisions.HLCRevision) ([]datastore.RevisionChanges, error) {
// we do not accumulate in this implementation, but stream right away
return nil, nil
func (s streamingChangeProvider) FilterAndRemoveRevisionChanges(_ func(lhs revisions.HLCRevision, rhs revisions.HLCRevision) bool, _ revisions.HLCRevision) iter.Seq2[datastore.RevisionChanges, error] {
return func(yield func(datastore.RevisionChanges, error) bool) {
// Nothing to do here, as changes are sent immediately.
}
}

func (s streamingChangeProvider) AddRelationshipChange(ctx context.Context, rev revisions.HLCRevision, rel tuple.Relationship, op tuple.UpdateOperation) error {
Expand Down Expand Up @@ -371,13 +373,13 @@
return
}

filtered, err := tracked.FilterAndRemoveRevisionChanges(revisions.HLCKeyLessThanFunc, rev)
if err != nil {
sendError(err)
return
}
filtered := tracked.FilterAndRemoveRevisionChanges(revisions.HLCKeyLessThanFunc, rev)
for revChange, err := range filtered {
if err != nil {
sendError(err)
return
}

Check warning on line 381 in internal/datastore/crdb/watch.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/crdb/watch.go#L379-L381

Added lines #L379 - L381 were not covered by tests

for _, revChange := range filtered {
revChange := revChange

// TODO(jschorr): Change this to a new event type if/when we decide to report these
Expand Down
34 changes: 18 additions & 16 deletions internal/datastore/memdb/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,24 +286,26 @@
}
}

var rc datastore.RevisionChanges
changes, err := tracked.AsRevisionChanges(revisions.TimestampIDKeyLessThanFunc)
if err != nil {
return datastore.NoRevision, err
}
changes := tracked.AsRevisionChanges(revisions.TimestampIDKeyLessThanFunc)
isFirstChange := true
for rc, err := range changes {
if err != nil {
return datastore.NoRevision, err
}

Check warning on line 294 in internal/datastore/memdb/memdb.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/memdb/memdb.go#L293-L294

Added lines #L293 - L294 were not covered by tests

if len(changes) > 1 {
return datastore.NoRevision, spiceerrors.MustBugf("unexpected MemDB transaction with multiple revision changes")
} else if len(changes) == 1 {
rc = changes[0]
}
if !isFirstChange {
return datastore.NoRevision, spiceerrors.MustBugf("unexpected MemDB transaction with multiple revision changes")
}

Check warning on line 298 in internal/datastore/memdb/memdb.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/memdb/memdb.go#L297-L298

Added lines #L297 - L298 were not covered by tests

change := &changelog{
revisionNanos: newRevision.TimestampNanoSec(),
changes: rc,
}
if err := tx.Insert(tableChangelog, change); err != nil {
return datastore.NoRevision, fmt.Errorf("error writing changelog: %w", err)
change := &changelog{
revisionNanos: newRevision.TimestampNanoSec(),
changes: rc,
}
if err := tx.Insert(tableChangelog, change); err != nil {
return datastore.NoRevision, fmt.Errorf("error writing changelog: %w", err)
}

Check warning on line 306 in internal/datastore/memdb/memdb.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/memdb/memdb.go#L305-L306

Added lines #L305 - L306 were not covered by tests

isFirstChange = false
}

tx.Commit()
Expand Down
Loading
Loading