@@ -19,20 +19,28 @@ use databend_common_meta_app::data_mask::DataMaskId;
1919use databend_common_meta_app:: data_mask:: DataMaskIdIdent ;
2020use databend_common_meta_app:: data_mask:: DataMaskNameIdent ;
2121use databend_common_meta_app:: data_mask:: DatamaskMeta ;
22+ use databend_common_meta_app:: data_mask:: MaskPolicyIdTableId ;
23+ use databend_common_meta_app:: data_mask:: MaskPolicyTableIdIdent ;
2224use databend_common_meta_app:: data_mask:: MaskPolicyTableIdListIdent ;
2325use databend_common_meta_app:: data_mask:: MaskpolicyTableIdList ;
2426use databend_common_meta_app:: id_generator:: IdGenerator ;
2527use databend_common_meta_app:: schema:: CreateOption ;
28+ use databend_common_meta_app:: schema:: TableId ;
29+ use databend_common_meta_app:: schema:: TableMeta ;
2630use databend_common_meta_app:: tenant:: Tenant ;
2731use databend_common_meta_app:: KeyWithTenant ;
2832use databend_common_meta_kvapi:: kvapi;
33+ use databend_common_meta_kvapi:: kvapi:: DirName ;
2934use databend_common_meta_types:: MetaError ;
3035use databend_common_meta_types:: SeqV ;
3136use databend_common_meta_types:: TxnRequest ;
3237use fastrace:: func_name;
3338use log:: debug;
3439
3540use crate :: data_mask_api:: DatamaskApi ;
41+ use crate :: errors:: MaskingPolicyError ;
42+ use crate :: errors:: SecurityPolicyError ;
43+ use crate :: errors:: SecurityPolicyKind ;
3644use crate :: fetch_id;
3745use crate :: kv_app_error:: KVAppError ;
3846use crate :: kv_pb_api:: KVPbApi ;
@@ -135,12 +143,16 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
135143 async fn drop_data_mask (
136144 & self ,
137145 name_ident : & DataMaskNameIdent ,
138- ) -> Result < Option < ( SeqV < DataMaskId > , SeqV < DatamaskMeta > ) > , KVAppError > {
146+ ) -> Result < Option < ( SeqV < DataMaskId > , SeqV < DatamaskMeta > ) > , MaskingPolicyError > {
139147 debug ! ( name_ident : ? =( name_ident) ; "DatamaskApi: {}" , func_name!( ) ) ;
140148
141149 let mut trials = txn_backoff ( None , func_name ! ( ) ) ;
142150 loop {
143- trials. next ( ) . unwrap ( ) ?. await ;
151+ trials
152+ . next ( )
153+ . unwrap ( )
154+ . map_err ( MaskingPolicyError :: from) ?
155+ . await ;
144156 let mut txn = TxnRequest :: default ( ) ;
145157
146158 let res = self . get_id_and_value ( name_ident) . await ?;
@@ -150,10 +162,33 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
150162 return Ok ( None ) ;
151163 } ;
152164
165+ let policy_id = * seq_id. data ;
166+ let usage = collect_mask_policy_usage ( self , name_ident. tenant ( ) , policy_id) . await ?;
167+ if !usage. active_tables . is_empty ( ) {
168+ let tenant = name_ident. tenant ( ) . tenant_name ( ) . to_string ( ) ;
169+ let policy_name = name_ident. data_mask_name ( ) . to_string ( ) ;
170+ let err = SecurityPolicyError :: policy_in_use (
171+ tenant,
172+ SecurityPolicyKind :: Masking ,
173+ policy_name,
174+ ) ;
175+ return Err ( err. into ( ) ) ;
176+ }
177+
153178 let id_ident = seq_id. data . into_t_ident ( name_ident. tenant ( ) ) ;
154179
155180 txn_delete_exact ( & mut txn, name_ident, seq_id. seq ) ;
156181 txn_delete_exact ( & mut txn, & id_ident, seq_meta. seq ) ;
182+ for binding in & usage. stale_bindings {
183+ txn_delete_exact ( & mut txn, & binding. ident , binding. seq ) ;
184+ }
185+ for update in & usage. table_updates {
186+ txn. condition
187+ . push ( txn_cond_eq_seq ( & update. table_id , update. seq ) ) ;
188+ let op = txn_op_put_pb ( & update. table_id , & update. meta , None )
189+ . map_err ( |e| MaskingPolicyError :: from ( MetaError :: from ( e) ) ) ?;
190+ txn. if_then . push ( op) ;
191+ }
157192 // TODO: Tentative retention for compatibility MaskPolicyTableIdListIdent related logic. It can be directly deleted later
158193 clear_table_column_mask_policy ( self , name_ident, & mut txn) . await ?;
159194 let ( succ, _responses) = send_txn ( self , txn) . await ?;
@@ -195,7 +230,7 @@ async fn clear_table_column_mask_policy(
195230 kv_api : & ( impl kvapi:: KVApi < Error = MetaError > + ?Sized ) ,
196231 name_ident : & DataMaskNameIdent ,
197232 txn : & mut TxnRequest ,
198- ) -> Result < ( ) , MetaError > {
233+ ) -> Result < ( ) , MaskingPolicyError > {
199234 let id_list_key = MaskPolicyTableIdListIdent :: new_from ( name_ident. clone ( ) ) ;
200235
201236 let seq_id_list = kv_api. get_pb ( & id_list_key) . await ?;
@@ -207,3 +242,95 @@ async fn clear_table_column_mask_policy(
207242 txn_delete_exact ( txn, & id_list_key, seq_id_list. seq ) ;
208243 Ok ( ( ) )
209244}
245+
246+ #[ derive( Default ) ]
247+ struct MaskPolicyUsage {
248+ active_tables : Vec < u64 > ,
249+ stale_bindings : Vec < BindingEntry > ,
250+ table_updates : Vec < TableMetaUpdate > ,
251+ }
252+
253+ struct BindingEntry {
254+ ident : MaskPolicyTableIdIdent ,
255+ seq : u64 ,
256+ }
257+
258+ struct TableMetaUpdate {
259+ table_id : TableId ,
260+ seq : u64 ,
261+ meta : TableMeta ,
262+ }
263+
264+ fn strip_mask_policy_from_table_meta ( table_meta : & mut TableMeta , policy_id : u64 ) -> bool {
265+ let mut removed = false ;
266+ table_meta
267+ . column_mask_policy_columns_ids
268+ . retain ( |_, policy| {
269+ let keep = policy. policy_id != policy_id;
270+ if !keep {
271+ removed = true ;
272+ }
273+ keep
274+ } ) ;
275+
276+ if removed {
277+ table_meta. column_mask_policy = None ;
278+ }
279+
280+ removed
281+ }
282+
283+ async fn collect_mask_policy_usage (
284+ kv_api : & ( impl kvapi:: KVApi < Error = MetaError > + ?Sized ) ,
285+ tenant : & Tenant ,
286+ policy_id : u64 ,
287+ ) -> Result < MaskPolicyUsage , MaskingPolicyError > {
288+ let binding_prefix = DirName :: new ( MaskPolicyTableIdIdent :: new_generic (
289+ tenant. clone ( ) ,
290+ MaskPolicyIdTableId {
291+ policy_id,
292+ table_id : 0 ,
293+ } ,
294+ ) ) ;
295+ let bindings = kv_api
296+ . list_pb_vec ( & binding_prefix)
297+ . await
298+ . map_err ( MaskingPolicyError :: from) ?;
299+
300+ let mut usage = MaskPolicyUsage :: default ( ) ;
301+ for ( binding_ident, seqv) in bindings {
302+ let table_id = binding_ident. name ( ) . table_id ;
303+ let table_key = TableId :: new ( table_id) ;
304+ match kv_api
305+ . get_pb ( & table_key)
306+ . await
307+ . map_err ( MaskingPolicyError :: from) ?
308+ {
309+ Some ( mut table_meta_seqv) => {
310+ if table_meta_seqv. data . drop_on . is_none ( ) {
311+ usage. active_tables . push ( table_id) ;
312+ } else {
313+ if strip_mask_policy_from_table_meta ( & mut table_meta_seqv. data , policy_id) {
314+ usage. table_updates . push ( TableMetaUpdate {
315+ table_id : table_key,
316+ seq : table_meta_seqv. seq ,
317+ meta : table_meta_seqv. data . clone ( ) ,
318+ } ) ;
319+ }
320+ usage. stale_bindings . push ( BindingEntry {
321+ ident : binding_ident,
322+ seq : seqv. seq ,
323+ } ) ;
324+ }
325+ }
326+ None => {
327+ usage. stale_bindings . push ( BindingEntry {
328+ ident : binding_ident,
329+ seq : seqv. seq ,
330+ } ) ;
331+ }
332+ }
333+ }
334+
335+ Ok ( usage)
336+ }
0 commit comments