From 001dec21d79faec9ee28915a6379f33f04bdba39 Mon Sep 17 00:00:00 2001 From: goowtham1412-p Date: Thu, 30 Oct 2025 10:18:24 +0530 Subject: [PATCH 1/5] docs: Move TopK example code to extending-operators documentation --- .../library-user-guide/extending-operators.md | 167 ++++++++++++++++++ 1 file changed, 167 insertions(+) diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md index 5c28d1e67058..36dc3cb838f5 100644 --- a/docs/source/library-user-guide/extending-operators.md +++ b/docs/source/library-user-guide/extending-operators.md @@ -57,3 +57,170 @@ fn agg_to_table_scan(result: f64, schema: SchemaRef) -> Result { ``` To get a deeper dive into the usage of the µWheel project, visit the [blog post](https://uwheel.rs/post/datafusion_uwheel/) by Max Meldrum. + +## User-Defined Plan Example: TopK Operator + +This example demonstrates creating a custom TopK operator that optimizes "find the top K elements" queries. + +### Background + +A "Top K" node is a common query optimization used for queries like "find the top 3 customers by revenue". + +**Example SQL:** +```sql +CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT) + STORED AS CSV location 'tests/data/customer.csv'; + +SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3; +``` + +**Naive Plan:** +The standard approach fully sorts the input before discarding everything except the top 3 elements. + +**Optimized TopK Plan:** +Instead of sorting everything, we maintain only the top K elements in memory, significantly reducing buffer requirements. + +### Implementation + +The TopK implementation consists of several key components: + +#### 1. 
TopKPlanNode - The Logical Plan Node +```rust +#[derive(PartialEq, Eq, PartialOrd, Hash)] +struct TopKPlanNode { + k: usize, + input: LogicalPlan, + expr: SortExpr, +} + +impl UserDefinedLogicalNodeCore for TopKPlanNode { + fn name(&self) -> &str { + "TopK" + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![&self.input] + } + + fn schema(&self) -> &DFSchemaRef { + self.input.schema() + } + + fn expressions(&self) -> Vec { + vec![self.expr.expr.clone()] + } + + fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "TopK: k={}", self.k) + } + + fn with_exprs_and_inputs( + &self, + mut exprs: Vec, + mut inputs: Vec, + ) -> Result { + Ok(Self { + k: self.k, + input: inputs.swap_remove(0), + expr: self.expr.with_expr(exprs.swap_remove(0)), + }) + } +} +``` + +#### 2. TopKOptimizerRule - Rewrites Plans +```rust +impl OptimizerRule for TopKOptimizerRule { + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result> { + // Look for pattern: Limit -> Sort + // Replace with: TopK + let LogicalPlan::Limit(ref limit) = plan else { + return Ok(Transformed::no(plan)); + }; + + if let LogicalPlan::Sort(Sort { ref expr, ref input, .. }) = limit.input.as_ref() { + if expr.len() == 1 { + return Ok(Transformed::yes(LogicalPlan::Extension(Extension { + node: Arc::new(TopKPlanNode { + k: limit.fetch.unwrap(), + input: input.as_ref().clone(), + expr: expr[0].clone(), + }), + }))); + } + } + + Ok(Transformed::no(plan)) + } +} +``` + +#### 3. 
TopKPlanner - Creates Physical Plan +```rust +#[async_trait] +impl ExtensionPlanner for TopKPlanner { + async fn plan_extension( + &self, + _planner: &dyn PhysicalPlanner, + node: &dyn UserDefinedLogicalNode, + logical_inputs: &[&LogicalPlan], + physical_inputs: &[Arc], + _session_state: &SessionState, + ) -> Result>> { + Ok( + if let Some(topk_node) = node.as_any().downcast_ref::() { + Some(Arc::new(TopKExec::new( + physical_inputs[0].clone(), + topk_node.k, + ))) + } else { + None + }, + ) + } +} +``` + +#### 4. TopKExec - Physical Execution +```rust +struct TopKExec { + input: Arc, + k: usize, + cache: PlanProperties, +} + +impl ExecutionPlan for TopKExec { + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + Ok(Box::pin(TopKReader { + input: self.input.execute(partition, context)?, + k: self.k, + done: false, + state: BTreeMap::new(), + })) + } +} +``` + +### Usage + +To use the TopK operator in your queries: +```rust +let config = SessionConfig::new().with_target_partitions(48); +let state = SessionStateBuilder::new() + .with_config(config) + .with_query_planner(Arc::new(TopKQueryPlanner {})) + .with_optimizer_rule(Arc::new(TopKOptimizerRule::default())) + .build(); + +let ctx = SessionContext::new_with_state(state); +``` + +For the complete implementation, see `datafusion/core/tests/user_defined/user_defined_plan.rs`. 
From 55f954688b71be8e9fc8859bf2f1f2969db513dc Mon Sep 17 00:00:00 2001 From: goowtham1412-p Date: Thu, 30 Oct 2025 10:25:05 +0530 Subject: [PATCH 2/5] docs: Move TopK example to extending-operators documentation (Fixes #15774) --- .../library-user-guide/extending-operators.md | 81 +++---------------- 1 file changed, 12 insertions(+), 69 deletions(-) diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md index 36dc3cb838f5..8c57a1994901 100644 --- a/docs/source/library-user-guide/extending-operators.md +++ b/docs/source/library-user-guide/extending-operators.md @@ -1,63 +1,3 @@ - - -# Extending DataFusion's operators: custom LogicalPlan and Execution Plans - -DataFusion supports extension of operators by transforming logical plan and execution plan through customized [optimizer rules](https://docs.rs/datafusion/latest/datafusion/optimizer/trait.OptimizerRule.html). This section will use the µWheel project to illustrate such capabilities. - -## About DataFusion µWheel - -[DataFusion µWheel](https://github.com/uwheel/datafusion-uwheel/tree/main) is a native DataFusion optimizer which improves query performance for time-based analytics through fast temporal aggregation and pruning using custom indices. The integration of µWheel into DataFusion is a joint effort with the DataFusion community. - -### Optimizing Logical Plan - -The `rewrite` function transforms logical plans by identifying temporal patterns and aggregation functions that match the stored wheel indices. When match is found, it queries the corresponding index to retrieve pre-computed aggregate values, stores these results in a [MemTable](https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemTable.html), and returns as a new `LogicalPlan::TableScan`. If no match is found, the original plan proceeds unchanged through DataFusion's standard execution path. 
- -```rust,ignore -fn rewrite( - &self, - plan: LogicalPlan, - _config: &dyn OptimizerConfig, -) -> Result> { - // Attempts to rewrite a logical plan to a uwheel-based plan that either provides - // plan-time aggregates or skips execution based on min/max pruning. - if let Some(rewritten) = self.try_rewrite(&plan) { - Ok(Transformed::yes(rewritten)) - } else { - Ok(Transformed::no(plan)) - } -} -``` - -```rust,ignore -// Converts a uwheel aggregate result to a TableScan with a MemTable as source -fn agg_to_table_scan(result: f64, schema: SchemaRef) -> Result { - let data = Float64Array::from(vec![result]); - let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(data)])?; - let df_schema = Arc::new(DFSchema::try_from(schema.clone())?); - let mem_table = MemTable::try_new(schema, vec![vec![record_batch]])?; - mem_table_as_table_scan(mem_table, df_schema) -} -``` - -To get a deeper dive into the usage of the µWheel project, visit the [blog post](https://uwheel.rs/post/datafusion_uwheel/) by Max Meldrum. - ## User-Defined Plan Example: TopK Operator This example demonstrates creating a custom TopK operator that optimizes "find the top K elements" queries. @@ -85,7 +25,7 @@ Instead of sorting everything, we maintain only the top K elements in memory, si The TopK implementation consists of several key components: #### 1. TopKPlanNode - The Logical Plan Node -```rust +```rust,ignore #[derive(PartialEq, Eq, PartialOrd, Hash)] struct TopKPlanNode { k: usize, @@ -129,24 +69,27 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode { ``` #### 2. 
TopKOptimizerRule - Rewrites Plans -```rust +```rust,ignore impl OptimizerRule for TopKOptimizerRule { fn rewrite( &self, plan: LogicalPlan, _config: &dyn OptimizerConfig, ) -> Result> { - // Look for pattern: Limit -> Sort - // Replace with: TopK + // Look for pattern: Limit -> Sort and replace with TopK let LogicalPlan::Limit(ref limit) = plan else { return Ok(Transformed::no(plan)); }; + let FetchType::Literal(Some(fetch)) = limit.get_fetch_type()? else { + return Ok(Transformed::no(plan)); + }; + if let LogicalPlan::Sort(Sort { ref expr, ref input, .. }) = limit.input.as_ref() { if expr.len() == 1 { return Ok(Transformed::yes(LogicalPlan::Extension(Extension { node: Arc::new(TopKPlanNode { - k: limit.fetch.unwrap(), + k: fetch, input: input.as_ref().clone(), expr: expr[0].clone(), }), @@ -160,7 +103,7 @@ impl OptimizerRule for TopKOptimizerRule { ``` #### 3. TopKPlanner - Creates Physical Plan -```rust +```rust,ignore #[async_trait] impl ExtensionPlanner for TopKPlanner { async fn plan_extension( @@ -186,7 +129,7 @@ impl ExtensionPlanner for TopKPlanner { ``` #### 4. TopKExec - Physical Execution -```rust +```rust,ignore struct TopKExec { input: Arc, k: usize, @@ -212,7 +155,7 @@ impl ExecutionPlan for TopKExec { ### Usage To use the TopK operator in your queries: -```rust +```rust,ignore let config = SessionConfig::new().with_target_partitions(48); let state = SessionStateBuilder::new() .with_config(config) @@ -223,4 +166,4 @@ let state = SessionStateBuilder::new() let ctx = SessionContext::new_with_state(state); ``` -For the complete implementation, see `datafusion/core/tests/user_defined/user_defined_plan.rs`. +For the complete implementation, see `datafusion/core/tests/user_defined/user_defined_plan.rs`. 
\ No newline at end of file From f83d724df2872ab59a58934e72267ec201a27770 Mon Sep 17 00:00:00 2001 From: gowtham1412-p Date: Sat, 8 Nov 2025 11:28:53 +0530 Subject: [PATCH 3/5] =?UTF-8?q?`docs:=20Preserve=20=C2=B5Wheel=20content?= =?UTF-8?q?=20and=20add=20TopK=20as=20additional=20example`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../library-user-guide/extending-operators.md | 93 ++++++++++++++----- 1 file changed, 72 insertions(+), 21 deletions(-) diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md index 8c57a1994901..a2652fcfc37f 100644 --- a/docs/source/library-user-guide/extending-operators.md +++ b/docs/source/library-user-guide/extending-operators.md @@ -1,4 +1,61 @@ -## User-Defined Plan Example: TopK Operator + + +# Extending DataFusion's operators: custom LogicalPlan and Execution Plans + +DataFusion supports extension of operators by transforming logical plans and execution plans through customized [optimizer rules](https://docs.rs/datafusion/latest/datafusion/optimizer/trait.OptimizerRule.html). This section demonstrates these capabilities through two examples. + +## Example 1: DataFusion µWheel + +[DataFusion µWheel](https://github.com/uwheel/datafusion-uwheel/tree/main) is a native DataFusion optimizer that improves query performance for time-based analytics through fast temporal aggregation and pruning using custom indices. The integration of µWheel into DataFusion is a joint effort with the DataFusion community. + +### Optimizing Logical Plan + +The `rewrite` function transforms logical plans by identifying temporal patterns and aggregation functions that match the stored wheel indices. 
When a match is found, it queries the corresponding index to retrieve pre-computed aggregate values, stores these results in a [MemTable](https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemTable.html), and returns them as a new `LogicalPlan::TableScan`. If no match is found, the original plan proceeds unchanged through DataFusion's standard execution path. +```rust,ignore +fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, +) -> Result { + // Attempts to rewrite a logical plan to a uwheel-based plan that either provides + // plan-time aggregates or skips execution based on min/max pruning. + if let Some(rewritten) = self.try_rewrite(&plan) { + Ok(Transformed::yes(rewritten)) + } else { + Ok(Transformed::no(plan)) + } +} + +// Converts a uwheel aggregate result to a TableScan with a MemTable as source +fn agg_to_table_scan(result: f64, schema: SchemaRef) -> Result { + let data = Float64Array::from(vec![result]); + let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(data)])?; + let df_schema = Arc::new(DFSchema::try_from(schema.clone())?); + let mem_table = MemTable::try_new(schema, vec![vec![record_batch]])?; + mem_table_as_table_scan(mem_table, df_schema) +} +``` + +To get a deeper dive into the usage of the µWheel project, visit the [blog post](https://uwheel.rs/post/datafusion_uwheel/) by Max Meldrum. + +## Example 2: TopK Operator This example demonstrates creating a custom TopK operator that optimizes "find the top K elements" queries. 
@@ -32,33 +89,27 @@ struct TopKPlanNode { input: LogicalPlan, expr: SortExpr, } - impl UserDefinedLogicalNodeCore for TopKPlanNode { fn name(&self) -> &str { "TopK" } - - fn inputs(&self) -> Vec<&LogicalPlan> { + fn inputs(&self) -> Vec { vec![&self.input] } - fn schema(&self) -> &DFSchemaRef { self.input.schema() } - - fn expressions(&self) -> Vec { + fn expressions(&self) -> Vec { vec![self.expr.expr.clone()] } - fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "TopK: k={}", self.k) } - fn with_exprs_and_inputs( &self, - mut exprs: Vec, - mut inputs: Vec, - ) -> Result { + mut exprs: Vec, + mut inputs: Vec, + ) -> Result { Ok(Self { k: self.k, input: inputs.swap_remove(0), @@ -75,7 +126,7 @@ impl OptimizerRule for TopKOptimizerRule { &self, plan: LogicalPlan, _config: &dyn OptimizerConfig, - ) -> Result> { + ) -> Result { // Look for pattern: Limit -> Sort and replace with TopK let LogicalPlan::Limit(ref limit) = plan else { return Ok(Transformed::no(plan)); @@ -111,11 +162,11 @@ impl ExtensionPlanner for TopKPlanner { _planner: &dyn PhysicalPlanner, node: &dyn UserDefinedLogicalNode, logical_inputs: &[&LogicalPlan], - physical_inputs: &[Arc], + physical_inputs: &[Arc], _session_state: &SessionState, - ) -> Result>> { + ) -> Result> { Ok( - if let Some(topk_node) = node.as_any().downcast_ref::() { + if let Some(topk_node) = node.as_any().downcast_ref::() { Some(Arc::new(TopKExec::new( physical_inputs[0].clone(), topk_node.k, @@ -131,17 +182,16 @@ impl ExtensionPlanner for TopKPlanner { #### 4. 
TopKExec - Physical Execution ```rust,ignore struct TopKExec { - input: Arc, + input: Arc, k: usize, cache: PlanProperties, } - impl ExecutionPlan for TopKExec { fn execute( &self, partition: usize, - context: Arc, - ) -> Result { + context: Arc, + ) -> Result { Ok(Box::pin(TopKReader { input: self.input.execute(partition, context)?, k: self.k, @@ -166,4 +216,5 @@ let state = SessionStateBuilder::new() let ctx = SessionContext::new_with_state(state); ``` -For the complete implementation, see `datafusion/core/tests/user_defined/user_defined_plan.rs`. \ No newline at end of file +For the complete implementation, see `datafusion/core/tests/user_defined/user_defined_plan.rs`. +``` From 758f7ffb8bcd56cd54521c7d7b109e374086050f Mon Sep 17 00:00:00 2001 From: gowtham1412-p Date: Mon, 10 Nov 2025 14:40:01 +0530 Subject: [PATCH 4/5] Update extending-operators.md --- docs/source/library-user-guide/extending-operators.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md index a2652fcfc37f..df011847f62c 100644 --- a/docs/source/library-user-guide/extending-operators.md +++ b/docs/source/library-user-guide/extending-operators.md @@ -33,7 +33,7 @@ fn rewrite( &self, plan: LogicalPlan, _config: &dyn OptimizerConfig, -) -> Result { +) -> Result> { // Attempts to rewrite a logical plan to a uwheel-based plan that either provides // plan-time aggregates or skips execution based on min/max pruning. 
if let Some(rewritten) = self.try_rewrite(&plan) { @@ -44,7 +44,7 @@ fn rewrite( } // Converts a uwheel aggregate result to a TableScan with a MemTable as source -fn agg_to_table_scan(result: f64, schema: SchemaRef) -> Result { +fn agg_to_table_scan(result: f64, schema: SchemaRef) -> Result { let data = Float64Array::from(vec![result]); let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(data)])?; let df_schema = Arc::new(DFSchema::try_from(schema.clone())?); From 4c5435052cbf61ac08ae8dda175272b1e0271606 Mon Sep 17 00:00:00 2001 From: gowtham1412-p Date: Thu, 13 Nov 2025 15:54:56 +0530 Subject: [PATCH 5/5] =?UTF-8?q?docs:=20Fix=20prettier=20formatting=20and?= =?UTF-8?q?=20preserve=20=C2=B5Wheel=20content?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../library-user-guide/extending-operators.md | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/docs/source/library-user-guide/extending-operators.md b/docs/source/library-user-guide/extending-operators.md index df011847f62c..5ecc1f0ca2e9 100644 --- a/docs/source/library-user-guide/extending-operators.md +++ b/docs/source/library-user-guide/extending-operators.md @@ -61,7 +61,7 @@ This example demonstrates creating a custom TopK operator that optimizes "find t ### Background -A "Top K" node is a common query optimization used for queries like "find the top 3 customers by revenue". +A "Top K" node is a common query optimization used for queries like "find the top 3 customers by revenue". 
**Example SQL:** ```sql @@ -93,13 +93,13 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode { fn name(&self) -> &str { "TopK" } - fn inputs(&self) -> Vec { + fn inputs(&self) -> Vec<&LogicalPlan> { vec![&self.input] } fn schema(&self) -> &DFSchemaRef { self.input.schema() } - fn expressions(&self) -> Vec { + fn expressions(&self) -> Vec { vec![self.expr.expr.clone()] } fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result { @@ -107,9 +107,9 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode { } fn with_exprs_and_inputs( &self, - mut exprs: Vec, - mut inputs: Vec, - ) -> Result { + mut exprs: Vec, + mut inputs: Vec, + ) -> Result { Ok(Self { k: self.k, input: inputs.swap_remove(0), @@ -126,16 +126,16 @@ impl OptimizerRule for TopKOptimizerRule { &self, plan: LogicalPlan, _config: &dyn OptimizerConfig, - ) -> Result { + ) -> Result> { // Look for pattern: Limit -> Sort and replace with TopK let LogicalPlan::Limit(ref limit) = plan else { return Ok(Transformed::no(plan)); }; - + let FetchType::Literal(Some(fetch)) = limit.get_fetch_type()? else { return Ok(Transformed::no(plan)); }; - + if let LogicalPlan::Sort(Sort { ref expr, ref input, .. }) = limit.input.as_ref() { if expr.len() == 1 { return Ok(Transformed::yes(LogicalPlan::Extension(Extension { @@ -147,7 +147,7 @@ impl OptimizerRule for TopKOptimizerRule { }))); } } - + Ok(Transformed::no(plan)) } } @@ -162,11 +162,11 @@ impl ExtensionPlanner for TopKPlanner { _planner: &dyn PhysicalPlanner, node: &dyn UserDefinedLogicalNode, logical_inputs: &[&LogicalPlan], - physical_inputs: &[Arc], + physical_inputs: &[Arc], _session_state: &SessionState, - ) -> Result> { + ) -> Result>> { Ok( - if let Some(topk_node) = node.as_any().downcast_ref::() { + if let Some(topk_node) = node.as_any().downcast_ref::() { Some(Arc::new(TopKExec::new( physical_inputs[0].clone(), topk_node.k, @@ -182,7 +182,7 @@ impl ExtensionPlanner for TopKPlanner { #### 4. 
TopKExec - Physical Execution ```rust,ignore struct TopKExec { - input: Arc, + input: Arc, k: usize, cache: PlanProperties, } @@ -190,8 +190,8 @@ impl ExecutionPlan for TopKExec { fn execute( &self, partition: usize, - context: Arc, - ) -> Result { + context: Arc, + ) -> Result { Ok(Box::pin(TopKReader { input: self.input.execute(partition, context)?, k: self.k, @@ -212,9 +212,8 @@ let state = SessionStateBuilder::new() .with_query_planner(Arc::new(TopKQueryPlanner {})) .with_optimizer_rule(Arc::new(TopKOptimizerRule::default())) .build(); - + let ctx = SessionContext::new_with_state(state); ``` For the complete implementation, see `datafusion/core/tests/user_defined/user_defined_plan.rs`. -```