Skip to content
This repository was archived by the owner on Dec 5, 2025. It is now read-only.

Commit 5698e05

Browse files
committed
fix: Break the tenant reload transaction further
1 parent 5ef2c29 commit 5698e05

File tree

6 files changed

+53
-31
lines changed

6 files changed

+53
-31
lines changed

server/metadata/tenant.go

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ func (m *TenantManager) GetTenant(ctx context.Context, namespaceId string) (*Ten
379379
namespace := NewTenantNamespace(namespaceId, metadata)
380380
tenant = NewTenant(namespace, m.kvStore, m.searchStore,
381381
m.metaStore, m.encoder, m.versionH, currentVersion, m.tableKeyGenerator)
382-
if err = tenant.reload(ctx, tx, currentVersion, m.searchSchemasSnapshot); err != nil {
382+
if err = tenant.reload(ctx, tx, currentVersion, m.searchSchemasSnapshot, m.txMgr); err != nil {
383383
return nil, err
384384
}
385385

@@ -464,7 +464,7 @@ func (m *TenantManager) createOrGetTenantInternal(ctx context.Context, tx transa
464464

465465
tenant := NewTenant(namespace, m.kvStore, m.searchStore, m.metaStore, m.encoder, m.versionH, currentVersion, m.tableKeyGenerator)
466466
tenant.Lock()
467-
err = tenant.reload(ctx, tx, currentVersion, m.searchSchemasSnapshot)
467+
err = tenant.reload(ctx, tx, currentVersion, m.searchSchemasSnapshot, m.txMgr)
468468
tenant.Unlock()
469469
return tenant, err
470470
}
@@ -584,7 +584,7 @@ func (m *TenantManager) reload(ctx context.Context, currentVersion Version) erro
584584
return err
585585
}
586586
tenant.Lock()
587-
err = tenant.reload(ctx, tx, currentVersion, m.searchSchemasSnapshot)
587+
err = tenant.reload(ctx, tx, currentVersion, m.searchSchemasSnapshot, m.txMgr)
588588
tenant.Unlock()
589589
if err != nil {
590590
log.Err(err).Msgf("reloading a tenant failed '%s'", tenant.name)
@@ -650,7 +650,7 @@ func NewTenant(namespace Namespace, kvStore kv.TxStore, searchStore search.Store
650650
// thread will actually perform reload. This is a blocking API which means if most of the requests detected that the
651651
// tenant state is stale then they all will block till one of them will reload the tenant state from the database. All
652652
// the blocking transactions will be restarted to ensure they see the latest view of the tenant.
653-
func (tenant *Tenant) Reload(ctx context.Context, tx transaction.Tx, version Version, searchSchemasSnapshot map[string]*tsApi.CollectionResponse) error {
653+
func (tenant *Tenant) Reload(ctx context.Context, tx transaction.Tx, version Version, searchSchemasSnapshot map[string]*tsApi.CollectionResponse, txMgr *transaction.Manager) error {
654654
if !tenant.shouldReload(version) {
655655
return nil
656656
}
@@ -662,7 +662,7 @@ func (tenant *Tenant) Reload(ctx context.Context, tx transaction.Tx, version Ver
662662
return nil
663663
}
664664

665-
return tenant.reload(ctx, tx, version, searchSchemasSnapshot)
665+
return tenant.reload(ctx, tx, version, searchSchemasSnapshot, txMgr)
666666
}
667667

668668
func (tenant *Tenant) shouldReload(currentVersion Version) bool {
@@ -678,7 +678,7 @@ func (tenant *Tenant) shouldReload(currentVersion Version) bool {
678678
// loads all the databases, it loads the resources for each one. Once databases are reloaded then it performs the same
679679
// logic for search indexes. Once search indexes are loaded it links back the search indexes to the Tigris Collection
680680
// if the source for these search indexes is Tigris.
681-
func (tenant *Tenant) reload(ctx context.Context, tx transaction.Tx, currentVersion Version, searchSchemasSnapshot map[string]*tsApi.CollectionResponse) error {
681+
func (tenant *Tenant) reload(ctx context.Context, tx transaction.Tx, currentVersion Version, searchSchemasSnapshot map[string]*tsApi.CollectionResponse, txMgr *transaction.Manager) error {
682682
// reset
683683
tenant.projects = make(map[string]*Project)
684684
tenant.idToDatabaseMap = make(map[uint32]*Database)
@@ -714,11 +714,14 @@ func (tenant *Tenant) reload(ctx context.Context, tx transaction.Tx, currentVers
714714
tenant.idToDatabaseMap[meta.ID] = database
715715
}
716716

717+
err = tx.Commit(ctx)
718+
if err != nil {
719+
log.Fatal().Err(err).Msg("Failed to reload tenant")
720+
}
717721
// load search indexes, this is essentially loading all the search indexes created by the user and attaching it to
718722
// the project object.
719723
for _, p := range tenant.projects {
720-
var err error
721-
if p.search, err = tenant.reloadSearch(ctx, tx, p, searchSchemasSnapshot); err != nil {
724+
if p.search, err = tenant.reloadSearch(ctx, txMgr, p, searchSchemasSnapshot); err != nil {
722725
return err
723726
}
724727
for _, index := range p.search.indexes {
@@ -805,16 +808,32 @@ func (tenant *Tenant) reloadDatabase(ctx context.Context, tx transaction.Tx, dbN
805808
}
806809

807810
// reloadSearch is responsible for reloading all the search indexes inside a single project.
808-
func (tenant *Tenant) reloadSearch(ctx context.Context, tx transaction.Tx, project *Project, searchSchemasSnapshot map[string]*tsApi.CollectionResponse) (*Search, error) {
809-
projMetadata, err := tenant.namespaceStore.GetProjectMetadata(ctx, tx, tenant.namespace.Id(), project.Name())
811+
func (tenant *Tenant) reloadSearch(ctx context.Context, txMgr *transaction.Manager, project *Project, searchSchemasSnapshot map[string]*tsApi.CollectionResponse) (*Search, error) {
812+
txToReadProjectMetadata, err := txMgr.StartTx(ctx)
813+
if err != nil {
814+
log.Fatal().Err(err).Msg("Failed to start tx to read project metadata while reloading tenant")
815+
}
816+
817+
projMetadata, err := tenant.namespaceStore.GetProjectMetadata(ctx, txToReadProjectMetadata, tenant.namespace.Id(), project.Name())
810818
if err != nil {
811819
return nil, errors.Internal("failed to get project metadata for project %s", project.Name())
812820
}
821+
_ = txToReadProjectMetadata.Commit(ctx)
813822

814823
searchObj := NewSearch()
815824

816-
for _, searchMD := range projMetadata.SearchMetadata {
817-
schV, err := tenant.searchSchemaStore.GetLatest(ctx, tx, tenant.namespace.Id(), project.id, searchMD.Name)
825+
var indexLevelTx transaction.Tx
826+
for i, searchMD := range projMetadata.SearchMetadata {
827+
if i%10 == 0 {
828+
if indexLevelTx != nil {
829+
_ = indexLevelTx.Commit(ctx)
830+
}
831+
indexLevelTx, err = txMgr.StartTx(ctx)
832+
if err != nil {
833+
log.Fatal().Err(err).Msg("Failed to start tx to reload indices in batch for tenant reload")
834+
}
835+
}
836+
schV, err := tenant.searchSchemaStore.GetLatest(ctx, indexLevelTx, tenant.namespace.Id(), project.id, searchMD.Name)
818837
if err != nil {
819838
return nil, err
820839
}
@@ -833,6 +852,9 @@ func (tenant *Tenant) reloadSearch(ctx context.Context, tx transaction.Tx, proje
833852
searchObj.indexes[searchMD.Name] = schema.NewSearchIndex(schV.Version, searchStoreIndexName, searchFactory, fieldsInSearchStore)
834853
}
835854

855+
if indexLevelTx != nil {
856+
_ = indexLevelTx.Commit(ctx)
857+
}
836858
return searchObj, nil
837859
}
838860

server/metadata/tenant_test.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ func TestTenantManager_CreateProjects(t *testing.T) {
224224

225225
tx, err = tm.StartTx(ctx)
226226
require.NoError(t, err)
227-
err = tenant.reload(ctx, tx, nil, nil)
227+
err = tenant.reload(ctx, tx, nil, nil, tm)
228228
require.NoError(t, err)
229229
proj1, err := tenant.GetProject(tenantProj1)
230230
require.NoError(t, err)
@@ -272,7 +272,7 @@ func TestTenantManager_DatabaseBranches(t *testing.T) {
272272
databases := (&Project{}).GetDatabaseWithBranches()
273273
require.Len(t, databases, 0)
274274

275-
require.NoError(t, tenant.reload(ctx, tx, nil, nil))
275+
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))
276276

277277
require.NoError(t, tenant.CreateBranch(ctx, tx, tenantProj1, NewDatabaseNameWithBranch(tenantProj1, "branch1")))
278278
require.NoError(t, tenant.CreateBranch(ctx, tx, tenantProj2, NewDatabaseNameWithBranch(tenantProj2, "branch1")))
@@ -282,7 +282,7 @@ func TestTenantManager_DatabaseBranches(t *testing.T) {
282282
require.ErrorContains(t, tenant.CreateBranch(ctx, tx, unknownProject, NewDatabaseNameWithBranch(unknownProject, "branch1")), "project doesn't exist")
283283

284284
// reload again to get all the branches
285-
require.NoError(t, tenant.reload(ctx, tx, nil, nil))
285+
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))
286286

287287
// list all branches
288288
branches := tenant.ListDatabaseBranches(tenantProj1)
@@ -339,7 +339,7 @@ func TestTenantManager_DatabaseBranches(t *testing.T) {
339339
require.NoError(t, tenant.DeleteBranch(ctx, tx, tenantProj1, NewDatabaseNameWithBranch(tenantProj1, "branch2")))
340340
require.ErrorContains(t, tenant.DeleteBranch(ctx, tx, unknownProject, NewDatabaseNameWithBranch(unknownProject, "branch1")), "project doesn't exist")
341341

342-
require.NoError(t, tenant.reload(ctx, tx, nil, nil))
342+
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))
343343
require.NoError(t, tx.Commit(ctx))
344344

345345
tx, err = tm.StartTx(ctx)
@@ -402,7 +402,7 @@ func TestTenantManager_CreateCollections(t *testing.T) {
402402
err = tenant.CreateProject(ctx, tx, tenantProj2, nil)
403403
require.NoError(t, err)
404404

405-
require.NoError(t, tenant.reload(ctx, tx, nil, nil))
405+
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))
406406

407407
proj1, err := tenant.GetProject(tenantProj1)
408408
require.NoError(t, err)
@@ -444,7 +444,7 @@ func TestTenantManager_CreateCollections(t *testing.T) {
444444
require.NoError(t, err)
445445
require.NoError(t, tenant.CreateCollection(ctx, tx, db2, factory))
446446

447-
require.NoError(t, tenant.reload(ctx, tx, nil, nil))
447+
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))
448448

449449
proj2, err = tenant.GetProject(tenantProj2)
450450
require.NoError(t, err)
@@ -477,7 +477,7 @@ func TestTenantManager_DropCollection(t *testing.T) {
477477
err = tenant.CreateProject(ctx, tx, tenantProj2, nil)
478478
require.NoError(t, err)
479479

480-
require.NoError(t, tenant.reload(ctx, tx, nil, nil))
480+
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))
481481

482482
proj1, err := tenant.GetProject(tenantProj1)
483483
require.NoError(t, err)
@@ -507,7 +507,7 @@ func TestTenantManager_DropCollection(t *testing.T) {
507507
factory, err := schema.NewFactoryBuilder(true).Build("test_collection", jsSchema)
508508
require.NoError(t, err)
509509
require.NoError(t, tenant.CreateCollection(ctx, tx, db2, factory))
510-
require.NoError(t, tenant.reload(ctx, tx, nil, nil))
510+
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))
511511
require.NoError(t, tx.Commit(ctx))
512512

513513
tx, err = tm.StartTx(ctx)
@@ -549,7 +549,7 @@ func TestTenantManager_SearchIndexes(t *testing.T) {
549549
err = tenant.CreateProject(ctx, tx, tenantProj1, nil)
550550
require.NoError(t, err)
551551

552-
require.NoError(t, tenant.reload(ctx, tx, nil, nil))
552+
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))
553553

554554
proj1, err := tenant.GetProject(tenantProj1)
555555
require.NoError(t, err)
@@ -578,7 +578,7 @@ func TestTenantManager_SearchIndexes(t *testing.T) {
578578
require.NoError(t, err)
579579
require.NotNil(t, indexesInSearchStore[tenant.Encoder.EncodeSearchTableName(tenant.namespace.Id(), proj1.Id(), factory.Name)])
580580

581-
require.NoError(t, tenant.reload(ctx, tx, nil, indexesInSearchStore))
581+
require.NoError(t, tenant.reload(ctx, tx, nil, indexesInSearchStore, tm))
582582

583583
proj1, err = tenant.GetProject(tenantProj1)
584584
require.NoError(t, err)
@@ -615,7 +615,7 @@ func TestTenantManager_SecondaryIndexes(t *testing.T) {
615615
err = tenant.CreateProject(ctx, tx, tenantProj2, nil)
616616
require.NoError(t, err)
617617

618-
require.NoError(t, tenant.reload(ctx, tx, nil, nil))
618+
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))
619619

620620
proj1, err := tenant.GetProject(tenantProj1)
621621
require.NoError(t, err)
@@ -657,7 +657,7 @@ func TestTenantManager_SecondaryIndexes(t *testing.T) {
657657
require.NoError(t, err)
658658
require.NoError(t, tenant.CreateCollection(ctx, tx, db2, factory))
659659

660-
require.NoError(t, tenant.reload(ctx, tx, nil, nil))
660+
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))
661661

662662
proj2, err = tenant.GetProject(tenantProj2)
663663
require.NoError(t, err)
@@ -688,7 +688,7 @@ func TestTenantManager_SecondaryIndexes(t *testing.T) {
688688
err = tenant.CreateProject(ctx, tx, tenantProj1, nil)
689689
require.NoError(t, err)
690690

691-
require.NoError(t, tenant.reload(ctx, tx, nil, nil))
691+
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))
692692

693693
proj1, err := tenant.GetProject(tenantProj1)
694694
require.NoError(t, err)
@@ -720,7 +720,7 @@ func TestTenantManager_SecondaryIndexes(t *testing.T) {
720720
require.NoError(t, err)
721721
require.NoError(t, tenant.CreateCollection(ctx, tx, db1, factory))
722722

723-
require.NoError(t, tenant.reload(ctx, tx, nil, nil))
723+
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))
724724

725725
proj1, err = tenant.GetProject(tenantProj1)
726726
require.NoError(t, err)
@@ -1098,7 +1098,7 @@ func TestTenantManager_SearchDataSize(t *testing.T) {
10981098

10991099
err = tenant.CreateProject(ctx, tmTx, tenantProj2, nil)
11001100
require.NoError(t, err)
1101-
require.NoError(t, tenant.reload(ctx, tmTx, nil, nil))
1101+
require.NoError(t, tenant.reload(ctx, tmTx, nil, nil, tm))
11021102

11031103
// proj1
11041104
proj1, err := tenant.GetProject(tenantProj1)

server/metadata/tenant_tracker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ func (cacheTracker *CacheTracker) stopTracking(ctx context.Context, tenant *Tena
202202
return err
203203
}
204204

205-
if err = tenant.Reload(ctx, tx, version, cacheTracker.tenantMgr.searchSchemasSnapshot); err != nil {
205+
if err = tenant.Reload(ctx, tx, version, cacheTracker.tenantMgr.searchSchemasSnapshot, cacheTracker.txMgr); err != nil {
206206
return err
207207
}
208208

server/quota/quota_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func TestQuota(t *testing.T) {
7070
factory, err := schema.NewFactoryBuilder(true).Build("test_collection", jsSchema)
7171
require.NoError(t, err)
7272

73-
err = tenant.Reload(ctx, tx, []byte("aaa"), nil)
73+
err = tenant.Reload(ctx, tx, []byte("aaa"), nil, txMgr)
7474
require.NoError(t, err)
7575

7676
proj1, err := tenant.GetProject(projName)

server/quota/storage_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func TestStorageQuota(t *testing.T) {
6161
factory, err := schema.NewFactoryBuilder(true).Build("test_collection", jsSchema)
6262
require.NoError(t, err)
6363

64-
err = tenant.Reload(ctx, tx, []byte("aaa"), nil)
64+
err = tenant.Reload(ctx, tx, []byte("aaa"), nil, txMgr)
6565
require.NoError(t, err)
6666

6767
proj1, err := tenant.GetProject(projName)

server/services/v1/realtime/device_session.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (s *Sessions) CreateDeviceSession(ctx context.Context, conn *websocket.Conn
7373
if version, err = s.versionH.Read(ctx, tx, false); err != nil {
7474
return nil, err
7575
}
76-
if err = tenant.Reload(ctx, tx, version, nil); err != nil {
76+
if err = tenant.Reload(ctx, tx, version, nil, s.txMgr); err != nil {
7777
return nil, err
7878
}
7979

0 commit comments

Comments
 (0)