Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions storage/store/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,8 @@ func (c *Collections) BatchStoreAndIndexByTransaction(lctx lockctx.Proof, collec
light := collection.Light()
collectionID := light.ID()

err := operation.UpsertCollection(rw.Writer(), light)
if err != nil {
return nil, fmt.Errorf("could not insert collection: %w", err)
}

// First, check if all transactions are already indexed and consistent
someTransactionIndexed := false
for _, txID := range light.Transactions {
var differentColTxIsIn flow.Identifier
// The following is not BFT, because we can't handle the case where a transaction is included
Expand All @@ -205,6 +202,20 @@ func (c *Collections) BatchStoreAndIndexByTransaction(lctx lockctx.Proof, collec
if err != nil {
return nil, fmt.Errorf("could not insert transaction ID: %w", err)
}
someTransactionIndexed = true
}

if !someTransactionIndexed {
// All transactions are already indexed and point to this collection.
// Since the index is always added along with the collection and transactions,
// this means the collection and its transactions have already been stored.
// Abort early to avoid redundant database writes.
return light, nil
}

err := operation.UpsertCollection(rw.Writer(), light)
if err != nil {
return nil, fmt.Errorf("could not insert collection: %w", err)
}

// Store individual transactions
Expand Down
61 changes: 61 additions & 0 deletions storage/store/collections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,64 @@ func TestCollections_ConcurrentIndexByTx(t *testing.T) {
assert.True(t, indexedCollection.ID() == col1.ID() || indexedCollection.ID() == col2.ID(), "Expected one of the collections to be indexed")
})
}

// TestCollections_BatchStoreAndIndexByTransaction_EarlyAbort verifies that
// BatchStoreAndIndexByTransaction aborts early when all transactions are already
// indexed and point to the same collection, avoiding redundant database writes.
func TestCollections_BatchStoreAndIndexByTransaction_EarlyAbort(t *testing.T) {
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
lockManager := storage.NewTestingLockManager()
metrics := metrics.NewNoopCollector()
transactions := store.NewTransactions(metrics, db)
collections := store.NewCollections(db, transactions)

// Create a collection with multiple transactions
collection := unittest.CollectionFixture(3)
expectedLight := collection.Light()

// First, store the collection and index it by transaction
err := unittest.WithLock(t, lockManager, storage.LockInsertCollection, func(lctx lockctx.Context) error {
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
_, err := collections.BatchStoreAndIndexByTransaction(lctx, &collection, rw)
return err
})
})
require.NoError(t, err)

// Verify the collection was stored
actualLight, err := collections.LightByID(collection.ID())
require.NoError(t, err)
assert.Equal(t, expectedLight, actualLight)

// Verify all transactions are indexed
for _, tx := range collection.Transactions {
collLight, err := collections.LightByTransactionID(tx.ID())
require.NoError(t, err)
assert.Equal(t, collection.ID(), collLight.ID())
}

// Try to store the same collection again - should abort early
err = unittest.WithLock(t, lockManager, storage.LockInsertCollection, func(lctx lockctx.Context) error {
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
light, err := collections.BatchStoreAndIndexByTransaction(lctx, &collection, rw)
require.NoError(t, err)
// Should return the light collection without error
assert.Equal(t, expectedLight, light)
return err
})
})
require.NoError(t, err)

// Verify the collection still exists and is unchanged
actualLight, err = collections.LightByID(collection.ID())
require.NoError(t, err)
assert.Equal(t, expectedLight, actualLight)

// Verify all transactions are still indexed correctly
for _, tx := range collection.Transactions {
collLight, err := collections.LightByTransactionID(tx.ID())
require.NoError(t, err)
assert.Equal(t, collection.ID(), collLight.ID())
}
})
}
Loading