Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/meta/api/src/data_mask_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::SeqV;

use crate::errors::MaskingPolicyError;
use crate::kv_app_error::KVAppError;
use crate::meta_txn_error::MetaTxnError;

#[async_trait::async_trait]
pub trait DatamaskApi: Send + Sync {
Expand All @@ -35,7 +37,10 @@ pub trait DatamaskApi: Send + Sync {
async fn drop_data_mask(
&self,
name_ident: &DataMaskNameIdent,
) -> Result<Option<(SeqV<DataMaskId>, SeqV<DatamaskMeta>)>, KVAppError>;
) -> Result<
Result<Option<(SeqV<DataMaskId>, SeqV<DatamaskMeta>)>, MaskingPolicyError>,
MetaTxnError,
>;

async fn get_data_mask(
&self,
Expand Down
55 changes: 45 additions & 10 deletions src/meta/api/src/data_mask_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,30 @@ use databend_common_meta_app::data_mask::DataMaskId;
use databend_common_meta_app::data_mask::DataMaskIdIdent;
use databend_common_meta_app::data_mask::DataMaskNameIdent;
use databend_common_meta_app::data_mask::DatamaskMeta;
use databend_common_meta_app::data_mask::MaskPolicyIdTableId;
use databend_common_meta_app::data_mask::MaskPolicyTableIdIdent;
use databend_common_meta_app::data_mask::MaskPolicyTableIdListIdent;
use databend_common_meta_app::data_mask::MaskpolicyTableIdList;
use databend_common_meta_app::id_generator::IdGenerator;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_app::KeyWithTenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::TxnRequest;
use fastrace::func_name;
use log::debug;

use crate::data_mask_api::DatamaskApi;
use crate::errors::MaskingPolicyError;
use crate::fetch_id;
use crate::kv_app_error::KVAppError;
use crate::kv_pb_api::KVPbApi;
use crate::meta_txn_error::MetaTxnError;
use crate::txn_backoff::txn_backoff;
use crate::txn_condition_util::txn_cond_eq_keys_with_prefix;
use crate::txn_condition_util::txn_cond_eq_seq;
use crate::txn_core_util::send_txn;
use crate::txn_core_util::txn_delete_exact;
Expand Down Expand Up @@ -135,33 +141,62 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
async fn drop_data_mask(
&self,
name_ident: &DataMaskNameIdent,
) -> Result<Option<(SeqV<DataMaskId>, SeqV<DatamaskMeta>)>, KVAppError> {
) -> Result<
Result<Option<(SeqV<DataMaskId>, SeqV<DatamaskMeta>)>, MaskingPolicyError>,
MetaTxnError,
> {
debug!(name_ident :? =(name_ident); "DatamaskApi: {}", func_name!());

let mut trials = txn_backoff(None, func_name!());
loop {
trials.next().unwrap()?.await;
let mut txn = TxnRequest::default();

// Check if policy exists
let res = self.get_id_and_value(name_ident).await?;
debug!(res :? = res, name_key :? =(name_ident); "{}", func_name!());

let Some((seq_id, seq_meta)) = res else {
return Ok(None);
return Ok(Ok(None));
};

let id_ident = seq_id.data.into_t_ident(name_ident.tenant());
let policy_id = *seq_id.data;
let tenant = name_ident.tenant();
let table_policy_ref_prefix = DirName::new(MaskPolicyTableIdIdent::new_generic(
tenant.clone(),
MaskPolicyIdTableId {
policy_id,
table_id: 0,
},
));

// List all table-policy references
let table_policy_refs = self.list_pb_vec(&table_policy_ref_prefix).await?;

// Policy is in use - cannot drop
if !table_policy_refs.is_empty() {
return Ok(Err(MaskingPolicyError::policy_in_use(
tenant.tenant_name().to_string(),
name_ident.data_mask_name().to_string(),
)));
}

// No references - drop the policy
let id_ident = seq_id.data.into_t_ident(tenant);
let mut txn = TxnRequest::default();

// Ensure no new references were created
txn.condition
.push(txn_cond_eq_keys_with_prefix(&table_policy_ref_prefix, 0));

txn_delete_exact(&mut txn, name_ident, seq_id.seq);
txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);
// TODO: Tentative retention for compatibility MaskPolicyTableIdListIdent related logic. It can be directly deleted later

// TODO: Tentative retention for compatibility. Can be deleted later.
clear_table_column_mask_policy(self, name_ident, &mut txn).await?;
let (succ, _responses) = send_txn(self, txn).await?;
debug!(succ = succ;"{}", func_name!());

let (succ, _responses) = send_txn(self, txn).await?;
if succ {
return Ok(Some((seq_id, seq_meta)));
return Ok(Ok(Some((seq_id, seq_meta))));
}
// Transaction failed, retry
}
}

Expand Down
51 changes: 50 additions & 1 deletion src/meta/api/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use databend_common_exception::ErrorCode;
use databend_common_meta_app::principal::AutoIncrementKey;

/// Table logic error, unrelated to the backend service providing Table management, or dependent component.
#[derive(Clone, Debug, thiserror::Error)]
pub enum TableError {
Expand All @@ -39,6 +38,56 @@ impl From<TableError> for ErrorCode {
}
}

#[derive(Clone, Debug, thiserror::Error)]
pub enum MaskingPolicyError {
#[error(
"MASKING POLICY `{policy_name}` is still in use. Unset it from all tables before dropping."
)]
PolicyInUse { policy_name: String },
}

impl MaskingPolicyError {
pub fn policy_in_use(_tenant: impl Into<String>, policy_name: impl Into<String>) -> Self {
Self::PolicyInUse {
policy_name: policy_name.into(),
}
}
}

impl From<MaskingPolicyError> for ErrorCode {
fn from(value: MaskingPolicyError) -> Self {
let s = value.to_string();
match value {
MaskingPolicyError::PolicyInUse { .. } => ErrorCode::ConstraintError(s),
}
}
}

#[derive(Clone, Debug, thiserror::Error)]
pub enum RowAccessPolicyError {
#[error(
"ROW ACCESS POLICY `{policy_name}` is still in use. Unset it from all tables before dropping."
)]
PolicyInUse { policy_name: String },
}

impl RowAccessPolicyError {
pub fn policy_in_use(_tenant: impl Into<String>, policy_name: impl Into<String>) -> Self {
Self::PolicyInUse {
policy_name: policy_name.into(),
}
}
}

impl From<RowAccessPolicyError> for ErrorCode {
fn from(value: RowAccessPolicyError) -> Self {
let s = value.to_string();
match value {
RowAccessPolicyError::PolicyInUse { .. } => ErrorCode::ConstraintError(s),
}
}
}

#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum AutoIncrementError {
#[error("OutOfAutoIncrementRange: `{key}` while `{context}`")]
Expand Down
2 changes: 2 additions & 0 deletions src/meta/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ pub use error_util::db_has_to_not_exist;
pub use error_util::db_id_has_to_exist;
pub use error_util::table_has_to_not_exist;
pub use error_util::unknown_database_error;
pub use errors::MaskingPolicyError;
pub use errors::RowAccessPolicyError;
pub use garbage_collection_api::GarbageCollectionApi;
pub use index_api::IndexApi;
// Re-export from new kv_fetch_util module for backward compatibility
Expand Down
14 changes: 9 additions & 5 deletions src/meta/api/src/row_access_policy_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ use databend_common_meta_app::tenant_key::errors::ExistError;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::SeqV;

use crate::errors::RowAccessPolicyError;
use crate::meta_txn_error::MetaTxnError;

#[async_trait::async_trait]
pub trait RowAccessPolicyApi: Send + Sync {
async fn create_row_access(
async fn create_row_access_policy(
&self,
req: CreateRowAccessPolicyReq,
) -> Result<
Expand All @@ -37,17 +38,20 @@ pub trait RowAccessPolicyApi: Send + Sync {

/// On success, returns the dropped id and row policy.
/// Returning None, means nothing is removed.
async fn drop_row_access(
async fn drop_row_access_policy(
&self,
name_ident: &RowAccessPolicyNameIdent,
) -> Result<Option<(SeqV<RowAccessPolicyId>, SeqV<RowAccessPolicyMeta>)>, MetaTxnError>;
) -> Result<
Result<Option<(SeqV<RowAccessPolicyId>, SeqV<RowAccessPolicyMeta>)>, RowAccessPolicyError>,
MetaTxnError,
>;

async fn get_row_access(
async fn get_row_access_policy(
&self,
name_ident: &RowAccessPolicyNameIdent,
) -> Result<Option<(SeqV<RowAccessPolicyId>, SeqV<RowAccessPolicyMeta>)>, MetaError>;

async fn get_row_access_by_id(
async fn get_row_access_policy_by_id(
&self,
tenant: &Tenant,
policy_id: u64,
Expand Down
61 changes: 45 additions & 16 deletions src/meta/api/src/row_access_policy_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,32 @@

use databend_common_meta_app::id_generator::IdGenerator;
use databend_common_meta_app::row_access_policy::row_access_policy_name_ident;
use databend_common_meta_app::row_access_policy::row_access_policy_table_id_ident::RowAccessPolicyIdTableId;
use databend_common_meta_app::row_access_policy::CreateRowAccessPolicyReply;
use databend_common_meta_app::row_access_policy::CreateRowAccessPolicyReq;
use databend_common_meta_app::row_access_policy::RowAccessPolicyId;
use databend_common_meta_app::row_access_policy::RowAccessPolicyIdIdent;
use databend_common_meta_app::row_access_policy::RowAccessPolicyMeta;
use databend_common_meta_app::row_access_policy::RowAccessPolicyNameIdent;
use databend_common_meta_app::row_access_policy::RowAccessPolicyTableIdIdent;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_app::tenant_key::errors::ExistError;
use databend_common_meta_app::KeyWithTenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::TxnRequest;
use fastrace::func_name;
use log::debug;

use crate::errors::RowAccessPolicyError;
use crate::fetch_id;
use crate::kv_pb_api::KVPbApi;
use crate::meta_txn_error::MetaTxnError;
use crate::row_access_policy_api::RowAccessPolicyApi;
use crate::txn_backoff::txn_backoff;
use crate::txn_condition_util::txn_cond_eq_keys_with_prefix;
use crate::txn_condition_util::txn_cond_eq_seq;
use crate::txn_core_util::send_txn;
use crate::txn_core_util::txn_delete_exact;
Expand All @@ -44,7 +49,7 @@ use crate::txn_op_builder_util::txn_op_put_pb;
/// Thus every type that impl kvapi::KVApi impls RowAccessPolicyApi.
#[tonic::async_trait]
impl<KV: kvapi::KVApi<Error = MetaError>> RowAccessPolicyApi for KV {
async fn create_row_access(
async fn create_row_access_policy(
&self,
req: CreateRowAccessPolicyReq,
) -> Result<
Expand Down Expand Up @@ -120,42 +125,66 @@ impl<KV: kvapi::KVApi<Error = MetaError>> RowAccessPolicyApi for KV {
Ok(Ok(CreateRowAccessPolicyReply { id: *id }))
}

async fn drop_row_access(
async fn drop_row_access_policy(
&self,
name_ident: &RowAccessPolicyNameIdent,
) -> Result<Option<(SeqV<RowAccessPolicyId>, SeqV<RowAccessPolicyMeta>)>, MetaTxnError> {
) -> Result<
Result<Option<(SeqV<RowAccessPolicyId>, SeqV<RowAccessPolicyMeta>)>, RowAccessPolicyError>,
MetaTxnError,
> {
debug!(name_ident :? =(name_ident); "RowAccessPolicyApi: {}", func_name!());

let mut trials = txn_backoff(None, func_name!());
loop {
trials.next().unwrap()?.await;

let mut txn = TxnRequest::default();

// Check if policy exists
let res = self.get_id_and_value(name_ident).await?;
debug!(res :? = res, name_key :? =(name_ident); "{}", func_name!());

let Some((seq_id, seq_meta)) = res else {
return Ok(None);
return Ok(Ok(None));
};

let id_ident = seq_id.data.into_t_ident(name_ident.tenant());
let policy_id = *seq_id.data;
let tenant = name_ident.tenant();
let table_policy_ref_prefix = DirName::new(RowAccessPolicyTableIdIdent::new_generic(
tenant.clone(),
RowAccessPolicyIdTableId {
policy_id,
table_id: 0,
},
));

// List all table-policy references
let table_policy_refs = self.list_pb_vec(&table_policy_ref_prefix).await?;

// Policy is in use - cannot drop
if !table_policy_refs.is_empty() {
return Ok(Err(RowAccessPolicyError::policy_in_use(
tenant.tenant_name().to_string(),
name_ident.row_access_name().to_string(),
)));
}

// No references - drop the policy
let id_ident = seq_id.data.into_t_ident(tenant);
let mut txn = TxnRequest::default();

// Ensure no new references were created
txn.condition
.push(txn_cond_eq_keys_with_prefix(&table_policy_ref_prefix, 0));

txn_delete_exact(&mut txn, name_ident, seq_id.seq);
txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);

// TODO(eason): need to remove row policy from table meta

let (succ, _responses) = send_txn(self, txn).await?;
debug!(succ = succ;"{}", func_name!());

if succ {
return Ok(Some((seq_id, seq_meta)));
return Ok(Ok(Some((seq_id, seq_meta))));
}
// Transaction failed, retry
}
}

async fn get_row_access(
async fn get_row_access_policy(
&self,
name_ident: &RowAccessPolicyNameIdent,
) -> Result<Option<(SeqV<RowAccessPolicyId>, SeqV<RowAccessPolicyMeta>)>, MetaError> {
Expand All @@ -166,7 +195,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> RowAccessPolicyApi for KV {
Ok(res)
}

async fn get_row_access_by_id(
async fn get_row_access_policy_by_id(
&self,
tenant: &Tenant,
policy_id: u64,
Expand Down
Loading