diff --git a/src/meta/api/src/data_mask_api.rs b/src/meta/api/src/data_mask_api.rs index 53210c4cd8982..4c78ed7ebe8bb 100644 --- a/src/meta/api/src/data_mask_api.rs +++ b/src/meta/api/src/data_mask_api.rs @@ -21,7 +21,9 @@ use databend_common_meta_app::tenant::Tenant; use databend_common_meta_types::MetaError; use databend_common_meta_types::SeqV; +use crate::errors::MaskingPolicyError; use crate::kv_app_error::KVAppError; +use crate::meta_txn_error::MetaTxnError; #[async_trait::async_trait] pub trait DatamaskApi: Send + Sync { @@ -35,7 +37,10 @@ pub trait DatamaskApi: Send + Sync { async fn drop_data_mask( &self, name_ident: &DataMaskNameIdent, - ) -> Result, SeqV)>, KVAppError>; + ) -> Result< + Result, SeqV)>, MaskingPolicyError>, + MetaTxnError, + >; async fn get_data_mask( &self, diff --git a/src/meta/api/src/data_mask_api_impl.rs b/src/meta/api/src/data_mask_api_impl.rs index 07dfbd7cb9316..97ec03003560e 100644 --- a/src/meta/api/src/data_mask_api_impl.rs +++ b/src/meta/api/src/data_mask_api_impl.rs @@ -19,6 +19,8 @@ use databend_common_meta_app::data_mask::DataMaskId; use databend_common_meta_app::data_mask::DataMaskIdIdent; use databend_common_meta_app::data_mask::DataMaskNameIdent; use databend_common_meta_app::data_mask::DatamaskMeta; +use databend_common_meta_app::data_mask::MaskPolicyIdTableId; +use databend_common_meta_app::data_mask::MaskPolicyTableIdIdent; use databend_common_meta_app::data_mask::MaskPolicyTableIdListIdent; use databend_common_meta_app::data_mask::MaskpolicyTableIdList; use databend_common_meta_app::id_generator::IdGenerator; @@ -26,6 +28,7 @@ use databend_common_meta_app::schema::CreateOption; use databend_common_meta_app::tenant::Tenant; use databend_common_meta_app::KeyWithTenant; use databend_common_meta_kvapi::kvapi; +use databend_common_meta_kvapi::kvapi::DirName; use databend_common_meta_types::MetaError; use databend_common_meta_types::SeqV; use databend_common_meta_types::TxnRequest; @@ -33,10 +36,13 @@ use fastrace::func_name; use log::debug; use crate::data_mask_api::DatamaskApi; +use crate::errors::MaskingPolicyError; use crate::fetch_id; use crate::kv_app_error::KVAppError; use crate::kv_pb_api::KVPbApi; +use crate::meta_txn_error::MetaTxnError; use crate::txn_backoff::txn_backoff; +use crate::txn_condition_util::txn_cond_eq_keys_with_prefix; use crate::txn_condition_util::txn_cond_eq_seq; use crate::txn_core_util::send_txn; use crate::txn_core_util::txn_delete_exact; @@ -135,33 +141,62 @@ impl> DatamaskApi for KV { async fn drop_data_mask( &self, name_ident: &DataMaskNameIdent, - ) -> Result, SeqV)>, KVAppError> { + ) -> Result< + Result, SeqV)>, MaskingPolicyError>, + MetaTxnError, + > { debug!(name_ident :? =(name_ident); "DatamaskApi: {}", func_name!()); let mut trials = txn_backoff(None, func_name!()); loop { trials.next().unwrap()?.await; - let mut txn = TxnRequest::default(); + // Check if policy exists let res = self.get_id_and_value(name_ident).await?; - debug!(res :? = res, name_key :? =(name_ident); "{}", func_name!()); - let Some((seq_id, seq_meta)) = res else { - return Ok(None); + return Ok(Ok(None)); }; - let id_ident = seq_id.data.into_t_ident(name_ident.tenant()); + let policy_id = *seq_id.data; + let tenant = name_ident.tenant(); + let table_policy_ref_prefix = DirName::new(MaskPolicyTableIdIdent::new_generic( + tenant.clone(), + MaskPolicyIdTableId { + policy_id, + table_id: 0, + }, + )); + + // List all table-policy references + let table_policy_refs = self.list_pb_vec(&table_policy_ref_prefix).await?; + + // Policy is in use - cannot drop + if !table_policy_refs.is_empty() { + return Ok(Err(MaskingPolicyError::policy_in_use( + tenant.tenant_name().to_string(), + name_ident.data_mask_name().to_string(), + ))); + } + + // No references - drop the policy + let id_ident = seq_id.data.into_t_ident(tenant); + let mut txn = TxnRequest::default(); + + // Ensure no new references were created + txn.condition + .push(txn_cond_eq_keys_with_prefix(&table_policy_ref_prefix, 0)); txn_delete_exact(&mut txn, name_ident, seq_id.seq); txn_delete_exact(&mut txn, &id_ident, seq_meta.seq); - // TODO: Tentative retention for compatibility MaskPolicyTableIdListIdent related logic. It can be directly deleted later + + // TODO: Tentative retention for compatibility. Can be deleted later. clear_table_column_mask_policy(self, name_ident, &mut txn).await?; - let (succ, _responses) = send_txn(self, txn).await?; - debug!(succ = succ;"{}", func_name!()); + let (succ, _responses) = send_txn(self, txn).await?; if succ { - return Ok(Some((seq_id, seq_meta))); + return Ok(Ok(Some((seq_id, seq_meta)))); } + // Transaction failed, retry } } diff --git a/src/meta/api/src/errors.rs b/src/meta/api/src/errors.rs index 6fab53337bf28..9b28636b59ebe 100644 --- a/src/meta/api/src/errors.rs +++ b/src/meta/api/src/errors.rs @@ -14,7 +14,6 @@ use databend_common_exception::ErrorCode; use databend_common_meta_app::principal::AutoIncrementKey; - /// Table logic error, unrelated to the backend service providing Table management, or dependent component. #[derive(Clone, Debug, thiserror::Error)] pub enum TableError { @@ -39,6 +38,56 @@ impl From for ErrorCode { } } +#[derive(Clone, Debug, thiserror::Error)] +pub enum MaskingPolicyError { + #[error( + "MASKING POLICY `{policy_name}` is still in use. Unset it from all tables before dropping." + )] + PolicyInUse { policy_name: String }, +} + +impl MaskingPolicyError { + pub fn policy_in_use(_tenant: impl Into, policy_name: impl Into) -> Self { + Self::PolicyInUse { + policy_name: policy_name.into(), + } + } +} + +impl From for ErrorCode { + fn from(value: MaskingPolicyError) -> Self { + let s = value.to_string(); + match value { + MaskingPolicyError::PolicyInUse { .. } => ErrorCode::ConstraintError(s), + } + } +} + +#[derive(Clone, Debug, thiserror::Error)] +pub enum RowAccessPolicyError { + #[error( + "ROW ACCESS POLICY `{policy_name}` is still in use. Unset it from all tables before dropping." + )] + PolicyInUse { policy_name: String }, +} + +impl RowAccessPolicyError { + pub fn policy_in_use(_tenant: impl Into, policy_name: impl Into) -> Self { + Self::PolicyInUse { + policy_name: policy_name.into(), + } + } +} + +impl From for ErrorCode { + fn from(value: RowAccessPolicyError) -> Self { + let s = value.to_string(); + match value { + RowAccessPolicyError::PolicyInUse { .. } => ErrorCode::ConstraintError(s), + } + } +} + #[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] pub enum AutoIncrementError { #[error("OutOfAutoIncrementRange: `{key}` while `{context}`")] diff --git a/src/meta/api/src/lib.rs b/src/meta/api/src/lib.rs index b1a5c810354fe..79c5c6844647a 100644 --- a/src/meta/api/src/lib.rs +++ b/src/meta/api/src/lib.rs @@ -78,6 +78,8 @@ pub use error_util::db_has_to_not_exist; pub use error_util::db_id_has_to_exist; pub use error_util::table_has_to_not_exist; pub use error_util::unknown_database_error; +pub use errors::MaskingPolicyError; +pub use errors::RowAccessPolicyError; pub use garbage_collection_api::GarbageCollectionApi; pub use index_api::IndexApi; // Re-export from new kv_fetch_util module for backward compatibility diff --git a/src/meta/api/src/row_access_policy_api.rs b/src/meta/api/src/row_access_policy_api.rs index 64b7602d03334..99832878f9a24 100644 --- a/src/meta/api/src/row_access_policy_api.rs +++ b/src/meta/api/src/row_access_policy_api.rs @@ -23,11 +23,12 @@ use databend_common_meta_app::tenant_key::errors::ExistError; use databend_common_meta_types::MetaError; use databend_common_meta_types::SeqV; +use crate::errors::RowAccessPolicyError; use crate::meta_txn_error::MetaTxnError; #[async_trait::async_trait] pub trait RowAccessPolicyApi: Send + Sync { - async fn create_row_access( + async fn create_row_access_policy( &self, req: CreateRowAccessPolicyReq, ) -> Result< @@ -37,17 +38,20 @@ pub trait RowAccessPolicyApi: Send + Sync { /// On success, returns the dropped id and row policy. /// Returning None, means nothing is removed. - async fn drop_row_access( + async fn drop_row_access_policy( &self, name_ident: &RowAccessPolicyNameIdent, - ) -> Result, SeqV)>, MetaTxnError>; + ) -> Result< + Result, SeqV)>, RowAccessPolicyError>, + MetaTxnError, + >; - async fn get_row_access( + async fn get_row_access_policy( &self, name_ident: &RowAccessPolicyNameIdent, ) -> Result, SeqV)>, MetaError>; - async fn get_row_access_by_id( + async fn get_row_access_policy_by_id( &self, tenant: &Tenant, policy_id: u64, diff --git a/src/meta/api/src/row_access_policy_api_impl.rs b/src/meta/api/src/row_access_policy_api_impl.rs index 4eda04a3c7f6b..4c8e9aa53492a 100644 --- a/src/meta/api/src/row_access_policy_api_impl.rs +++ b/src/meta/api/src/row_access_policy_api_impl.rs @@ -14,27 +14,32 @@ use databend_common_meta_app::id_generator::IdGenerator; use databend_common_meta_app::row_access_policy::row_access_policy_name_ident; +use databend_common_meta_app::row_access_policy::row_access_policy_table_id_ident::RowAccessPolicyIdTableId; use databend_common_meta_app::row_access_policy::CreateRowAccessPolicyReply; use databend_common_meta_app::row_access_policy::CreateRowAccessPolicyReq; use databend_common_meta_app::row_access_policy::RowAccessPolicyId; use databend_common_meta_app::row_access_policy::RowAccessPolicyIdIdent; use databend_common_meta_app::row_access_policy::RowAccessPolicyMeta; use databend_common_meta_app::row_access_policy::RowAccessPolicyNameIdent; +use databend_common_meta_app::row_access_policy::RowAccessPolicyTableIdIdent; use databend_common_meta_app::tenant::Tenant; use databend_common_meta_app::tenant_key::errors::ExistError; use databend_common_meta_app::KeyWithTenant; use databend_common_meta_kvapi::kvapi; +use databend_common_meta_kvapi::kvapi::DirName; use databend_common_meta_types::MetaError; use databend_common_meta_types::SeqV; use databend_common_meta_types::TxnRequest; use fastrace::func_name; use log::debug; +use crate::errors::RowAccessPolicyError; use crate::fetch_id; use crate::kv_pb_api::KVPbApi; use crate::meta_txn_error::MetaTxnError; use crate::row_access_policy_api::RowAccessPolicyApi; use crate::txn_backoff::txn_backoff; +use crate::txn_condition_util::txn_cond_eq_keys_with_prefix; use crate::txn_condition_util::txn_cond_eq_seq; use crate::txn_core_util::send_txn; use crate::txn_core_util::txn_delete_exact; @@ -44,7 +49,7 @@ use crate::txn_op_builder_util::txn_op_put_pb; /// Thus every type that impl kvapi::KVApi impls RowAccessPolicyApi. #[tonic::async_trait] impl> RowAccessPolicyApi for KV { - async fn create_row_access( + async fn create_row_access_policy( &self, req: CreateRowAccessPolicyReq, ) -> Result< @@ -120,42 +125,66 @@ impl> RowAccessPolicyApi for KV { Ok(Ok(CreateRowAccessPolicyReply { id: *id })) } - async fn drop_row_access( + async fn drop_row_access_policy( &self, name_ident: &RowAccessPolicyNameIdent, - ) -> Result, SeqV)>, MetaTxnError> { + ) -> Result< + Result, SeqV)>, RowAccessPolicyError>, + MetaTxnError, + > { debug!(name_ident :? =(name_ident); "RowAccessPolicyApi: {}", func_name!()); let mut trials = txn_backoff(None, func_name!()); loop { trials.next().unwrap()?.await; - let mut txn = TxnRequest::default(); - + // Check if policy exists let res = self.get_id_and_value(name_ident).await?; - debug!(res :? = res, name_key :? =(name_ident); "{}", func_name!()); - let Some((seq_id, seq_meta)) = res else { - return Ok(None); + return Ok(Ok(None)); }; - let id_ident = seq_id.data.into_t_ident(name_ident.tenant()); + let policy_id = *seq_id.data; + let tenant = name_ident.tenant(); + let table_policy_ref_prefix = DirName::new(RowAccessPolicyTableIdIdent::new_generic( + tenant.clone(), + RowAccessPolicyIdTableId { + policy_id, + table_id: 0, + }, + )); + + // List all table-policy references + let table_policy_refs = self.list_pb_vec(&table_policy_ref_prefix).await?; + + // Policy is in use - cannot drop + if !table_policy_refs.is_empty() { + return Ok(Err(RowAccessPolicyError::policy_in_use( + tenant.tenant_name().to_string(), + name_ident.row_access_name().to_string(), + ))); + } + + // No references - drop the policy + let id_ident = seq_id.data.into_t_ident(tenant); + let mut txn = TxnRequest::default(); + + // Ensure no new references were created + txn.condition + .push(txn_cond_eq_keys_with_prefix(&table_policy_ref_prefix, 0)); txn_delete_exact(&mut txn, name_ident, seq_id.seq); txn_delete_exact(&mut txn, &id_ident, seq_meta.seq); - // TODO(eason): need to remove row policy from table meta - let (succ, _responses) = send_txn(self, txn).await?; - debug!(succ = succ;"{}", func_name!()); - if succ { - return Ok(Some((seq_id, seq_meta))); + return Ok(Ok(Some((seq_id, seq_meta)))); } + // Transaction failed, retry } } - async fn get_row_access( + async fn get_row_access_policy( &self, name_ident: &RowAccessPolicyNameIdent, ) -> Result, SeqV)>, MetaError> { @@ -166,7 +195,7 @@ impl> RowAccessPolicyApi for KV { Ok(res) } - async fn get_row_access_by_id( + async fn get_row_access_policy_by_id( &self, tenant: &Tenant, policy_id: u64, diff --git a/src/meta/api/src/schema_api.rs b/src/meta/api/src/schema_api.rs index 9259c4d56e7d0..aed26b1ea3a73 100644 --- a/src/meta/api/src/schema_api.rs +++ b/src/meta/api/src/schema_api.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::fmt::Display; use std::time::Duration; @@ -24,8 +25,18 @@ use databend_common_meta_app::app_error::UndropTableHasNoHistory; use databend_common_meta_app::app_error::UndropTableRetentionGuard; use databend_common_meta_app::app_error::UnknownTable; use databend_common_meta_app::app_error::UnknownTableId; +use databend_common_meta_app::data_mask::DataMaskId; +use databend_common_meta_app::data_mask::DataMaskIdIdent; +use databend_common_meta_app::data_mask::MaskPolicyIdTableId; +use databend_common_meta_app::data_mask::MaskPolicyTableId; +use databend_common_meta_app::data_mask::MaskPolicyTableIdIdent; use databend_common_meta_app::principal::OwnershipObject; use databend_common_meta_app::principal::TenantOwnershipObjectIdent; +use databend_common_meta_app::row_access_policy::row_access_policy_table_id_ident::RowAccessPolicyIdTableId; +use databend_common_meta_app::row_access_policy::RowAccessPolicyId; +use databend_common_meta_app::row_access_policy::RowAccessPolicyIdIdent; +use databend_common_meta_app::row_access_policy::RowAccessPolicyTableId; +use databend_common_meta_app::row_access_policy::RowAccessPolicyTableIdIdent; use databend_common_meta_app::schema::marked_deleted_index_id::MarkedDeletedIndexId; use databend_common_meta_app::schema::marked_deleted_index_ident::MarkedDeletedIndexIdIdent; use databend_common_meta_app::schema::marked_deleted_table_index_id::MarkedDeletedTableIndexId; @@ -51,6 +62,7 @@ use databend_common_meta_kvapi::kvapi::Key; use databend_common_meta_types::ConditionResult; use databend_common_meta_types::MetaError; use databend_common_meta_types::SeqV; +use databend_common_meta_types::TxnCondition; use databend_common_meta_types::TxnOp; use databend_common_meta_types::TxnRequest; use fastrace::func_name; @@ -221,6 +233,44 @@ pub async fn construct_drop_table_txn_operations( tb_meta.drop_on = Some(Utc::now()); + // Delete table-policy references when dropping table + // + // Concurrency safety: + // - The `table_meta.seq` condition below protects against concurrent modifications + // - Using `txn_op_del` (instead of `txn_delete_exact`) is safe here because: + // 1. Any concurrent unset/modify of policies will change `table_meta.seq`, causing this txn to fail + // 2. Deleting a non-existent reference is idempotent and won't cause errors (returns success=false) + // 3. The transaction ensures atomicity - either all references are deleted or none + // + // This avoids the overhead of reading each reference's seq while maintaining correctness. + let policy_ids: HashSet = tb_meta + .column_mask_policy_columns_ids + .values() + .map(|policy_map| policy_map.policy_id) + .collect(); + + txn.if_then.extend(policy_ids.into_iter().map(|policy_id| { + txn_op_del(&MaskPolicyTableIdIdent::new_generic( + tenant.clone(), + MaskPolicyIdTableId { + policy_id, + table_id, + }, + )) + })); + + // Delete row access policy reference + if let Some(policy_map) = &tb_meta.row_access_policy_columns_ids { + txn.if_then + .push(txn_op_del(&RowAccessPolicyTableIdIdent::new_generic( + tenant.clone(), + RowAccessPolicyIdTableId { + policy_id: policy_map.policy_id, + table_id, + }, + ))); + } + // There must NOT be concurrent txn(b) that list-then-delete tables: // Otherwise, (b) may not delete all of the tables, if this txn(a) is operating on some table. // We guarantee there is no `(b)` so we do not have to assert db seq. @@ -487,6 +537,16 @@ pub async fn handle_undrop_table( } } + let tenant = tenant_dbname_tbname.tenant().clone(); + let (policy_restore_ops, policy_restore_conditions) = restore_policy_references_on_undrop( + kv_api, + &tenant, + table_id, + &mut seq_table_meta.data, + ) + .await + .map_err(KVAppError::from)?; + { // reset drop on time seq_table_meta.drop_on = None; @@ -495,26 +555,34 @@ pub async fn handle_undrop_table( let vacuum_seq = seq_vacuum_retention.as_ref().map(|sr| sr.seq).unwrap_or(0); let txn = TxnRequest::new( - vec![ - // db has not to change, i.e., no new table is created. - // Renaming db is OK and does not affect the seq of db_meta. - txn_cond_eq_seq(&DatabaseId { db_id }, seq_db_meta.seq), - // still this table id - txn_cond_eq_seq(&dbid_tbname, dbid_tbname_seq), - // table is not changed - txn_cond_eq_seq(&tbid, seq_table_meta.seq), - // Concurrent safety: vacuum timestamp seq must not change during undrop - // - If vacuum_retention exists: seq must remain the same (no update by vacuum) - // - If vacuum_retention is None: seq must remain 0 (no creation by vacuum) - txn_cond_eq_seq(&vacuum_ident, vacuum_seq), - ], - vec![ - // Changing a table in a db has to update the seq of db_meta, - // to block the batch-delete-tables when deleting a db. - txn_op_put_pb(&DatabaseId { db_id }, &seq_db_meta.data, None)?, /* (db_id) -> db_meta */ - txn_op_put(&dbid_tbname, serialize_u64(table_id)?), /* (tenant, db_id, tb_name) -> tb_id */ - txn_op_put_pb(&tbid, &seq_table_meta.data, None)?, /* (tenant, db_id, tb_id) -> tb_meta */ - ], + [ + vec![ + // db has not to change, i.e., no new table is created. + // Renaming db is OK and does not affect the seq of db_meta. + txn_cond_eq_seq(&DatabaseId { db_id }, seq_db_meta.seq), + // still this table id + txn_cond_eq_seq(&dbid_tbname, dbid_tbname_seq), + // table is not changed + txn_cond_eq_seq(&tbid, seq_table_meta.seq), + // Concurrent safety: vacuum timestamp seq must not change during undrop + // - If vacuum_retention exists: seq must remain the same (no update by vacuum) + // - If vacuum_retention is None: seq must remain 0 (no creation by vacuum) + txn_cond_eq_seq(&vacuum_ident, vacuum_seq), + ], + policy_restore_conditions, + ] + .concat(), + [ + vec![ + // Changing a table in a db has to update the seq of db_meta, + // to block the batch-delete-tables when deleting a db. + txn_op_put_pb(&DatabaseId { db_id }, &seq_db_meta.data, None)?, // (db_id) -> db_meta + txn_op_put(&dbid_tbname, serialize_u64(table_id)?), // (tenant, db_id, tb_name) -> tb_id + txn_op_put_pb(&tbid, &seq_table_meta.data, None)?, // (tenant, db_id, tb_id) -> tb_meta + ], + policy_restore_ops, + ] + .concat(), ); let (succ, _responses) = send_txn(kv_api, txn).await?; @@ -533,6 +601,108 @@ pub async fn handle_undrop_table( } } +/// Restore policy references when undropping a table. +/// +/// This function handles two cases: +/// 1. Policy exists: Restore the table-policy reference (deleted during drop_table) +/// 2. Policy missing: Clean up the policy reference from table_meta +async fn restore_policy_references_on_undrop( + kv_api: &(impl kvapi::KVApi + ?Sized), + tenant: &Tenant, + table_id: u64, + table_meta: &mut TableMeta, +) -> Result<(Vec, Vec), MetaError> { + let mut ops = Vec::new(); + let mut conditions = Vec::new(); + + // Process masking policies + if !table_meta.column_mask_policy_columns_ids.is_empty() { + // Collect unique policy IDs (one policy can be applied to multiple columns) + let policy_ids: HashSet = table_meta + .column_mask_policy_columns_ids + .values() + .map(|policy_map| policy_map.policy_id) + .collect(); + + let mut missing_policies = HashSet::new(); + + for policy_id in policy_ids { + let policy_ident = DataMaskIdIdent::new_generic(tenant, DataMaskId::new(policy_id)); + let seq_policy = kv_api.get_pb(&policy_ident).await?; + + match seq_policy { + None => { + // Policy missing - mark for cleanup from table_meta + // Note: table-policy reference was already deleted during drop_table + missing_policies.insert(policy_id); + } + Some(seq_policy) => { + // Policy exists - restore the table-policy reference + // (it was deleted during drop_table) + let ref_key = + MaskPolicyTableIdIdent::new_generic(tenant.clone(), MaskPolicyIdTableId { + policy_id, + table_id, + }); + // Concurrent safety: ensure policy still exists when txn executes. + // Critical: if policy is dropped before txn execution, this prevents + // creating a dangling reference to a non-existent policy. + conditions.push(txn_cond_eq_seq(&policy_ident, seq_policy.seq)); + ops.push(txn_op_put_pb(&ref_key, &MaskPolicyTableId, None)?); + } + } + } + + for missing_policy_id in &missing_policies { + debug!( + "Undrop table {}: removing missing masking policy {}", + table_id, missing_policy_id + ); + } + table_meta + .column_mask_policy_columns_ids + .retain(|_, policy_map| !missing_policies.contains(&policy_map.policy_id)); + } + + // Process row access policy + if let Some(policy_map) = &table_meta.row_access_policy_columns_ids { + let policy_id = policy_map.policy_id; + let policy_ident = + RowAccessPolicyIdIdent::new_generic(tenant, RowAccessPolicyId::new(policy_id)); + let seq_policy = kv_api.get_pb(&policy_ident).await?; + + match seq_policy { + None => { + // Policy missing - clean up from table_meta + // Note: table-policy reference was already deleted during drop_table + debug!( + "Undrop table {}: removing missing row access policy {}", + table_id, policy_id + ); + table_meta.row_access_policy_columns_ids = None; + table_meta.row_access_policy = None; + } + Some(seq_policy) => { + // Policy exists - restore the table-policy reference + // (it was deleted during drop_table) + let ref_key = RowAccessPolicyTableIdIdent::new_generic( + tenant.clone(), + RowAccessPolicyIdTableId { + policy_id, + table_id, + }, + ); + // Concurrent safety: ensure policy still exists when txn executes. + // Critical: if policy is dropped before txn execution, this prevents + // creating a dangling reference to a non-existent policy. + conditions.push(txn_cond_eq_seq(&policy_ident, seq_policy.seq)); + ops.push(txn_op_put_pb(&ref_key, &RowAccessPolicyTableId, None)?); + } + } + } + Ok((ops, conditions)) +} + /// add __fd_marked_deleted_index// -> marked_deleted_index_meta pub fn mark_index_as_deleted( tenant: &Tenant, diff --git a/src/meta/api/src/schema_api_test_suite.rs b/src/meta/api/src/schema_api_test_suite.rs index fa95658916d6f..c20ffc8ae8b0f 100644 --- a/src/meta/api/src/schema_api_test_suite.rs +++ b/src/meta/api/src/schema_api_test_suite.rs @@ -315,6 +315,12 @@ impl SchemaApiTestSuite { suite.table_swap(&b.build().await).await?; suite.table_update_meta(&b.build().await).await?; suite.table_update_mask_policy(&b.build().await).await?; + suite + .mask_policy_drop_with_table_lifecycle(&b.build().await) + .await?; + suite + .row_access_policy_drop_with_table_lifecycle(&b.build().await) + .await?; suite .table_update_row_access_policy(&b.build().await) .await?; @@ -3451,13 +3457,6 @@ impl SchemaApiTestSuite { assert!(res.is_none()); } - info!("--- drop mask policy check"); - { - let name_ident = DataMaskNameIdent::new(tenant.clone(), mask_name_1); - let dropped = mt.drop_data_mask(&name_ident).await?; - assert!(dropped.is_some()); - } - info!("--- create or replace mask policy"); { let mask_name = "replace_mask"; @@ -3560,10 +3559,10 @@ impl SchemaApiTestSuite { update_on: None, }, }; - mt.create_row_access(req).await.unwrap().unwrap(); + mt.create_row_access_policy(req).await.unwrap().unwrap(); let name = RowAccessPolicyNameIdent::new(tenant.clone(), policy1.to_string()); - let res = mt.get_row_access(&name).await.unwrap().unwrap(); + let res = mt.get_row_access_policy(&name).await.unwrap().unwrap(); let policy1_id = res.0.data; let req = CreateRowAccessPolicyReq { @@ -3577,7 +3576,7 @@ impl SchemaApiTestSuite { update_on: None, }, }; - mt.create_row_access(req).await.unwrap().unwrap(); + mt.create_row_access_policy(req).await.unwrap().unwrap(); let table_id_1; info!("--- apply mask1 policy to table 1 and check"); @@ -3850,6 +3849,350 @@ impl SchemaApiTestSuite { Ok(()) } + #[fastrace::trace] + async fn mask_policy_drop_with_table_lifecycle< + MT: SchemaApi + DatamaskApi + kvapi::KVApi, + >( + &self, + mt: &MT, + ) -> anyhow::Result<()> { + let tenant_name = "tenant_mask_policy_drop"; + let db_name = "db_mask_policy_drop"; + let table_name = "tb_mask_policy_drop"; + let mask_cleanup_name = "mask_cleanup_after_drop"; + let mask_guard_name = "mask_guard_while_active"; + + let tenant = Tenant::new_or_err(tenant_name, func_name!())?; + + // Prepare database and table. + let mut util = Util::new(mt, tenant_name, db_name, table_name, "FUSE"); + util.create_db().await?; + let (table_id, _) = util.create_table().await?; + + let table_name_ident = + TableNameIdent::new(tenant.clone(), db_name.to_string(), table_name.to_string()); + + let mut table_info = util.get_table().await?; + let number_column_id = table_info + .meta + .schema + .fields + .iter() + .find(|f| f.name == "number") + .unwrap() + .column_id; + + // Create a masking policy and bind it to the table. + let created_on = Utc::now(); + let mask_cleanup_ident = + DataMaskNameIdent::new(tenant.clone(), mask_cleanup_name.to_string()); + mt.create_data_mask(CreateDatamaskReq { + create_option: CreateOption::Create, + name: mask_cleanup_ident.clone(), + data_mask_meta: DatamaskMeta { + args: vec![], + return_type: "".to_string(), + body: "".to_string(), + comment: None, + create_on: created_on, + update_on: None, + }, + }) + .await?; + let mask_cleanup_id = get_kv_u64_data(mt, &mask_cleanup_ident).await?; + + let set_req = SetTableColumnMaskPolicyReq { + tenant: tenant.clone(), + seq: MatchSeq::Exact(table_info.ident.seq), + table_id, + action: SetSecurityPolicyAction::Set(mask_cleanup_id, vec![number_column_id]), + }; + mt.set_table_column_mask_policy(set_req).await?; + table_info = util.get_table().await?; + assert!(table_info + .meta + .column_mask_policy_columns_ids + .contains_key(&number_column_id)); + + // Drop the table (this deletes the table-policy reference), then drop the policy. + util.drop_table_by_id().await?; + + // Verify the reference was already deleted by drop_table + let binding_ident = + MaskPolicyTableIdIdent::new_generic(tenant.clone(), MaskPolicyIdTableId { + policy_id: mask_cleanup_id, + table_id, + }); + assert!( + mt.get_pb(&binding_ident).await?.is_none(), + "table-policy reference should be removed when dropping table" + ); + + // Now drop_policy should succeed since there are no active references + let dropped = mt.drop_data_mask(&mask_cleanup_ident).await??; + assert!(dropped.is_some()); + assert!( + mt.get_data_mask(&mask_cleanup_ident).await?.is_none(), + "policy metadata should be gone after drop" + ); + + // Undrop the table; masking policy entries should disappear. + mt.undrop_table(UndropTableReq { + name_ident: table_name_ident.clone(), + }) + .await?; + table_info = util.get_table().await?; + assert!( + table_info.meta.column_mask_policy_columns_ids.is_empty(), + "undropped table should no longer reference the dropped policy" + ); + + // Create another masking policy and bind it. + let mask_guard_ident = DataMaskNameIdent::new(tenant.clone(), mask_guard_name.to_string()); + mt.create_data_mask(CreateDatamaskReq { + create_option: CreateOption::Create, + name: mask_guard_ident.clone(), + data_mask_meta: DatamaskMeta { + args: vec![], + return_type: "".to_string(), + body: "".to_string(), + comment: None, + create_on: Utc::now(), + update_on: None, + }, + }) + .await?; + let mask_guard_id = get_kv_u64_data(mt, &mask_guard_ident).await?; + + table_info = util.get_table().await?; + let set_req = SetTableColumnMaskPolicyReq { + tenant: tenant.clone(), + seq: MatchSeq::Exact(table_info.ident.seq), + table_id, + action: SetSecurityPolicyAction::Set(mask_guard_id, vec![number_column_id]), + }; + mt.set_table_column_mask_policy(set_req).await?; + table_info = util.get_table().await?; + let policy_entry = table_info + .meta + .column_mask_policy_columns_ids + .get(&number_column_id) + .expect("mask policy should be attached"); + assert_eq!(mask_guard_id, policy_entry.policy_id); + + // Drop and immediately undrop the table; the binding should persist. + util.drop_table_by_id().await?; + mt.undrop_table(UndropTableReq { + name_ident: table_name_ident.clone(), + }) + .await?; + table_info = util.get_table().await?; + let policy_entry = table_info + .meta + .column_mask_policy_columns_ids + .get(&number_column_id) + .expect("mask policy should remain after undrop when policy still exists"); + assert_eq!(mask_guard_id, policy_entry.policy_id); + + // Dropping the policy should now fail because the table is active. + let err = mt.drop_data_mask(&mask_guard_ident).await?; + let err = err.unwrap_err(); + let err = ErrorCode::from(err); + assert_eq!(ErrorCode::ConstraintError("").code(), err.code()); + + let binding_ident = + MaskPolicyTableIdIdent::new_generic(tenant.clone(), MaskPolicyIdTableId { + policy_id: mask_guard_id, + table_id, + }); + assert!( + mt.get_pb(&binding_ident).await?.is_some(), + "binding should remain when policy drop is rejected" + ); + + Ok(()) + } + + #[fastrace::trace] + async fn row_access_policy_drop_with_table_lifecycle< + MT: SchemaApi + RowAccessPolicyApi + kvapi::KVApi, + >( + &self, + mt: &MT, + ) -> anyhow::Result<()> { + let tenant_name = "tenant_row_policy_drop"; + let db_name = "db_row_policy_drop"; + let table_name = "tb_row_policy_drop"; + let policy_cleanup_name = "row_cleanup_after_drop"; + let policy_guard_name = "row_guard_while_active"; + + let tenant = Tenant::new_or_err(tenant_name, func_name!())?; + + // Prepare database and table. + let mut util = Util::new(mt, tenant_name, db_name, table_name, "FUSE"); + util.create_db().await?; + let (table_id, _) = util.create_table().await?; + + let table_name_ident = + TableNameIdent::new(tenant.clone(), db_name.to_string(), table_name.to_string()); + + let mut table_info = util.get_table().await?; + let column_id = table_info.meta.schema.fields[0].column_id; + + // Create a row access policy and bind it to the table. + let policy_cleanup_ident = + RowAccessPolicyNameIdent::new(tenant.clone(), policy_cleanup_name.to_string()); + mt.create_row_access_policy(CreateRowAccessPolicyReq { + can_replace: false, + name: policy_cleanup_ident.clone(), + row_access_policy_meta: RowAccessPolicyMeta { + args: vec![("number".to_string(), "UInt64".to_string())], + body: "true".to_string(), + comment: None, + create_on: Utc::now(), + update_on: None, + }, + }) + .await? + .unwrap(); + let cleanup_policy_id = { + let res = mt + .get_row_access_policy(&policy_cleanup_ident) + .await? + .expect("row access policy exists"); + *res.0.data + }; + + let set_req = SetTableRowAccessPolicyReq { + tenant: tenant.clone(), + table_id, + action: SetSecurityPolicyAction::Set(cleanup_policy_id, vec![column_id]), + }; + mt.set_table_row_access_policy(set_req).await??; + table_info = util.get_table().await?; + assert!( + table_info + .meta + .row_access_policy_columns_ids + .as_ref() + .is_some(), + "row access policy should be recorded in table meta" + ); + + // Drop the table (this deletes the table-policy reference), then drop the policy. + util.drop_table_by_id().await?; + + // Verify the reference was already deleted by drop_table + let binding_ident = + RowAccessPolicyTableIdIdent::new_generic(tenant.clone(), RowAccessPolicyIdTableId { + policy_id: cleanup_policy_id, + table_id, + }); + assert!( + mt.get_pb(&binding_ident).await?.is_none(), + "table-policy reference should be removed when dropping table" + ); + + // Now drop_policy should succeed since there are no active references + let dropped = mt.drop_row_access_policy(&policy_cleanup_ident).await??; + assert!(dropped.is_some()); + assert!( + mt.get_row_access_policy(&policy_cleanup_ident) + .await? + .is_none(), + "policy metadata should be gone after drop" + ); + + // Undrop the table; row access policy entries should disappear. + mt.undrop_table(UndropTableReq { + name_ident: table_name_ident.clone(), + }) + .await?; + table_info = util.get_table().await?; + assert!( + table_info.meta.row_access_policy_columns_ids.is_none(), + "undropped table should no longer reference the dropped row access policy" + ); + + // Create another row access policy and bind it. + let policy_guard_ident = + RowAccessPolicyNameIdent::new(tenant.clone(), policy_guard_name.to_string()); + mt.create_row_access_policy(CreateRowAccessPolicyReq { + can_replace: false, + name: policy_guard_ident.clone(), + row_access_policy_meta: RowAccessPolicyMeta { + args: vec![("number".to_string(), "UInt64".to_string())], + body: "true".to_string(), + comment: None, + create_on: Utc::now(), + update_on: None, + }, + }) + .await? + .unwrap(); + let guard_policy_id = { + let res = mt + .get_row_access_policy(&policy_guard_ident) + .await? + .expect("row access policy exists"); + *res.0.data + }; + let set_req = SetTableRowAccessPolicyReq { + tenant: tenant.clone(), + table_id, + action: SetSecurityPolicyAction::Set(guard_policy_id, vec![column_id]), + }; + mt.set_table_row_access_policy(set_req).await??; + table_info = util.get_table().await?; + let policy_entry = table_info + .meta + .row_access_policy_columns_ids + .as_ref() + .expect("row access policy should be attached after setting"); + assert_eq!(guard_policy_id, policy_entry.policy_id); + + // Drop and immediately undrop the table; the binding should persist. + util.drop_table_by_id().await?; + mt.undrop_table(UndropTableReq { + name_ident: table_name_ident.clone(), + }) + .await?; + table_info = util.get_table().await?; + let policy_entry = table_info + .meta + .row_access_policy_columns_ids + .as_ref() + .expect("row access policy should remain after undrop when policy still exists"); + assert_eq!(guard_policy_id, policy_entry.policy_id); + + // Dropping the policy should now fail because the table is active. + let err = mt.drop_row_access_policy(&policy_guard_ident).await?; + let err = err.unwrap_err(); + let err = ErrorCode::from(err); + assert_eq!(ErrorCode::ConstraintError("").code(), err.code()); + + let binding_ident = + RowAccessPolicyTableIdIdent::new_generic(tenant.clone(), RowAccessPolicyIdTableId { + policy_id: guard_policy_id, + table_id, + }); + assert!( + mt.get_pb(&binding_ident).await?.is_some(), + "binding should remain when row access policy drop is rejected" + ); + + // Clean up by dropping the table and policy. + util.drop_table_by_id().await?; + let dropped = mt.drop_row_access_policy(&policy_guard_ident).await??; + assert!(dropped.is_some()); + assert!( + mt.get_pb(&binding_ident).await?.is_none(), + "binding should be removed after successful row access policy drop" + ); + + Ok(()) + } + #[fastrace::trace] async fn database_drop_out_of_retention_time_history< MT: SchemaApi + kvapi::KVApi, diff --git a/src/meta/api/src/txn_condition_util.rs b/src/meta/api/src/txn_condition_util.rs index a30060a8fe933..d324b21518119 100644 --- a/src/meta/api/src/txn_condition_util.rs +++ b/src/meta/api/src/txn_condition_util.rs @@ -15,6 +15,7 @@ //! Transaction condition utilities extracted from util.rs use databend_common_meta_kvapi::kvapi; +use databend_common_meta_kvapi::kvapi::DirName; use databend_common_meta_types::txn_condition::Target; use databend_common_meta_types::ConditionResult; use databend_common_meta_types::TxnCondition; @@ -32,3 +33,15 @@ pub fn txn_cond_seq(key: &impl kvapi::Key, op: ConditionResult, seq: u64) -> Txn target: Some(Target::Seq(seq)), } } + +/// Build a TxnCondition that checks the number of keys with the given prefix. +pub fn txn_cond_eq_keys_with_prefix( + prefix: &DirName, + count: u64, +) -> TxnCondition { + TxnCondition { + key: prefix.dir_name_with_slash(), + expected: ConditionResult::Eq as i32, + target: Some(Target::KeysWithPrefix(count)), + } +} diff --git a/src/meta/app/src/data_mask/mask_policy_policy_table_id_ident.rs b/src/meta/app/src/data_mask/mask_policy_policy_table_id_ident.rs index add41297d6341..b926ecf3ac777 100644 --- a/src/meta/app/src/data_mask/mask_policy_policy_table_id_ident.rs +++ b/src/meta/app/src/data_mask/mask_policy_policy_table_id_ident.rs @@ -20,7 +20,7 @@ use databend_common_meta_kvapi::kvapi::KeyParser; use crate::tenant_key::ident::TIdent; use crate::tenant_key::raw::TIdentRaw; -#[derive(PartialEq, Debug)] +#[derive(Clone, PartialEq, Debug)] pub struct MaskPolicyIdTableId { pub policy_id: u64, pub table_id: u64, @@ -42,9 +42,9 @@ impl KeyCodec for MaskPolicyIdTableId { } } -/// Mask Policy can be applied to tables, If drop a masking policy +/// Mask Policy can be applied to tables. When dropping a masking policy, /// should get all __fd_mask_policy_apply_table_id/tenant// -/// and remove table's bind. +/// and remove the table's reference. pub type MaskPolicyTableIdIdent = TIdent; pub type MaskPolicyTableIdIdentRaw = TIdentRaw; diff --git a/src/meta/app/src/row_access_policy/row_access_policy_table_id_ident.rs b/src/meta/app/src/row_access_policy/row_access_policy_table_id_ident.rs index d913f468d8c04..53d7268ff986d 100644 --- a/src/meta/app/src/row_access_policy/row_access_policy_table_id_ident.rs +++ b/src/meta/app/src/row_access_policy/row_access_policy_table_id_ident.rs @@ -20,7 +20,7 @@ use databend_common_meta_kvapi::kvapi::KeyParser; use crate::tenant_key::ident::TIdent; use crate::tenant_key::raw::TIdentRaw; -#[derive(PartialEq, Debug)] +#[derive(Clone, PartialEq, Debug)] pub struct RowAccessPolicyIdTableId { pub policy_id: u64, pub table_id: u64, @@ -42,9 +42,9 @@ impl KeyCodec for RowAccessPolicyIdTableId { } } -/// RowAccess Policy can be applied to tables, If drop a row access policy +/// RowAccess Policy can be applied to tables. When dropping a row access policy, /// should get all __fd_row_access_policy_apply_table_id/tenant// -/// and remove table's bind. +/// and remove the table's reference. pub type RowAccessPolicyTableIdIdent = TIdent; pub type RowAccessPolicyTableIdIdentRaw = TIdentRaw; diff --git a/src/query/ee/src/data_mask/data_mask_handler.rs b/src/query/ee/src/data_mask/data_mask_handler.rs index a4643ceb9652f..417f2be71a01b 100644 --- a/src/query/ee/src/data_mask/data_mask_handler.rs +++ b/src/query/ee/src/data_mask/data_mask_handler.rs @@ -43,7 +43,7 @@ impl DatamaskHandler for RealDatamaskHandler { } async fn drop_data_mask(&self, meta_api: Arc, req: DropDatamaskReq) -> Result<()> { - let dropped = meta_api.drop_data_mask(&req.name).await?; + let dropped = meta_api.drop_data_mask(&req.name).await??; if dropped.is_none() { if req.if_exists { // Ok diff --git a/src/query/ee/src/row_access_policy/row_access_policy_handler.rs b/src/query/ee/src/row_access_policy/row_access_policy_handler.rs index df32f27885a50..fc44997895d85 100644 --- a/src/query/ee/src/row_access_policy/row_access_policy_handler.rs +++ b/src/query/ee/src/row_access_policy/row_access_policy_handler.rs @@ -37,7 +37,7 @@ pub struct RealRowAccessPolicyHandler {} #[async_trait::async_trait] impl RowAccessPolicyHandler for RealRowAccessPolicyHandler { - async fn create_row_access( + async fn create_row_access_policy( &self, meta_api: Arc, req: CreateRowAccessPolicyReq, @@ -45,15 +45,15 @@ impl RowAccessPolicyHandler for RealRowAccessPolicyHandler { std::result::Result>, MetaTxnError, > { - meta_api.create_row_access(req).await + meta_api.create_row_access_policy(req).await } - async fn drop_row_access( + async fn drop_row_access_policy( &self, meta_api: Arc, req: DropRowAccessPolicyReq, ) -> Result<()> { - let dropped = meta_api.drop_row_access(&req.name).await?; + let dropped = meta_api.drop_row_access_policy(&req.name).await??; if dropped.is_none() { return Err(AppError::from(req.name.unknown_error("drop row policy")).into()); } @@ -61,7 +61,7 @@ impl RowAccessPolicyHandler for RealRowAccessPolicyHandler { Ok(()) } - async fn get_row_access( + async fn get_row_access_policy( &self, meta_api: Arc, tenant: &Tenant, @@ -69,20 +69,20 @@ impl RowAccessPolicyHandler for RealRowAccessPolicyHandler { ) -> Result<(SeqV, SeqV)> { let name_ident = RowAccessPolicyNameIdent::new(tenant, name); let res = meta_api - .get_row_access(&name_ident) + .get_row_access_policy(&name_ident) .await? .ok_or_else(|| AppError::from(name_ident.unknown_error("get row policy")))?; Ok(res) } - async fn get_row_access_by_id( + async fn get_row_access_policy_by_id( &self, meta_api: Arc, tenant: &Tenant, policy_id: u64, ) -> Result> { let res = meta_api - .get_row_access_by_id(tenant, policy_id) + .get_row_access_policy_by_id(tenant, policy_id) .await? .ok_or_else(|| { databend_common_exception::ErrorCode::UnknownRowAccessPolicy(format!( diff --git a/src/query/ee_features/row_access_policy/src/row_access_policy_handler.rs b/src/query/ee_features/row_access_policy/src/row_access_policy_handler.rs index 6b1f0d71d73a1..f6c29d2e8a34a 100644 --- a/src/query/ee_features/row_access_policy/src/row_access_policy_handler.rs +++ b/src/query/ee_features/row_access_policy/src/row_access_policy_handler.rs @@ -44,7 +44,7 @@ use databend_common_meta_types::SeqV; #[async_trait::async_trait] pub trait RowAccessPolicyHandler: Sync + Send { - async fn create_row_access( + async fn create_row_access_policy( &self, meta_api: Arc, req: CreateRowAccessPolicyReq, @@ -53,20 +53,20 @@ pub trait RowAccessPolicyHandler: Sync + Send { MetaTxnError, >; - async fn drop_row_access( + async fn drop_row_access_policy( &self, meta_api: Arc, req: DropRowAccessPolicyReq, ) -> Result<()>; - async fn get_row_access( + async fn get_row_access_policy( &self, meta_api: Arc, tenant: &Tenant, name: String, ) -> Result<(SeqV, SeqV)>; - async fn get_row_access_by_id( + async fn get_row_access_policy_by_id( &self, meta_api: Arc, tenant: &Tenant, @@ -83,7 +83,7 @@ impl RowAccessPolicyHandlerWrapper { Self { handler } } - pub async fn create_row_access( + pub async fn create_row_access_policy( &self, meta_api: Arc, req: CreateRowAccessPolicyReq, @@ -91,34 +91,36 @@ impl RowAccessPolicyHandlerWrapper { std::result::Result>, MetaTxnError, > { - self.handler.create_row_access(meta_api, req).await + self.handler.create_row_access_policy(meta_api, req).await } - pub async fn drop_row_access( + pub async fn drop_row_access_policy( &self, meta_api: Arc, req: DropRowAccessPolicyReq, ) -> Result<()> { - self.handler.drop_row_access(meta_api, req).await + self.handler.drop_row_access_policy(meta_api, req).await } - pub async fn get_row_access( + pub async fn get_row_access_policy( &self, meta_api: Arc, tenant: &Tenant, name: String, ) -> Result<(SeqV, SeqV)> { - self.handler.get_row_access(meta_api, tenant, name).await + self.handler + .get_row_access_policy(meta_api, tenant, name) + .await } - pub async fn get_row_access_by_id( + pub async fn get_row_access_policy_by_id( &self, meta_api: Arc, tenant: &Tenant, policy_id: u64, ) -> Result> { self.handler - .get_row_access_by_id(meta_api, tenant, policy_id) + .get_row_access_policy_by_id(meta_api, tenant, policy_id) .await } } diff --git a/src/query/service/src/interpreters/interpreter_row_access_policy_create.rs b/src/query/service/src/interpreters/interpreter_row_access_policy_create.rs index ab3754a897abd..f02fece87c7d1 100644 --- a/src/query/service/src/interpreters/interpreter_row_access_policy_create.rs +++ b/src/query/service/src/interpreters/interpreter_row_access_policy_create.rs @@ -56,7 +56,7 @@ impl Interpreter for CreateRowAccessPolicyInterpreter { let meta_api = UserApiProvider::instance().get_meta_store_client(); let handler = get_row_access_policy_handler(); if let Err(_e) = handler - .create_row_access(meta_api, self.plan.clone().into()) + .create_row_access_policy(meta_api, self.plan.clone().into()) .await? { return if let CreateOption::CreateIfNotExists = self.plan.create_option { diff --git a/src/query/service/src/interpreters/interpreter_row_access_policy_desc.rs b/src/query/service/src/interpreters/interpreter_row_access_policy_desc.rs index fb9971ac3c24a..eeadf3fe747f4 100644 --- a/src/query/service/src/interpreters/interpreter_row_access_policy_desc.rs +++ b/src/query/service/src/interpreters/interpreter_row_access_policy_desc.rs @@ -59,7 +59,7 @@ impl Interpreter for DescRowAccessPolicyInterpreter { let meta_api = UserApiProvider::instance().get_meta_store_client(); let handler = get_row_access_policy_handler(); let policy = handler - .get_row_access(meta_api, &self.ctx.get_tenant(), self.plan.name.clone()) + .get_row_access_policy(meta_api, &self.ctx.get_tenant(), self.plan.name.clone()) .await; let policy = match policy { diff --git a/src/query/service/src/interpreters/interpreter_row_access_policy_drop.rs b/src/query/service/src/interpreters/interpreter_row_access_policy_drop.rs index a00b9e7240811..9a0b15fa39a22 100644 --- a/src/query/service/src/interpreters/interpreter_row_access_policy_drop.rs +++ b/src/query/service/src/interpreters/interpreter_row_access_policy_drop.rs @@ -55,7 +55,7 @@ impl Interpreter for DropRowAccessPolicyInterpreter { let meta_api = UserApiProvider::instance().get_meta_store_client(); let handler = get_row_access_policy_handler(); if let Err(e) = handler - .drop_row_access(meta_api, self.plan.clone().into()) + .drop_row_access_policy(meta_api, self.plan.clone().into()) .await { if e.code() == ErrorCode::UNKNOWN_ROW_ACCESS_POLICY && self.plan.if_exists { diff --git a/src/query/service/src/interpreters/interpreter_table_row_access_add.rs b/src/query/service/src/interpreters/interpreter_table_row_access_add.rs index 444ecae8f943b..b69a80c3b5cd5 100644 --- a/src/query/service/src/interpreters/interpreter_table_row_access_add.rs +++ b/src/query/service/src/interpreters/interpreter_table_row_access_add.rs @@ -96,7 +96,7 @@ impl Interpreter for AddTableRowAccessPolicyInterpreter { let meta_api = UserApiProvider::instance().get_meta_store_client(); let handler = get_row_access_policy_handler(); let (policy_id, policy) = handler - .get_row_access(meta_api, &self.ctx.get_tenant(), policy_name.clone()) + .get_row_access_policy(meta_api, &self.ctx.get_tenant(), policy_name.clone()) .await?; // check if column type match to the input type diff --git a/src/query/service/src/interpreters/interpreter_table_row_access_drop.rs b/src/query/service/src/interpreters/interpreter_table_row_access_drop.rs index dc5bb19229427..03f63a142a33e 100644 --- a/src/query/service/src/interpreters/interpreter_table_row_access_drop.rs +++ b/src/query/service/src/interpreters/interpreter_table_row_access_drop.rs @@ -67,7 +67,7 @@ impl Interpreter for DropTableRowAccessPolicyInterpreter { let meta_api = UserApiProvider::instance().get_meta_store_client(); let handler = get_row_access_policy_handler(); let (policy_id, _) = handler - .get_row_access( + .get_row_access_policy( meta_api, &self.ctx.get_tenant(), self.plan.policy.to_string(), diff --git a/src/query/sql/src/planner/binder/table.rs b/src/query/sql/src/planner/binder/table.rs index b7a81403ccb00..519b29c556278 100644 --- a/src/query/sql/src/planner/binder/table.rs +++ b/src/query/sql/src/planner/binder/table.rs @@ -514,7 +514,7 @@ impl Binder { }) .collect(); let policy = policy.policy_id; - let res = databend_common_base::runtime::block_on(handler.get_row_access_by_id( + let res = databend_common_base::runtime::block_on(handler.get_row_access_policy_by_id( meta_api, &self.ctx.get_tenant(), policy, diff --git a/tests/sqllogictests/suites/ee/05_ee_ddl/05_0004_ddl_security_policy.test b/tests/sqllogictests/suites/ee/05_ee_ddl/05_0004_ddl_security_policy.test index dcabe7b62dc99..8ac31ddf80b22 100644 --- a/tests/sqllogictests/suites/ee/05_ee_ddl/05_0004_ddl_security_policy.test +++ b/tests/sqllogictests/suites/ee/05_ee_ddl/05_0004_ddl_security_policy.test @@ -19,10 +19,10 @@ statement ok set global enable_planner_cache = 0; statement ok -drop row access policy if exists rap_multi_case; +drop table if exists rap_multi_test; statement ok -drop table if exists rap_multi_test; +drop row access policy if exists rap_multi_case; statement ok CREATE ROW ACCESS POLICY rap_multi_case AS (UserId string, Department string) RETURNS boolean -> @@ -48,12 +48,38 @@ select * from rap_multi_test order by id; 1 alice Engineering 3 charlie Engineering +statement ok +drop table rap_multi_test; + +statement ok +undrop table rap_multi_test; + +## Should only see Engineering department rows +query ITT +select * from rap_multi_test order by id; +---- +1 alice Engineering +3 charlie Engineering + +statement ok +drop table rap_multi_test; + statement ok drop table if exists rap_case_test; statement ok drop row access policy if exists rap_multi_case; +statement ok +undrop table rap_multi_test; + +query ITT +select * from rap_multi_test order by id; +---- +1 alice Engineering +2 bob Sales +3 charlie Engineering + statement ok drop row access policy if exists rap1; @@ -148,7 +174,10 @@ alter table t_1 drop column id; statement ok alter table t ADD ROW ACCESS POLICY rap_it ON (empl_id); -## t apply row access policy rpa_it(means if current role == account_admin will return false) +## t apply row access policy rpa_it +statement error 1133 +drop row access policy rap_it; + query I select id from t; ---- @@ -220,10 +249,10 @@ statement ok drop row access policy if exists p1; statement ok -drop row access policy if exists rap_it1; +drop table if exists t_1; statement ok -drop table if exists t_1; +drop row access policy if exists rap_it1; statement ok drop MASKING POLICY if exists mask @@ -304,6 +333,9 @@ CREATE MASKING POLICY maskc AS (val int) RETURNS int -> CASE WHEN current_role() statement ok alter table data_mask_test modify column b set masking policy maskb; +statement error 1133 +drop masking policy maskb; + statement error 1132 alter table data_mask_test modify column b string not null; @@ -955,6 +987,106 @@ drop masking policy if exists mask_mixedcase; statement ok drop masking policy if exists mask_multi_case; +statement ok +drop masking policy if exists mask_ssn_conditional; + +statement ok +drop table if exists employees; + +statement ok +CREATE MASKING POLICY mask_ssn_conditional AS (val STRING, user_role STRING) RETURNS STRING -> + CASE + WHEN user_role = current_role() THEN val + ELSE '***-**-****' + END +COMMENT = 'Mask SSN conditionally based on role'; + +statement ok +create table employees( + id int, + name string, + ssn string, + salary int, + department string, + role string +); + +statement ok +insert into employees values + (1, 'Alice', '123-45-6789', 100000, 'Engineering', 'hr_admin'), + (2, 'Bob', '987-65-4321', 80000, 'Sales', 'regular_user'), + (3, 'Charlie', '456-78-9012', 120000, 'Engineering', 'regular_user'); + +statement ok +ALTER TABLE employees modify column ssn SET MASKING POLICY mask_ssn_conditional using (ssn, role); + +query ITTFT +select * from employees; +---- +1 Alice ***-**-**** 100000 Engineering hr_admin +2 Bob ***-**-**** 80000 Sales regular_user +3 Charlie ***-**-**** 120000 Engineering regular_user + +statement ok +DROP TABLE employees; + +statement ok +DROP MASKING POLICY mask_ssn_conditional; + +## policy invalid +statement ok +UNDROP TABLE employees; + +query ITTFT +select * from employees; +---- +1 Alice 123-45-6789 100000 Engineering hr_admin +2 Bob 987-65-4321 80000 Sales regular_user +3 Charlie 456-78-9012 120000 Engineering regular_user + +statement ok +CREATE MASKING POLICY mask_ssn_conditional AS (val STRING, user_role STRING) RETURNS STRING -> + CASE + WHEN user_role = current_role() THEN val + ELSE '***-**-****' + END +COMMENT = 'Mask SSN conditionally based on role'; + +statement ok +ALTER TABLE employees modify column ssn SET MASKING POLICY mask_ssn_conditional using (ssn, role); + +## policy valid +query ITTFT +select * from employees; +---- +1 Alice ***-**-**** 100000 Engineering hr_admin +2 Bob ***-**-**** 80000 Sales regular_user +3 Charlie ***-**-**** 120000 Engineering regular_user + +statement ok +DROP TABLE employees; + +statement ok +UNDROP TABLE employees; + +## policy valid +query ITTFT +select * from employees; +---- +1 Alice ***-**-**** 100000 Engineering hr_admin +2 Bob ***-**-**** 80000 Sales regular_user +3 Charlie ***-**-**** 120000 Engineering regular_user + +## -- should failed(table be used) +statement error 1133 +DROP MASKING POLICY mask_ssn_conditional; + +statement ok +DROP TABLE employees; + +statement ok +DROP MASKING POLICY mask_ssn_conditional; + statement ok unset global enable_planner_cache;