Skip to content

Commit 0efebe5

Browse files
authored
feat(query): prevent dropping in-use security policies (#18918)
* feat(meta): prevent dropping in-use security policies
  - raise constraint errors when masking/row access policies remain bound to tables
  - introduce dedicated masking/row-access policy error types and surface them via APIs
  - extend SQL logic tests to cover drop failures for active policies
* Guard policy drop against concurrent bindings & deduplicate usage scan
  - Add a KeysWithPrefix transaction condition to masking/row policy drops so freshly added bindings abort the txn instead of leaving dangling references.
  - Replace the duplicated usage structs/functions with a shared collect_policy_usage, reusing the gathered table updates and avoiding the extra clones.
  - Introduce a txn_cond_eq_keys_with_prefix helper for prefix cardinality checks that we can reuse elsewhere.
* refactor security policy error
* refactor
* remove double braces
* delete collect_mask/row_access_policy_usage
* delete PolicyBindingEntry
* drop policy should not unset table meta policy map
* refactor: move table-policy reference management to drop/undrop table
  Move the table-policy reference lifecycle management from drop policy operations to drop/undrop table operations. This simplifies the drop policy logic and improves performance.
  Key changes:
  - drop_table: delete all table-policy references when marking the table as dropped
  - undrop_table: verify policies still exist, clean up stale references, restore valid ones
  - drop_policy: simplified from incremental cleanup (N/100+1 txns) to a single existence check (1 txn)
  All operations are protected by seq conditions for concurrent safety, using an optimistic locking pattern.
* refactor function name
* de-dup
* rename cleanup_missing_policies_on_undrop to restore_policy_references_on_undrop
* remove useless condition and clone
* optimize restore_policy_references_on_undrop
1 parent 4dfcc44 commit 0efebe5

21 files changed

+885
-101
lines changed

src/meta/api/src/data_mask_api.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ use databend_common_meta_app::tenant::Tenant;
2121
use databend_common_meta_types::MetaError;
2222
use databend_common_meta_types::SeqV;
2323

24+
use crate::errors::MaskingPolicyError;
2425
use crate::kv_app_error::KVAppError;
26+
use crate::meta_txn_error::MetaTxnError;
2527

2628
#[async_trait::async_trait]
2729
pub trait DatamaskApi: Send + Sync {
@@ -35,7 +37,10 @@ pub trait DatamaskApi: Send + Sync {
3537
async fn drop_data_mask(
3638
&self,
3739
name_ident: &DataMaskNameIdent,
38-
) -> Result<Option<(SeqV<DataMaskId>, SeqV<DatamaskMeta>)>, KVAppError>;
40+
) -> Result<
41+
Result<Option<(SeqV<DataMaskId>, SeqV<DatamaskMeta>)>, MaskingPolicyError>,
42+
MetaTxnError,
43+
>;
3944

4045
async fn get_data_mask(
4146
&self,

src/meta/api/src/data_mask_api_impl.rs

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,30 @@ use databend_common_meta_app::data_mask::DataMaskId;
1919
use databend_common_meta_app::data_mask::DataMaskIdIdent;
2020
use databend_common_meta_app::data_mask::DataMaskNameIdent;
2121
use databend_common_meta_app::data_mask::DatamaskMeta;
22+
use databend_common_meta_app::data_mask::MaskPolicyIdTableId;
23+
use databend_common_meta_app::data_mask::MaskPolicyTableIdIdent;
2224
use databend_common_meta_app::data_mask::MaskPolicyTableIdListIdent;
2325
use databend_common_meta_app::data_mask::MaskpolicyTableIdList;
2426
use databend_common_meta_app::id_generator::IdGenerator;
2527
use databend_common_meta_app::schema::CreateOption;
2628
use databend_common_meta_app::tenant::Tenant;
2729
use databend_common_meta_app::KeyWithTenant;
2830
use databend_common_meta_kvapi::kvapi;
31+
use databend_common_meta_kvapi::kvapi::DirName;
2932
use databend_common_meta_types::MetaError;
3033
use databend_common_meta_types::SeqV;
3134
use databend_common_meta_types::TxnRequest;
3235
use fastrace::func_name;
3336
use log::debug;
3437

3538
use crate::data_mask_api::DatamaskApi;
39+
use crate::errors::MaskingPolicyError;
3640
use crate::fetch_id;
3741
use crate::kv_app_error::KVAppError;
3842
use crate::kv_pb_api::KVPbApi;
43+
use crate::meta_txn_error::MetaTxnError;
3944
use crate::txn_backoff::txn_backoff;
45+
use crate::txn_condition_util::txn_cond_eq_keys_with_prefix;
4046
use crate::txn_condition_util::txn_cond_eq_seq;
4147
use crate::txn_core_util::send_txn;
4248
use crate::txn_core_util::txn_delete_exact;
@@ -135,33 +141,62 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
135141
async fn drop_data_mask(
136142
&self,
137143
name_ident: &DataMaskNameIdent,
138-
) -> Result<Option<(SeqV<DataMaskId>, SeqV<DatamaskMeta>)>, KVAppError> {
144+
) -> Result<
145+
Result<Option<(SeqV<DataMaskId>, SeqV<DatamaskMeta>)>, MaskingPolicyError>,
146+
MetaTxnError,
147+
> {
139148
debug!(name_ident :? =(name_ident); "DatamaskApi: {}", func_name!());
140149

141150
let mut trials = txn_backoff(None, func_name!());
142151
loop {
143152
trials.next().unwrap()?.await;
144-
let mut txn = TxnRequest::default();
145153

154+
// Check if policy exists
146155
let res = self.get_id_and_value(name_ident).await?;
147-
debug!(res :? = res, name_key :? =(name_ident); "{}", func_name!());
148-
149156
let Some((seq_id, seq_meta)) = res else {
150-
return Ok(None);
157+
return Ok(Ok(None));
151158
};
152159

153-
let id_ident = seq_id.data.into_t_ident(name_ident.tenant());
160+
let policy_id = *seq_id.data;
161+
let tenant = name_ident.tenant();
162+
let table_policy_ref_prefix = DirName::new(MaskPolicyTableIdIdent::new_generic(
163+
tenant.clone(),
164+
MaskPolicyIdTableId {
165+
policy_id,
166+
table_id: 0,
167+
},
168+
));
169+
170+
// List all table-policy references
171+
let table_policy_refs = self.list_pb_vec(&table_policy_ref_prefix).await?;
172+
173+
// Policy is in use - cannot drop
174+
if !table_policy_refs.is_empty() {
175+
return Ok(Err(MaskingPolicyError::policy_in_use(
176+
tenant.tenant_name().to_string(),
177+
name_ident.data_mask_name().to_string(),
178+
)));
179+
}
180+
181+
// No references - drop the policy
182+
let id_ident = seq_id.data.into_t_ident(tenant);
183+
let mut txn = TxnRequest::default();
184+
185+
// Ensure no new references were created
186+
txn.condition
187+
.push(txn_cond_eq_keys_with_prefix(&table_policy_ref_prefix, 0));
154188

155189
txn_delete_exact(&mut txn, name_ident, seq_id.seq);
156190
txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);
157-
// TODO: Tentative retention for compatibility MaskPolicyTableIdListIdent related logic. It can be directly deleted later
191+
192+
// TODO: Tentative retention for compatibility. Can be deleted later.
158193
clear_table_column_mask_policy(self, name_ident, &mut txn).await?;
159-
let (succ, _responses) = send_txn(self, txn).await?;
160-
debug!(succ = succ;"{}", func_name!());
161194

195+
let (succ, _responses) = send_txn(self, txn).await?;
162196
if succ {
163-
return Ok(Some((seq_id, seq_meta)));
197+
return Ok(Ok(Some((seq_id, seq_meta))));
164198
}
199+
// Transaction failed, retry
165200
}
166201
}
167202

src/meta/api/src/errors.rs

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
use databend_common_exception::ErrorCode;
1616
use databend_common_meta_app::principal::AutoIncrementKey;
17-
1817
/// Table logic error, unrelated to the backend service providing Table management, or dependent component.
1918
#[derive(Clone, Debug, thiserror::Error)]
2019
pub enum TableError {
@@ -39,6 +38,56 @@ impl From<TableError> for ErrorCode {
3938
}
4039
}
4140

41+
#[derive(Clone, Debug, thiserror::Error)]
42+
pub enum MaskingPolicyError {
43+
#[error(
44+
"MASKING POLICY `{policy_name}` is still in use. Unset it from all tables before dropping."
45+
)]
46+
PolicyInUse { policy_name: String },
47+
}
48+
49+
impl MaskingPolicyError {
50+
pub fn policy_in_use(_tenant: impl Into<String>, policy_name: impl Into<String>) -> Self {
51+
Self::PolicyInUse {
52+
policy_name: policy_name.into(),
53+
}
54+
}
55+
}
56+
57+
impl From<MaskingPolicyError> for ErrorCode {
58+
fn from(value: MaskingPolicyError) -> Self {
59+
let s = value.to_string();
60+
match value {
61+
MaskingPolicyError::PolicyInUse { .. } => ErrorCode::ConstraintError(s),
62+
}
63+
}
64+
}
65+
66+
#[derive(Clone, Debug, thiserror::Error)]
67+
pub enum RowAccessPolicyError {
68+
#[error(
69+
"ROW ACCESS POLICY `{policy_name}` is still in use. Unset it from all tables before dropping."
70+
)]
71+
PolicyInUse { policy_name: String },
72+
}
73+
74+
impl RowAccessPolicyError {
75+
pub fn policy_in_use(_tenant: impl Into<String>, policy_name: impl Into<String>) -> Self {
76+
Self::PolicyInUse {
77+
policy_name: policy_name.into(),
78+
}
79+
}
80+
}
81+
82+
impl From<RowAccessPolicyError> for ErrorCode {
83+
fn from(value: RowAccessPolicyError) -> Self {
84+
let s = value.to_string();
85+
match value {
86+
RowAccessPolicyError::PolicyInUse { .. } => ErrorCode::ConstraintError(s),
87+
}
88+
}
89+
}
90+
4291
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
4392
pub enum AutoIncrementError {
4493
#[error("OutOfAutoIncrementRange: `{key}` while `{context}`")]

src/meta/api/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ pub use error_util::db_has_to_not_exist;
7878
pub use error_util::db_id_has_to_exist;
7979
pub use error_util::table_has_to_not_exist;
8080
pub use error_util::unknown_database_error;
81+
pub use errors::MaskingPolicyError;
82+
pub use errors::RowAccessPolicyError;
8183
pub use garbage_collection_api::GarbageCollectionApi;
8284
pub use index_api::IndexApi;
8385
// Re-export from new kv_fetch_util module for backward compatibility

src/meta/api/src/row_access_policy_api.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,12 @@ use databend_common_meta_app::tenant_key::errors::ExistError;
2323
use databend_common_meta_types::MetaError;
2424
use databend_common_meta_types::SeqV;
2525

26+
use crate::errors::RowAccessPolicyError;
2627
use crate::meta_txn_error::MetaTxnError;
2728

2829
#[async_trait::async_trait]
2930
pub trait RowAccessPolicyApi: Send + Sync {
30-
async fn create_row_access(
31+
async fn create_row_access_policy(
3132
&self,
3233
req: CreateRowAccessPolicyReq,
3334
) -> Result<
@@ -37,17 +38,20 @@ pub trait RowAccessPolicyApi: Send + Sync {
3738

3839
/// On success, returns the dropped id and row policy.
3940
/// Returning None, means nothing is removed.
40-
async fn drop_row_access(
41+
async fn drop_row_access_policy(
4142
&self,
4243
name_ident: &RowAccessPolicyNameIdent,
43-
) -> Result<Option<(SeqV<RowAccessPolicyId>, SeqV<RowAccessPolicyMeta>)>, MetaTxnError>;
44+
) -> Result<
45+
Result<Option<(SeqV<RowAccessPolicyId>, SeqV<RowAccessPolicyMeta>)>, RowAccessPolicyError>,
46+
MetaTxnError,
47+
>;
4448

45-
async fn get_row_access(
49+
async fn get_row_access_policy(
4650
&self,
4751
name_ident: &RowAccessPolicyNameIdent,
4852
) -> Result<Option<(SeqV<RowAccessPolicyId>, SeqV<RowAccessPolicyMeta>)>, MetaError>;
4953

50-
async fn get_row_access_by_id(
54+
async fn get_row_access_policy_by_id(
5155
&self,
5256
tenant: &Tenant,
5357
policy_id: u64,

src/meta/api/src/row_access_policy_api_impl.rs

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,27 +14,32 @@
1414

1515
use databend_common_meta_app::id_generator::IdGenerator;
1616
use databend_common_meta_app::row_access_policy::row_access_policy_name_ident;
17+
use databend_common_meta_app::row_access_policy::row_access_policy_table_id_ident::RowAccessPolicyIdTableId;
1718
use databend_common_meta_app::row_access_policy::CreateRowAccessPolicyReply;
1819
use databend_common_meta_app::row_access_policy::CreateRowAccessPolicyReq;
1920
use databend_common_meta_app::row_access_policy::RowAccessPolicyId;
2021
use databend_common_meta_app::row_access_policy::RowAccessPolicyIdIdent;
2122
use databend_common_meta_app::row_access_policy::RowAccessPolicyMeta;
2223
use databend_common_meta_app::row_access_policy::RowAccessPolicyNameIdent;
24+
use databend_common_meta_app::row_access_policy::RowAccessPolicyTableIdIdent;
2325
use databend_common_meta_app::tenant::Tenant;
2426
use databend_common_meta_app::tenant_key::errors::ExistError;
2527
use databend_common_meta_app::KeyWithTenant;
2628
use databend_common_meta_kvapi::kvapi;
29+
use databend_common_meta_kvapi::kvapi::DirName;
2730
use databend_common_meta_types::MetaError;
2831
use databend_common_meta_types::SeqV;
2932
use databend_common_meta_types::TxnRequest;
3033
use fastrace::func_name;
3134
use log::debug;
3235

36+
use crate::errors::RowAccessPolicyError;
3337
use crate::fetch_id;
3438
use crate::kv_pb_api::KVPbApi;
3539
use crate::meta_txn_error::MetaTxnError;
3640
use crate::row_access_policy_api::RowAccessPolicyApi;
3741
use crate::txn_backoff::txn_backoff;
42+
use crate::txn_condition_util::txn_cond_eq_keys_with_prefix;
3843
use crate::txn_condition_util::txn_cond_eq_seq;
3944
use crate::txn_core_util::send_txn;
4045
use crate::txn_core_util::txn_delete_exact;
@@ -44,7 +49,7 @@ use crate::txn_op_builder_util::txn_op_put_pb;
4449
/// Thus every type that impl kvapi::KVApi impls RowAccessPolicyApi.
4550
#[tonic::async_trait]
4651
impl<KV: kvapi::KVApi<Error = MetaError>> RowAccessPolicyApi for KV {
47-
async fn create_row_access(
52+
async fn create_row_access_policy(
4853
&self,
4954
req: CreateRowAccessPolicyReq,
5055
) -> Result<
@@ -120,42 +125,66 @@ impl<KV: kvapi::KVApi<Error = MetaError>> RowAccessPolicyApi for KV {
120125
Ok(Ok(CreateRowAccessPolicyReply { id: *id }))
121126
}
122127

123-
async fn drop_row_access(
128+
async fn drop_row_access_policy(
124129
&self,
125130
name_ident: &RowAccessPolicyNameIdent,
126-
) -> Result<Option<(SeqV<RowAccessPolicyId>, SeqV<RowAccessPolicyMeta>)>, MetaTxnError> {
131+
) -> Result<
132+
Result<Option<(SeqV<RowAccessPolicyId>, SeqV<RowAccessPolicyMeta>)>, RowAccessPolicyError>,
133+
MetaTxnError,
134+
> {
127135
debug!(name_ident :? =(name_ident); "RowAccessPolicyApi: {}", func_name!());
128136

129137
let mut trials = txn_backoff(None, func_name!());
130138
loop {
131139
trials.next().unwrap()?.await;
132140

133-
let mut txn = TxnRequest::default();
134-
141+
// Check if policy exists
135142
let res = self.get_id_and_value(name_ident).await?;
136-
debug!(res :? = res, name_key :? =(name_ident); "{}", func_name!());
137-
138143
let Some((seq_id, seq_meta)) = res else {
139-
return Ok(None);
144+
return Ok(Ok(None));
140145
};
141146

142-
let id_ident = seq_id.data.into_t_ident(name_ident.tenant());
147+
let policy_id = *seq_id.data;
148+
let tenant = name_ident.tenant();
149+
let table_policy_ref_prefix = DirName::new(RowAccessPolicyTableIdIdent::new_generic(
150+
tenant.clone(),
151+
RowAccessPolicyIdTableId {
152+
policy_id,
153+
table_id: 0,
154+
},
155+
));
156+
157+
// List all table-policy references
158+
let table_policy_refs = self.list_pb_vec(&table_policy_ref_prefix).await?;
159+
160+
// Policy is in use - cannot drop
161+
if !table_policy_refs.is_empty() {
162+
return Ok(Err(RowAccessPolicyError::policy_in_use(
163+
tenant.tenant_name().to_string(),
164+
name_ident.row_access_name().to_string(),
165+
)));
166+
}
167+
168+
// No references - drop the policy
169+
let id_ident = seq_id.data.into_t_ident(tenant);
170+
let mut txn = TxnRequest::default();
171+
172+
// Ensure no new references were created
173+
txn.condition
174+
.push(txn_cond_eq_keys_with_prefix(&table_policy_ref_prefix, 0));
143175

144176
txn_delete_exact(&mut txn, name_ident, seq_id.seq);
145177
txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);
146178

147-
// TODO(eason): need to remove row policy from table meta
148-
149179
let (succ, _responses) = send_txn(self, txn).await?;
150-
debug!(succ = succ;"{}", func_name!());
151-
152180
if succ {
153-
return Ok(Some((seq_id, seq_meta)));
181+
return Ok(Ok(Some((seq_id, seq_meta))));
154182
}
183+
// Transaction failed, retry
155184
}
156185
}
157186

158-
async fn get_row_access(
187+
async fn get_row_access_policy(
159188
&self,
160189
name_ident: &RowAccessPolicyNameIdent,
161190
) -> Result<Option<(SeqV<RowAccessPolicyId>, SeqV<RowAccessPolicyMeta>)>, MetaError> {
@@ -166,7 +195,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> RowAccessPolicyApi for KV {
166195
Ok(res)
167196
}
168197

169-
async fn get_row_access_by_id(
198+
async fn get_row_access_policy_by_id(
170199
&self,
171200
tenant: &Tenant,
172201
policy_id: u64,

0 commit comments

Comments
 (0)