Skip to content

Commit 8d11a51

Browse files
committed
refactor: move table-policy reference management to drop/undrop table
Move the table-policy reference lifecycle management from drop policy operations to drop/undrop table operations. This simplifies the drop policy logic and improves performance. Key changes: - drop_table: delete all table-policy references when marking table as dropped - undrop_table: verify policies still exist, cleanup stale references, restore valid ones - drop_policy: simplified from incremental cleanup (N/100+1 txns) to single existence check (1 txn) All operations are protected by seq conditions for concurrent safety using optimistic locking pattern.
1 parent 94bed73 commit 8d11a51

File tree

6 files changed

+146
-232
lines changed

6 files changed

+146
-232
lines changed

src/meta/api/src/data_mask_api_impl.rs

Lines changed: 18 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,6 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
147147
> {
148148
debug!(name_ident :? =(name_ident); "DatamaskApi: {}", func_name!());
149149

150-
const BATCH_SIZE: usize = 100;
151-
152150
let mut trials = txn_backoff(None, func_name!());
153151
loop {
154152
trials.next().unwrap()?.await;
@@ -161,86 +159,44 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
161159

162160
let policy_id = *seq_id.data;
163161
let tenant = name_ident.tenant();
164-
let binding_prefix = DirName::new(MaskPolicyTableIdIdent::new_generic(
162+
let table_policy_ref_prefix = DirName::new(MaskPolicyTableIdIdent::new_generic(
165163
tenant.clone(),
166164
MaskPolicyIdTableId {
167165
policy_id,
168166
table_id: 0,
169167
},
170168
));
171169

172-
// List all bindings and categorize them
173-
let bindings = self.list_pb_vec(&binding_prefix).await?;
174-
let total_binding_count = bindings.len() as u64;
175-
176-
let mut active_bindings = Vec::new();
177-
let mut stale_bindings = Vec::new();
178-
179-
for (binding_key, binding_seqv) in bindings {
180-
let table_id = binding_key.name().table_id;
181-
let table_key = databend_common_meta_app::schema::TableId::new(table_id);
182-
183-
match self.get_pb(&table_key).await? {
184-
Some(SeqV { data, .. }) if data.drop_on.is_none() => {
185-
// Active table using this policy
186-
active_bindings.push(table_id);
187-
}
188-
_ => {
189-
// Table dropped or doesn't exist - stale binding
190-
stale_bindings.push((binding_key, binding_seqv.seq));
191-
}
192-
}
193-
}
170+
// List all table-policy references
171+
let table_policy_refs = self.list_pb_vec(&table_policy_ref_prefix).await?;
194172

195173
// Policy is in use - cannot drop
196-
if !active_bindings.is_empty() {
174+
if !table_policy_refs.is_empty() {
197175
return Ok(Err(MaskingPolicyError::policy_in_use(
198176
tenant.tenant_name().to_string(),
199177
name_ident.data_mask_name().to_string(),
200178
)));
201179
}
202180

203-
// Still have stale bindings - clean up a batch
204-
if !stale_bindings.is_empty() {
205-
let batch_size = stale_bindings.len().min(BATCH_SIZE);
206-
let batch: Vec<_> = stale_bindings.into_iter().take(batch_size).collect();
207-
208-
let mut txn = TxnRequest::default();
209-
txn.condition.push(txn_cond_eq_keys_with_prefix(
210-
&binding_prefix,
211-
total_binding_count,
212-
));
213-
214-
for (key, seq) in batch {
215-
txn_delete_exact(&mut txn, &key, seq);
216-
}
217-
218-
let (succ, _) = send_txn(self, txn).await?;
219-
if succ {
220-
continue; // Loop back to clean next batch or drop policy
221-
}
222-
// Transaction failed, retry from beginning
223-
} else {
224-
// All bindings cleaned up - now drop the policy itself
225-
let id_ident = seq_id.data.into_t_ident(tenant);
226-
let mut txn = TxnRequest::default();
181+
// No references - drop the policy
182+
let id_ident = seq_id.data.into_t_ident(tenant);
183+
let mut txn = TxnRequest::default();
227184

228-
// Ensure no new bindings were created
229-
txn.condition
230-
.push(txn_cond_eq_keys_with_prefix(&binding_prefix, 0));
185+
// Ensure no new references were created
186+
txn.condition
187+
.push(txn_cond_eq_keys_with_prefix(&table_policy_ref_prefix, 0));
231188

232-
txn_delete_exact(&mut txn, name_ident, seq_id.seq);
233-
txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);
189+
txn_delete_exact(&mut txn, name_ident, seq_id.seq);
190+
txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);
234191

235-
// TODO: Tentative retention for compatibility. Can be deleted later.
236-
clear_table_column_mask_policy(self, name_ident, &mut txn).await?;
192+
// TODO: Tentative retention for compatibility. Can be deleted later.
193+
clear_table_column_mask_policy(self, name_ident, &mut txn).await?;
237194

238-
let (succ, _responses) = send_txn(self, txn).await?;
239-
if succ {
240-
return Ok(Ok(Some((seq_id, seq_meta))));
241-
}
242-
// Transaction failed, retry
195+
let (succ, _responses) = send_txn(self, txn).await?;
196+
if succ {
197+
return Ok(Ok(Some((seq_id, seq_meta))));
243198
}
199+
// Transaction failed, retry
244200
}
245201
}
246202

src/meta/api/src/row_access_policy_api_impl.rs

Lines changed: 16 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,6 @@ impl<KV: kvapi::KVApi<Error = MetaError>> RowAccessPolicyApi for KV {
134134
> {
135135
debug!(name_ident :? =(name_ident); "RowAccessPolicyApi: {}", func_name!());
136136

137-
const BATCH_SIZE: usize = 100;
138-
139137
let mut trials = txn_backoff(None, func_name!());
140138
loop {
141139
trials.next().unwrap()?.await;
@@ -148,83 +146,41 @@ impl<KV: kvapi::KVApi<Error = MetaError>> RowAccessPolicyApi for KV {
148146

149147
let policy_id = *seq_id.data;
150148
let tenant = name_ident.tenant();
151-
let binding_prefix = DirName::new(RowAccessPolicyTableIdIdent::new_generic(
149+
let table_policy_ref_prefix = DirName::new(RowAccessPolicyTableIdIdent::new_generic(
152150
tenant.clone(),
153151
RowAccessPolicyIdTableId {
154152
policy_id,
155153
table_id: 0,
156154
},
157155
));
158156

159-
// List all bindings and categorize them
160-
let bindings = self.list_pb_vec(&binding_prefix).await?;
161-
let total_binding_count = bindings.len() as u64;
162-
163-
let mut active_bindings = Vec::new();
164-
let mut stale_bindings = Vec::new();
165-
166-
for (binding_key, binding_seqv) in bindings {
167-
let table_id = binding_key.name().table_id;
168-
let table_key = databend_common_meta_app::schema::TableId::new(table_id);
169-
170-
match self.get_pb(&table_key).await? {
171-
Some(SeqV { data, .. }) if data.drop_on.is_none() => {
172-
// Active table using this policy
173-
active_bindings.push(table_id);
174-
}
175-
_ => {
176-
// Table dropped or doesn't exist - stale binding
177-
stale_bindings.push((binding_key, binding_seqv.seq));
178-
}
179-
}
180-
}
157+
// List all table-policy references
158+
let table_policy_refs = self.list_pb_vec(&table_policy_ref_prefix).await?;
181159

182160
// Policy is in use - cannot drop
183-
if !active_bindings.is_empty() {
161+
if !table_policy_refs.is_empty() {
184162
return Ok(Err(RowAccessPolicyError::policy_in_use(
185163
tenant.tenant_name().to_string(),
186164
name_ident.row_access_name().to_string(),
187165
)));
188166
}
189167

190-
// Still have stale bindings - clean up a batch
191-
if !stale_bindings.is_empty() {
192-
let batch_size = stale_bindings.len().min(BATCH_SIZE);
193-
let batch: Vec<_> = stale_bindings.into_iter().take(batch_size).collect();
194-
195-
let mut txn = TxnRequest::default();
196-
txn.condition.push(txn_cond_eq_keys_with_prefix(
197-
&binding_prefix,
198-
total_binding_count,
199-
));
200-
201-
for (key, seq) in batch {
202-
txn_delete_exact(&mut txn, &key, seq);
203-
}
204-
205-
let (succ, _) = send_txn(self, txn).await?;
206-
if succ {
207-
continue; // Loop back to clean next batch or drop policy
208-
}
209-
// Transaction failed, retry from beginning
210-
} else {
211-
// All bindings cleaned up - now drop the policy itself
212-
let id_ident = seq_id.data.into_t_ident(tenant);
213-
let mut txn = TxnRequest::default();
168+
// No references - drop the policy
169+
let id_ident = seq_id.data.into_t_ident(tenant);
170+
let mut txn = TxnRequest::default();
214171

215-
// Ensure no new bindings were created
216-
txn.condition
217-
.push(txn_cond_eq_keys_with_prefix(&binding_prefix, 0));
172+
// Ensure no new references were created
173+
txn.condition
174+
.push(txn_cond_eq_keys_with_prefix(&table_policy_ref_prefix, 0));
218175

219-
txn_delete_exact(&mut txn, name_ident, seq_id.seq);
220-
txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);
176+
txn_delete_exact(&mut txn, name_ident, seq_id.seq);
177+
txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);
221178

222-
let (succ, _responses) = send_txn(self, txn).await?;
223-
if succ {
224-
return Ok(Ok(Some((seq_id, seq_meta))));
225-
}
226-
// Transaction failed, retry
179+
let (succ, _responses) = send_txn(self, txn).await?;
180+
if succ {
181+
return Ok(Ok(Some((seq_id, seq_meta))));
227182
}
183+
// Transaction failed, retry
228184
}
229185
}
230186

0 commit comments

Comments
 (0)