1313// limitations under the License.
1414
1515use databend_common_meta_app:: app_error:: AppError ;
16- use databend_common_meta_app:: app_error:: TxnRetryMaxTimes ;
1716use databend_common_meta_app:: data_mask:: CreateDatamaskReply ;
1817use databend_common_meta_app:: data_mask:: CreateDatamaskReq ;
1918use databend_common_meta_app:: data_mask:: DataMaskId ;
@@ -26,7 +25,6 @@ use databend_common_meta_app::data_mask::MaskPolicyTableIdListIdent;
2625use databend_common_meta_app:: data_mask:: MaskpolicyTableIdList ;
2726use databend_common_meta_app:: id_generator:: IdGenerator ;
2827use databend_common_meta_app:: schema:: CreateOption ;
29- use databend_common_meta_app:: schema:: TableMeta ;
3028use databend_common_meta_app:: tenant:: Tenant ;
3129use databend_common_meta_app:: KeyWithTenant ;
3230use databend_common_meta_kvapi:: kvapi;
@@ -43,10 +41,6 @@ use crate::fetch_id;
4341use crate :: kv_app_error:: KVAppError ;
4442use crate :: kv_pb_api:: KVPbApi ;
4543use crate :: meta_txn_error:: MetaTxnError ;
46- use crate :: security_policy_usage:: collect_policy_usage;
47- use crate :: security_policy_usage:: PolicyBinding ;
48- use crate :: security_policy_usage:: PolicyDropAction ;
49- use crate :: security_policy_usage:: PolicyUsage ;
5044use crate :: txn_backoff:: txn_backoff;
5145use crate :: txn_condition_util:: txn_cond_eq_keys_with_prefix;
5246use crate :: txn_condition_util:: txn_cond_eq_seq;
@@ -153,122 +147,99 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
153147 > {
154148 debug ! ( name_ident : ? =( name_ident) ; "DatamaskApi: {}" , func_name!( ) ) ;
155149
156- /// Maximum number of cleanup batch iterations to prevent infinite loops.
157- /// Each iteration cleans up to 100 stale bindings. With this limit, we can
158- /// handle up to 1000 stale bindings before giving up.
159- const MAX_CLEANUP_ITERATIONS : usize = 10 ;
150+ const BATCH_SIZE : usize = 100 ;
160151
161152 let mut trials = txn_backoff ( None , func_name ! ( ) ) ;
162- let mut cleanup_iterations = 0 ;
163-
164153 loop {
165154 trials. next ( ) . unwrap ( ) ?. await ;
166155
156+ // Check if policy exists
167157 let res = self . get_id_and_value ( name_ident) . await ?;
168- debug ! ( res : ? = res, name_key : ? =( name_ident) ; "{}" , func_name!( ) ) ;
169-
170158 let Some ( ( seq_id, seq_meta) ) = res else {
171159 return Ok ( Ok ( None ) ) ;
172160 } ;
173161
174162 let policy_id = * seq_id. data ;
175- let usage: MaskPolicyUsage =
176- collect_policy_usage ( self , name_ident. tenant ( ) , policy_id) . await ?;
177- if !usage. active_tables . is_empty ( ) {
178- let tenant = name_ident. tenant ( ) . tenant_name ( ) . to_string ( ) ;
179- let policy_name = name_ident. data_mask_name ( ) . to_string ( ) ;
180- let err = MaskingPolicyError :: policy_in_use ( tenant, policy_name) ;
181- return Ok ( Err ( err) ) ;
163+ let tenant = name_ident. tenant ( ) ;
164+ let binding_prefix = DirName :: new ( MaskPolicyTableIdIdent :: new_generic (
165+ tenant. clone ( ) ,
166+ MaskPolicyIdTableId {
167+ policy_id,
168+ table_id : 0 ,
169+ } ,
170+ ) ) ;
171+
172+ // List all bindings and categorize them
173+ let bindings = self . list_pb_vec ( & binding_prefix) . await ?;
174+ let total_binding_count = bindings. len ( ) as u64 ;
175+
176+ let mut active_bindings = Vec :: new ( ) ;
177+ let mut stale_bindings = Vec :: new ( ) ;
178+
179+ for ( binding_key, binding_seqv) in bindings {
180+ let table_id = binding_key. name ( ) . table_id ;
181+ let table_key = databend_common_meta_app:: schema:: TableId :: new ( table_id) ;
182+
183+ match self . get_pb ( & table_key) . await ? {
184+ Some ( SeqV { data, .. } ) if data. drop_on . is_none ( ) => {
185+ // Active table using this policy
186+ active_bindings. push ( table_id) ;
187+ }
188+ _ => {
189+ // Table dropped or doesn't exist - stale binding
190+ stale_bindings. push ( ( binding_key, binding_seqv. seq ) ) ;
191+ }
192+ }
182193 }
183194
184- let id_ident = seq_id. data . into_t_ident ( name_ident. tenant ( ) ) ;
185- let action = usage. prepare_drop_action ( name_ident. tenant ( ) , policy_id) ;
186-
187- match action {
188- PolicyDropAction :: FinalDrop {
189- prefix,
190- binding_count,
191- bindings,
192- table_updates,
193- } => {
194- let mut txn = TxnRequest :: default ( ) ;
195- // Ensure no new bindings were created concurrently
196- txn. condition
197- . push ( txn_cond_eq_keys_with_prefix ( & prefix, binding_count) ) ;
198-
199- txn_delete_exact ( & mut txn, name_ident, seq_id. seq ) ;
200- txn_delete_exact ( & mut txn, & id_ident, seq_meta. seq ) ;
201-
202- for ( key, seq) in bindings {
203- txn_delete_exact ( & mut txn, & key, seq) ;
204- }
195+ // Policy is in use - cannot drop
196+ if !active_bindings. is_empty ( ) {
197+ return Ok ( Err ( MaskingPolicyError :: policy_in_use (
198+ tenant. tenant_name ( ) . to_string ( ) ,
199+ name_ident. data_mask_name ( ) . to_string ( ) ,
200+ ) ) ) ;
201+ }
205202
206- // Clean up stale policy references in dropped table metadata
207- for update in table_updates {
208- txn. condition
209- . push ( txn_cond_eq_seq ( & update. table_id , update. seq ) ) ;
210- let op = txn_op_put_pb ( & update. table_id , & update. meta , None ) ?;
211- txn. if_then . push ( op) ;
212- }
203+ // Still have stale bindings - clean up a batch
204+ if !stale_bindings. is_empty ( ) {
205+ let batch_size = stale_bindings. len ( ) . min ( BATCH_SIZE ) ;
206+ let batch: Vec < _ > = stale_bindings. into_iter ( ) . take ( batch_size) . collect ( ) ;
213207
214- // TODO: Tentative retention for compatibility MaskPolicyTableIdListIdent related logic. It can be directly deleted later
215- clear_table_column_mask_policy ( self , name_ident, & mut txn) . await ?;
208+ let mut txn = TxnRequest :: default ( ) ;
209+ txn. condition . push ( txn_cond_eq_keys_with_prefix (
210+ & binding_prefix,
211+ total_binding_count,
212+ ) ) ;
216213
217- let ( succ, _responses) = send_txn ( self , txn) . await ?;
214+ for ( key, seq) in batch {
215+ txn_delete_exact ( & mut txn, & key, seq) ;
216+ }
218217
219- if succ {
220- debug ! ( "Policy dropped successfully" ) ;
221- return Ok ( Ok ( Some ( ( seq_id, seq_meta) ) ) ) ;
222- }
223- // Retry if transaction failed
218+ let ( succ, _) = send_txn ( self , txn) . await ?;
219+ if succ {
220+ continue ; // Loop back to clean next batch or drop policy
224221 }
222+ // Transaction failed, retry from beginning
223+ } else {
224+ // All bindings cleaned up - now drop the policy itself
225+ let id_ident = seq_id. data . into_t_ident ( tenant) ;
226+ let mut txn = TxnRequest :: default ( ) ;
225227
226- PolicyDropAction :: CleanupBatch {
227- prefix,
228- binding_count,
229- bindings,
230- table_updates,
231- } => {
232- cleanup_iterations += 1 ;
233- if cleanup_iterations > MAX_CLEANUP_ITERATIONS {
234- let op_desc = format ! (
235- "drop masking policy '{}' (too many stale bindings or concurrent conflicts)" ,
236- name_ident. data_mask_name( )
237- ) ;
238- return Err ( MetaTxnError :: TxnRetryMaxTimes ( TxnRetryMaxTimes :: new (
239- & op_desc,
240- MAX_CLEANUP_ITERATIONS as u32 ,
241- ) ) ) ;
242- }
228+ // Ensure no new bindings were created
229+ txn. condition
230+ . push ( txn_cond_eq_keys_with_prefix ( & binding_prefix, 0 ) ) ;
243231
244- // Incremental cleanup: Remove a batch of stale bindings to avoid oversized transactions.
245- // This transaction only cleans up stale bindings; it does NOT drop the policy itself.
246- // After successful cleanup, the loop retries and may progress to FinalDrop.
247- //
248- // Note: If the process crashes mid-cleanup, remaining stale bindings will be cleaned
249- // on the next DROP attempt. This is acceptable as the cleanup is idempotent.
250- let mut txn = TxnRequest :: default ( ) ;
251- txn. condition
252- . push ( txn_cond_eq_keys_with_prefix ( & prefix, binding_count) ) ;
253-
254- for ( key, seq) in bindings {
255- txn_delete_exact ( & mut txn, & key, seq) ;
256- }
232+ txn_delete_exact ( & mut txn, name_ident, seq_id. seq ) ;
233+ txn_delete_exact ( & mut txn, & id_ident, seq_meta. seq ) ;
257234
258- for update in table_updates {
259- txn. condition
260- . push ( txn_cond_eq_seq ( & update. table_id , update. seq ) ) ;
261- let op = txn_op_put_pb ( & update. table_id , & update. meta , None ) ?;
262- txn. if_then . push ( op) ;
263- }
235+ // TODO: Tentative retention for compatibility. Can be deleted later.
236+ clear_table_column_mask_policy ( self , name_ident, & mut txn) . await ?;
264237
265- let ( succ, _) = send_txn ( self , txn) . await ?;
266- debug ! (
267- "Cleaned up batch of stale bindings (iteration {}/{}), succ={}" ,
268- cleanup_iterations, MAX_CLEANUP_ITERATIONS , succ
269- ) ;
270- // Continue to next iteration to process remaining bindings
238+ let ( succ, _responses) = send_txn ( self , txn) . await ?;
239+ if succ {
240+ return Ok ( Ok ( Some ( ( seq_id, seq_meta) ) ) ) ;
271241 }
242+ // Transaction failed, retry
272243 }
273244 }
274245 }
@@ -315,33 +286,3 @@ async fn clear_table_column_mask_policy(
315286 txn_delete_exact ( txn, & id_list_key, seq_id_list. seq ) ;
316287 Ok ( ( ) )
317288}
318-
319- type MaskPolicyUsage = PolicyUsage < MaskPolicyTableIdIdent > ;
320-
321- impl PolicyBinding for MaskPolicyTableIdIdent {
322- fn prefix_for ( tenant : & Tenant , policy_id : u64 ) -> DirName < Self > {
323- DirName :: new ( MaskPolicyTableIdIdent :: new_generic (
324- tenant. clone ( ) ,
325- MaskPolicyIdTableId {
326- policy_id,
327- table_id : 0 ,
328- } ,
329- ) )
330- }
331-
332- fn table_id ( & self ) -> u64 {
333- self . name ( ) . table_id
334- }
335-
336- fn remove_security_policy_from_table_meta ( table_meta : & mut TableMeta , policy_id : u64 ) -> bool {
337- // For compatibility considerations, retain this field.
338- // column_mask_policy should always None.
339- table_meta. column_mask_policy = None ;
340- let before = table_meta. column_mask_policy_columns_ids . len ( ) ;
341- table_meta
342- . column_mask_policy_columns_ids
343- . retain ( |_, policy| policy. policy_id != policy_id) ;
344-
345- before != table_meta. column_mask_policy_columns_ids . len ( )
346- }
347- }
0 commit comments