Skip to content

Commit 920f807

Browse files
committed
Guard policy drop against concurrent bindings & deduplicate usage scan
- Add a KeysWithPrefix transaction condition to masking/row policy drops so freshly added bindings abort the txn instead of leaving dangling references. - Replace the duplicated usage structs/functions with a shared collect_policy_usage, reusing the gathered table updates and avoiding the extra clones. - Introduce txn_cond_eq_keys_with_prefix helper for prefix cardinality checks we can reuse elsewhere.
1 parent fef5c57 commit 920f807

File tree

9 files changed

+340
-183
lines changed

9 files changed

+340
-183
lines changed

src/meta/api/src/data_mask_api_impl.rs

Lines changed: 65 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use databend_common_meta_app::data_mask::MaskPolicyTableIdListIdent;
2525
use databend_common_meta_app::data_mask::MaskpolicyTableIdList;
2626
use databend_common_meta_app::id_generator::IdGenerator;
2727
use databend_common_meta_app::schema::CreateOption;
28-
use databend_common_meta_app::schema::TableId;
2928
use databend_common_meta_app::schema::TableMeta;
3029
use databend_common_meta_app::tenant::Tenant;
3130
use databend_common_meta_app::KeyWithTenant;
@@ -44,7 +43,12 @@ use crate::errors::SecurityPolicyKind;
4443
use crate::fetch_id;
4544
use crate::kv_app_error::KVAppError;
4645
use crate::kv_pb_api::KVPbApi;
46+
use crate::security_policy_usage::collect_policy_usage;
47+
use crate::security_policy_usage::PolicyBinding;
48+
use crate::security_policy_usage::PolicyDropTxnBatch;
49+
use crate::security_policy_usage::PolicyUsage;
4750
use crate::txn_backoff::txn_backoff;
51+
use crate::txn_condition_util::txn_cond_eq_keys_with_prefix;
4852
use crate::txn_condition_util::txn_cond_eq_seq;
4953
use crate::txn_core_util::send_txn;
5054
use crate::txn_core_util::txn_delete_exact;
@@ -153,7 +157,6 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
153157
.unwrap()
154158
.map_err(MaskingPolicyError::from)?
155159
.await;
156-
let mut txn = TxnRequest::default();
157160

158161
let res = self.get_id_and_value(name_ident).await?;
159162
debug!(res :? = res, name_key :? =(name_ident); "{}", func_name!());
@@ -177,25 +180,50 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
177180

178181
let id_ident = seq_id.data.into_t_ident(name_ident.tenant());
179182

180-
txn_delete_exact(&mut txn, name_ident, seq_id.seq);
181-
txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);
182-
for binding in &usage.stale_bindings {
183+
let PolicyDropTxnBatch {
184+
prefix,
185+
binding_count,
186+
bindings,
187+
table_updates,
188+
finalize_policy,
189+
} = usage.prepare_drop_batch(name_ident.tenant(), policy_id);
190+
191+
let mut txn = TxnRequest::default();
192+
txn.condition
193+
.push(txn_cond_eq_keys_with_prefix(&prefix, binding_count));
194+
195+
if finalize_policy {
196+
txn_delete_exact(&mut txn, name_ident, seq_id.seq);
197+
txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);
198+
}
199+
200+
for binding in bindings {
183201
txn_delete_exact(&mut txn, &binding.ident, binding.seq);
184202
}
185-
for update in &usage.table_updates {
203+
204+
for update in table_updates {
186205
txn.condition
187206
.push(txn_cond_eq_seq(&update.table_id, update.seq));
188207
let op = txn_op_put_pb(&update.table_id, &update.meta, None)
189208
.map_err(|e| MaskingPolicyError::from(MetaError::from(e)))?;
190209
txn.if_then.push(op);
191210
}
192-
// TODO: Tentative retention for compatibility MaskPolicyTableIdListIdent related logic. It can be directly deleted later
193-
clear_table_column_mask_policy(self, name_ident, &mut txn).await?;
211+
212+
if finalize_policy {
213+
// TODO: Tentative retention for compatibility MaskPolicyTableIdListIdent related logic. It can be directly deleted later
214+
clear_table_column_mask_policy(self, name_ident, &mut txn).await?;
215+
}
216+
194217
let (succ, _responses) = send_txn(self, txn).await?;
195-
debug!(succ = succ;"{}", func_name!());
196218

197-
if succ {
198-
return Ok(Some((seq_id, seq_meta)));
219+
if finalize_policy {
220+
debug!(succ = succ;"{}", func_name!());
221+
if succ {
222+
return Ok(Some((seq_id, seq_meta)));
223+
}
224+
} else {
225+
debug!(succ = succ, cleanup = true;"{}", func_name!());
226+
continue;
199227
}
200228
}
201229
}
@@ -243,94 +271,42 @@ async fn clear_table_column_mask_policy(
243271
Ok(())
244272
}
245273

246-
#[derive(Default)]
247-
struct MaskPolicyUsage {
248-
active_tables: Vec<u64>,
249-
stale_bindings: Vec<BindingEntry>,
250-
table_updates: Vec<TableMetaUpdate>,
251-
}
274+
type MaskPolicyUsage = PolicyUsage<MaskPolicyTableIdIdent>;
275+
276+
impl PolicyBinding for MaskPolicyTableIdIdent {
277+
fn prefix_for(tenant: &Tenant, policy_id: u64) -> DirName<Self> {
278+
DirName::new(MaskPolicyTableIdIdent::new_generic(
279+
tenant.clone(),
280+
MaskPolicyIdTableId {
281+
policy_id,
282+
table_id: 0,
283+
},
284+
))
285+
}
252286

253-
struct BindingEntry {
254-
ident: MaskPolicyTableIdIdent,
255-
seq: u64,
256-
}
287+
fn table_id(&self) -> u64 {
288+
self.name().table_id
289+
}
257290

258-
struct TableMetaUpdate {
259-
table_id: TableId,
260-
seq: u64,
261-
meta: TableMeta,
262-
}
291+
fn remove_security_policy_from_table_meta(table_meta: &mut TableMeta, policy_id: u64) -> bool {
292+
let before = table_meta.column_mask_policy_columns_ids.len();
293+
table_meta
294+
.column_mask_policy_columns_ids
295+
.retain(|_, policy| policy.policy_id != policy_id);
263296

264-
fn strip_mask_policy_from_table_meta(table_meta: &mut TableMeta, policy_id: u64) -> bool {
265-
let mut removed = false;
266-
table_meta
267-
.column_mask_policy_columns_ids
268-
.retain(|_, policy| {
269-
let keep = policy.policy_id != policy_id;
270-
if !keep {
271-
removed = true;
272-
}
273-
keep
274-
});
297+
let removed = before != table_meta.column_mask_policy_columns_ids.len();
298+
if removed {
299+
table_meta.column_mask_policy = None;
300+
}
275301

276-
if removed && table_meta.column_mask_policy.is_some() {
277-
table_meta.column_mask_policy = None;
302+
removed
278303
}
279-
280-
removed
281304
}
282305

283306
async fn collect_mask_policy_usage(
284307
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
285308
tenant: &Tenant,
286309
policy_id: u64,
287310
) -> Result<MaskPolicyUsage, MaskingPolicyError> {
288-
let binding_prefix = DirName::new(MaskPolicyTableIdIdent::new_generic(
289-
tenant.clone(),
290-
MaskPolicyIdTableId {
291-
policy_id,
292-
table_id: 0,
293-
},
294-
));
295-
let bindings = kv_api
296-
.list_pb_vec(&binding_prefix)
297-
.await
298-
.map_err(MaskingPolicyError::from)?;
299-
300-
let mut usage = MaskPolicyUsage::default();
301-
for (binding_ident, seqv) in bindings {
302-
let table_id = binding_ident.name().table_id;
303-
let table_key = TableId::new(table_id);
304-
match kv_api
305-
.get_pb(&table_key)
306-
.await
307-
.map_err(MaskingPolicyError::from)?
308-
{
309-
Some(mut table_meta_seqv) => {
310-
if table_meta_seqv.data.drop_on.is_none() {
311-
usage.active_tables.push(table_id);
312-
} else {
313-
if strip_mask_policy_from_table_meta(&mut table_meta_seqv.data, policy_id) {
314-
usage.table_updates.push(TableMetaUpdate {
315-
table_id: table_key,
316-
seq: table_meta_seqv.seq,
317-
meta: table_meta_seqv.data.clone(),
318-
});
319-
}
320-
usage.stale_bindings.push(BindingEntry {
321-
ident: binding_ident,
322-
seq: seqv.seq,
323-
});
324-
}
325-
}
326-
None => {
327-
usage.stale_bindings.push(BindingEntry {
328-
ident: binding_ident,
329-
seq: seqv.seq,
330-
});
331-
}
332-
}
333-
}
334-
335-
Ok(usage)
311+
collect_policy_usage(kv_api, tenant, policy_id).await
336312
}

src/meta/api/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ pub mod reply;
4040
mod schema_api;
4141
mod schema_api_test_suite;
4242
pub mod security_api;
43+
mod security_policy_usage;
4344
mod sequence_api;
4445
pub mod serialization_util;
4546
pub mod table_api;

0 commit comments

Comments
 (0)