Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 38 additions & 22 deletions server/metadata/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,10 @@ func (m *TenantManager) CreateOrGetTenant(ctx context.Context, namespace Namespa

defer func() {
if err == nil {
if err = tx.Commit(ctx); err == nil {
// commit succeed, so we can safely cache it now, for other workers it may happen as part of the
// first call in query lifecycle
m.tenants[namespace.StrId()] = tenant
m.idToTenantMap[namespace.Id()] = namespace.StrId()
}
// commit succeed, so we can safely cache it now, for other workers it may happen as part of the
// first call in query lifecycle
m.tenants[namespace.StrId()] = tenant
m.idToTenantMap[namespace.Id()] = namespace.StrId()
} else {
_ = tx.Rollback(ctx)
}
Expand Down Expand Up @@ -379,7 +377,7 @@ func (m *TenantManager) GetTenant(ctx context.Context, namespaceId string) (*Ten
namespace := NewTenantNamespace(namespaceId, metadata)
tenant = NewTenant(namespace, m.kvStore, m.searchStore,
m.metaStore, m.encoder, m.versionH, currentVersion, m.tableKeyGenerator)
if err = tenant.reload(ctx, tx, currentVersion, m.searchSchemasSnapshot); err != nil {
if err = tenant.reload(ctx, tx, currentVersion, m.searchSchemasSnapshot, m.txMgr); err != nil {
return nil, err
}

Expand Down Expand Up @@ -464,7 +462,7 @@ func (m *TenantManager) createOrGetTenantInternal(ctx context.Context, tx transa

tenant := NewTenant(namespace, m.kvStore, m.searchStore, m.metaStore, m.encoder, m.versionH, currentVersion, m.tableKeyGenerator)
tenant.Lock()
err = tenant.reload(ctx, tx, currentVersion, m.searchSchemasSnapshot)
err = tenant.reload(ctx, tx, currentVersion, m.searchSchemasSnapshot, m.txMgr)
tenant.Unlock()
return tenant, err
}
Expand Down Expand Up @@ -584,17 +582,13 @@ func (m *TenantManager) reload(ctx context.Context, currentVersion Version) erro
return err
}
tenant.Lock()
err = tenant.reload(ctx, tx, currentVersion, m.searchSchemasSnapshot)
err = tenant.reload(ctx, tx, currentVersion, m.searchSchemasSnapshot, m.txMgr)
tenant.Unlock()
if err != nil {
log.Err(err).Msgf("reloading a tenant failed '%s'", tenant.name)
_ = tx.Rollback(ctx)
return err
}
if err = tx.Commit(ctx); err != nil {
log.Err(err).Msgf("committing a reloading of tenant failed '%s'", tenant.name)
return err
}
}
return nil
}
Expand Down Expand Up @@ -650,7 +644,7 @@ func NewTenant(namespace Namespace, kvStore kv.TxStore, searchStore search.Store
// thread will actually perform reload. This is a blocking API which means if most of the requests detected that the
// tenant state is stale then they all will block till one of them will reload the tenant state from the database. All
// the blocking transactions will be restarted to ensure they see the latest view of the tenant.
func (tenant *Tenant) Reload(ctx context.Context, tx transaction.Tx, version Version, searchSchemasSnapshot map[string]*tsApi.CollectionResponse) error {
func (tenant *Tenant) Reload(ctx context.Context, tx transaction.Tx, version Version, searchSchemasSnapshot map[string]*tsApi.CollectionResponse, txMgr *transaction.Manager) error {
if !tenant.shouldReload(version) {
return nil
}
Expand All @@ -662,7 +656,7 @@ func (tenant *Tenant) Reload(ctx context.Context, tx transaction.Tx, version Ver
return nil
}

return tenant.reload(ctx, tx, version, searchSchemasSnapshot)
return tenant.reload(ctx, tx, version, searchSchemasSnapshot, txMgr)
}

func (tenant *Tenant) shouldReload(currentVersion Version) bool {
Expand All @@ -678,7 +672,7 @@ func (tenant *Tenant) shouldReload(currentVersion Version) bool {
// loads all the databases, it loads the resources for each one. Once databases are reloaded then it performs the same
// logic for search indexes. Once search indexes are loaded it links back the search indexes to the Tigris Collection
// if the source for these search indexes is Tigris.
func (tenant *Tenant) reload(ctx context.Context, tx transaction.Tx, currentVersion Version, searchSchemasSnapshot map[string]*tsApi.CollectionResponse) error {
func (tenant *Tenant) reload(ctx context.Context, tx transaction.Tx, currentVersion Version, searchSchemasSnapshot map[string]*tsApi.CollectionResponse, txMgr *transaction.Manager) error {
// reset
tenant.projects = make(map[string]*Project)
tenant.idToDatabaseMap = make(map[uint32]*Database)
Expand Down Expand Up @@ -714,11 +708,14 @@ func (tenant *Tenant) reload(ctx context.Context, tx transaction.Tx, currentVers
tenant.idToDatabaseMap[meta.ID] = database
}

err = tx.Commit(ctx)
if err != nil {
log.Fatal().Err(err).Msg("Failed to reload tenant")
}
// load search indexes, this is essentially loading all the search indexes created by the user and attaching it to
// the project object.
for _, p := range tenant.projects {
var err error
if p.search, err = tenant.reloadSearch(ctx, tx, p, searchSchemasSnapshot); err != nil {
if p.search, err = tenant.reloadSearch(ctx, txMgr, p, searchSchemasSnapshot); err != nil {
return err
}
for _, index := range p.search.indexes {
Expand Down Expand Up @@ -805,16 +802,32 @@ func (tenant *Tenant) reloadDatabase(ctx context.Context, tx transaction.Tx, dbN
}

// reloadSearch is responsible for reloading all the search indexes inside a single project.
func (tenant *Tenant) reloadSearch(ctx context.Context, tx transaction.Tx, project *Project, searchSchemasSnapshot map[string]*tsApi.CollectionResponse) (*Search, error) {
projMetadata, err := tenant.namespaceStore.GetProjectMetadata(ctx, tx, tenant.namespace.Id(), project.Name())
func (tenant *Tenant) reloadSearch(ctx context.Context, txMgr *transaction.Manager, project *Project, searchSchemasSnapshot map[string]*tsApi.CollectionResponse) (*Search, error) {
txToReadProjectMetadata, err := txMgr.StartTx(ctx)
if err != nil {
log.Fatal().Err(err).Msg("Failed to start tx to read project metadata while reloading tenant")
}

projMetadata, err := tenant.namespaceStore.GetProjectMetadata(ctx, txToReadProjectMetadata, tenant.namespace.Id(), project.Name())
if err != nil {
return nil, errors.Internal("failed to get project metadata for project %s", project.Name())
}
_ = txToReadProjectMetadata.Commit(ctx)

searchObj := NewSearch()

for _, searchMD := range projMetadata.SearchMetadata {
schV, err := tenant.searchSchemaStore.GetLatest(ctx, tx, tenant.namespace.Id(), project.id, searchMD.Name)
var indexLevelTx transaction.Tx
for i, searchMD := range projMetadata.SearchMetadata {
if i%10 == 0 {
if indexLevelTx != nil {
_ = indexLevelTx.Commit(ctx)
}
indexLevelTx, err = txMgr.StartTx(ctx)
if err != nil {
log.Fatal().Err(err).Msg("Failed to start tx to reload indices in batch for tenant reload")
}
}
schV, err := tenant.searchSchemaStore.GetLatest(ctx, indexLevelTx, tenant.namespace.Id(), project.id, searchMD.Name)
if err != nil {
return nil, err
}
Expand All @@ -833,6 +846,9 @@ func (tenant *Tenant) reloadSearch(ctx context.Context, tx transaction.Tx, proje
searchObj.indexes[searchMD.Name] = schema.NewSearchIndex(schV.Version, searchStoreIndexName, searchFactory, fieldsInSearchStore)
}

if indexLevelTx != nil {
_ = indexLevelTx.Commit(ctx)
}
return searchObj, nil
}

Expand Down
30 changes: 15 additions & 15 deletions server/metadata/tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func TestTenantManager_CreateProjects(t *testing.T) {

tx, err = tm.StartTx(ctx)
require.NoError(t, err)
err = tenant.reload(ctx, tx, nil, nil)
err = tenant.reload(ctx, tx, nil, nil, tm)
require.NoError(t, err)
proj1, err := tenant.GetProject(tenantProj1)
require.NoError(t, err)
Expand Down Expand Up @@ -272,7 +272,7 @@ func TestTenantManager_DatabaseBranches(t *testing.T) {
databases := (&Project{}).GetDatabaseWithBranches()
require.Len(t, databases, 0)

require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))

require.NoError(t, tenant.CreateBranch(ctx, tx, tenantProj1, NewDatabaseNameWithBranch(tenantProj1, "branch1")))
require.NoError(t, tenant.CreateBranch(ctx, tx, tenantProj2, NewDatabaseNameWithBranch(tenantProj2, "branch1")))
Expand All @@ -282,7 +282,7 @@ func TestTenantManager_DatabaseBranches(t *testing.T) {
require.ErrorContains(t, tenant.CreateBranch(ctx, tx, unknownProject, NewDatabaseNameWithBranch(unknownProject, "branch1")), "project doesn't exist")

// reload again to get all the branches
require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))

// list all branches
branches := tenant.ListDatabaseBranches(tenantProj1)
Expand Down Expand Up @@ -339,7 +339,7 @@ func TestTenantManager_DatabaseBranches(t *testing.T) {
require.NoError(t, tenant.DeleteBranch(ctx, tx, tenantProj1, NewDatabaseNameWithBranch(tenantProj1, "branch2")))
require.ErrorContains(t, tenant.DeleteBranch(ctx, tx, unknownProject, NewDatabaseNameWithBranch(unknownProject, "branch1")), "project doesn't exist")

require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))
require.NoError(t, tx.Commit(ctx))

tx, err = tm.StartTx(ctx)
Expand Down Expand Up @@ -402,7 +402,7 @@ func TestTenantManager_CreateCollections(t *testing.T) {
err = tenant.CreateProject(ctx, tx, tenantProj2, nil)
require.NoError(t, err)

require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))

proj1, err := tenant.GetProject(tenantProj1)
require.NoError(t, err)
Expand Down Expand Up @@ -444,7 +444,7 @@ func TestTenantManager_CreateCollections(t *testing.T) {
require.NoError(t, err)
require.NoError(t, tenant.CreateCollection(ctx, tx, db2, factory))

require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))

proj2, err = tenant.GetProject(tenantProj2)
require.NoError(t, err)
Expand Down Expand Up @@ -477,7 +477,7 @@ func TestTenantManager_DropCollection(t *testing.T) {
err = tenant.CreateProject(ctx, tx, tenantProj2, nil)
require.NoError(t, err)

require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))

proj1, err := tenant.GetProject(tenantProj1)
require.NoError(t, err)
Expand Down Expand Up @@ -507,7 +507,7 @@ func TestTenantManager_DropCollection(t *testing.T) {
factory, err := schema.NewFactoryBuilder(true).Build("test_collection", jsSchema)
require.NoError(t, err)
require.NoError(t, tenant.CreateCollection(ctx, tx, db2, factory))
require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))
require.NoError(t, tx.Commit(ctx))

tx, err = tm.StartTx(ctx)
Expand Down Expand Up @@ -549,7 +549,7 @@ func TestTenantManager_SearchIndexes(t *testing.T) {
err = tenant.CreateProject(ctx, tx, tenantProj1, nil)
require.NoError(t, err)

require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))

proj1, err := tenant.GetProject(tenantProj1)
require.NoError(t, err)
Expand Down Expand Up @@ -578,7 +578,7 @@ func TestTenantManager_SearchIndexes(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, indexesInSearchStore[tenant.Encoder.EncodeSearchTableName(tenant.namespace.Id(), proj1.Id(), factory.Name)])

require.NoError(t, tenant.reload(ctx, tx, nil, indexesInSearchStore))
require.NoError(t, tenant.reload(ctx, tx, nil, indexesInSearchStore, tm))

proj1, err = tenant.GetProject(tenantProj1)
require.NoError(t, err)
Expand Down Expand Up @@ -615,7 +615,7 @@ func TestTenantManager_SecondaryIndexes(t *testing.T) {
err = tenant.CreateProject(ctx, tx, tenantProj2, nil)
require.NoError(t, err)

require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))

proj1, err := tenant.GetProject(tenantProj1)
require.NoError(t, err)
Expand Down Expand Up @@ -657,7 +657,7 @@ func TestTenantManager_SecondaryIndexes(t *testing.T) {
require.NoError(t, err)
require.NoError(t, tenant.CreateCollection(ctx, tx, db2, factory))

require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))

proj2, err = tenant.GetProject(tenantProj2)
require.NoError(t, err)
Expand Down Expand Up @@ -688,7 +688,7 @@ func TestTenantManager_SecondaryIndexes(t *testing.T) {
err = tenant.CreateProject(ctx, tx, tenantProj1, nil)
require.NoError(t, err)

require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))

proj1, err := tenant.GetProject(tenantProj1)
require.NoError(t, err)
Expand Down Expand Up @@ -720,7 +720,7 @@ func TestTenantManager_SecondaryIndexes(t *testing.T) {
require.NoError(t, err)
require.NoError(t, tenant.CreateCollection(ctx, tx, db1, factory))

require.NoError(t, tenant.reload(ctx, tx, nil, nil))
require.NoError(t, tenant.reload(ctx, tx, nil, nil, tm))

proj1, err = tenant.GetProject(tenantProj1)
require.NoError(t, err)
Expand Down Expand Up @@ -1098,7 +1098,7 @@ func TestTenantManager_SearchDataSize(t *testing.T) {

err = tenant.CreateProject(ctx, tmTx, tenantProj2, nil)
require.NoError(t, err)
require.NoError(t, tenant.reload(ctx, tmTx, nil, nil))
require.NoError(t, tenant.reload(ctx, tmTx, nil, nil, tm))

// proj1
proj1, err := tenant.GetProject(tenantProj1)
Expand Down
7 changes: 2 additions & 5 deletions server/metadata/tenant_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,8 @@ func (cacheTracker *CacheTracker) stopTracking(ctx context.Context, tenant *Tena
return err
}

if err = tenant.Reload(ctx, tx, version, cacheTracker.tenantMgr.searchSchemasSnapshot); err != nil {
return err
}

if err = tx.Commit(ctx); err != nil {
// tx is handled inside
if err = tenant.Reload(ctx, tx, version, cacheTracker.tenantMgr.searchSchemasSnapshot, cacheTracker.txMgr); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion server/quota/quota_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestQuota(t *testing.T) {
factory, err := schema.NewFactoryBuilder(true).Build("test_collection", jsSchema)
require.NoError(t, err)

err = tenant.Reload(ctx, tx, []byte("aaa"), nil)
err = tenant.Reload(ctx, tx, []byte("aaa"), nil, txMgr)
require.NoError(t, err)

proj1, err := tenant.GetProject(projName)
Expand Down
2 changes: 1 addition & 1 deletion server/quota/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestStorageQuota(t *testing.T) {
factory, err := schema.NewFactoryBuilder(true).Build("test_collection", jsSchema)
require.NoError(t, err)

err = tenant.Reload(ctx, tx, []byte("aaa"), nil)
err = tenant.Reload(ctx, tx, []byte("aaa"), nil, txMgr)
require.NoError(t, err)

proj1, err := tenant.GetProject(projName)
Expand Down
2 changes: 1 addition & 1 deletion server/services/v1/realtime/device_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *Sessions) CreateDeviceSession(ctx context.Context, conn *websocket.Conn
if version, err = s.versionH.Read(ctx, tx, false); err != nil {
return nil, err
}
if err = tenant.Reload(ctx, tx, version, nil); err != nil {
if err = tenant.Reload(ctx, tx, version, nil, s.txMgr); err != nil {
return nil, err
}

Expand Down