Commit 55f9546

docs: Move TopK example to extending-operators documentation (Fixes #15774)
1 parent 001dec2 commit 55f9546

1 file changed: +12 -69 lines changed

docs/source/library-user-guide/extending-operators.md

Lines changed: 12 additions & 69 deletions
@@ -1,63 +1,3 @@
-<!---
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-# Extending DataFusion's operators: custom LogicalPlan and Execution Plans
-
-DataFusion supports extension of operators by transforming logical plan and execution plan through customized [optimizer rules](https://docs.rs/datafusion/latest/datafusion/optimizer/trait.OptimizerRule.html). This section will use the µWheel project to illustrate such capabilities.
-
-## About DataFusion µWheel
-
-[DataFusion µWheel](https://github.com/uwheel/datafusion-uwheel/tree/main) is a native DataFusion optimizer which improves query performance for time-based analytics through fast temporal aggregation and pruning using custom indices. The integration of µWheel into DataFusion is a joint effort with the DataFusion community.
-
-### Optimizing Logical Plan
-
-The `rewrite` function transforms logical plans by identifying temporal patterns and aggregation functions that match the stored wheel indices. When match is found, it queries the corresponding index to retrieve pre-computed aggregate values, stores these results in a [MemTable](https://docs.rs/datafusion/latest/datafusion/datasource/memory/struct.MemTable.html), and returns as a new `LogicalPlan::TableScan`. If no match is found, the original plan proceeds unchanged through DataFusion's standard execution path.
-
-```rust,ignore
-fn rewrite(
-    &self,
-    plan: LogicalPlan,
-    _config: &dyn OptimizerConfig,
-) -> Result<Transformed<LogicalPlan>> {
-    // Attempts to rewrite a logical plan to a uwheel-based plan that either provides
-    // plan-time aggregates or skips execution based on min/max pruning.
-    if let Some(rewritten) = self.try_rewrite(&plan) {
-        Ok(Transformed::yes(rewritten))
-    } else {
-        Ok(Transformed::no(plan))
-    }
-}
-```
-
-```rust,ignore
-// Converts a uwheel aggregate result to a TableScan with a MemTable as source
-fn agg_to_table_scan(result: f64, schema: SchemaRef) -> Result<LogicalPlan> {
-    let data = Float64Array::from(vec![result]);
-    let record_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(data)])?;
-    let df_schema = Arc::new(DFSchema::try_from(schema.clone())?);
-    let mem_table = MemTable::try_new(schema, vec![vec![record_batch]])?;
-    mem_table_as_table_scan(mem_table, df_schema)
-}
-```
-
-To get a deeper dive into the usage of the µWheel project, visit the [blog post](https://uwheel.rs/post/datafusion_uwheel/) by Max Meldrum.
-
 ## User-Defined Plan Example: TopK Operator
 
 This example demonstrates creating a custom TopK operator that optimizes "find the top K elements" queries.
@@ -85,7 +25,7 @@ Instead of sorting everything, we maintain only the top K elements in memory, si
 The TopK implementation consists of several key components:
 
 #### 1. TopKPlanNode - The Logical Plan Node
-```rust
+```rust,ignore
 #[derive(PartialEq, Eq, PartialOrd, Hash)]
 struct TopKPlanNode {
     k: usize,
@@ -129,24 +69,27 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode {
 ```
 
 #### 2. TopKOptimizerRule - Rewrites Plans
-```rust
+```rust,ignore
 impl OptimizerRule for TopKOptimizerRule {
     fn rewrite(
         &self,
         plan: LogicalPlan,
         _config: &dyn OptimizerConfig,
     ) -> Result<Transformed<LogicalPlan>> {
-        // Look for pattern: Limit -> Sort
-        // Replace with: TopK
+        // Look for pattern: Limit -> Sort and replace with TopK
         let LogicalPlan::Limit(ref limit) = plan else {
             return Ok(Transformed::no(plan));
         };
 
+        let FetchType::Literal(Some(fetch)) = limit.get_fetch_type()? else {
+            return Ok(Transformed::no(plan));
+        };
+
         if let LogicalPlan::Sort(Sort { ref expr, ref input, .. }) = limit.input.as_ref() {
             if expr.len() == 1 {
                 return Ok(Transformed::yes(LogicalPlan::Extension(Extension {
                     node: Arc::new(TopKPlanNode {
-                        k: limit.fetch.unwrap(),
+                        k: fetch,
                         input: input.as_ref().clone(),
                         expr: expr[0].clone(),
                     }),
@@ -160,7 +103,7 @@ impl OptimizerRule for TopKOptimizerRule {
 ```
 
 #### 3. TopKPlanner - Creates Physical Plan
-```rust
+```rust,ignore
 #[async_trait]
 impl ExtensionPlanner for TopKPlanner {
     async fn plan_extension(
@@ -186,7 +129,7 @@ impl ExtensionPlanner for TopKPlanner {
 ```
 
 #### 4. TopKExec - Physical Execution
-```rust
+```rust,ignore
 struct TopKExec {
     input: Arc<dyn ExecutionPlan>,
     k: usize,
@@ -212,7 +155,7 @@ impl ExecutionPlan for TopKExec {
 ### Usage
 
 To use the TopK operator in your queries:
-```rust
+```rust,ignore
 let config = SessionConfig::new().with_target_partitions(48);
 let state = SessionStateBuilder::new()
     .with_config(config)
@@ -223,4 +166,4 @@ let state = SessionStateBuilder::new()
 let ctx = SessionContext::new_with_state(state);
 ```
 
-For the complete implementation, see `datafusion/core/tests/user_defined/user_defined_plan.rs`.
+For the complete implementation, see `datafusion/core/tests/user_defined/user_defined_plan.rs`.
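
Note on the `TopKOptimizerRule` hunk: the one behavioral change in this diff is that `limit.fetch.unwrap()` is replaced by a `let ... else` guard on `limit.get_fetch_type()?`, so a limit without a literal fetch leaves the plan unchanged instead of panicking. The following is a minimal, self-contained sketch of that guard pattern, not the DataFusion code itself. It assumes a locally defined, simplified stand-in for `FetchType` and a hypothetical helper named `top_k_from_fetch`.

```rust
/// Simplified stand-in for the fetch classification matched in the diff.
/// Assumption: defined locally for illustration; this is not DataFusion's type.
#[derive(Debug, Clone, PartialEq)]
enum FetchType {
    /// The limit is a literal; `None` means no fetch count was given.
    Literal(Option<usize>),
    /// The limit is an expression that is not a plain literal.
    UnsupportedExpr,
}

/// Hypothetical helper: returns `Some(k)` only when a literal fetch count exists.
fn top_k_from_fetch(fetch: FetchType) -> Option<usize> {
    // `let ... else` returns early on any non-matching case,
    // where `unwrap()` on a missing value would panic.
    let FetchType::Literal(Some(k)) = fetch else {
        return None;
    };
    Some(k)
}
```

With this sketch, `top_k_from_fetch(FetchType::Literal(Some(3)))` yields `Some(3)`, while `FetchType::Literal(None)` and `FetchType::UnsupportedExpr` both yield `None`, which corresponds to the `Transformed::no(plan)` branch in the rule.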

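Note on the TopK idea: the hunk context in the diff mentions keeping only the top K elements in memory instead of sorting everything. One common way to do that is a bounded min-heap, sketched below using only the Rust standard library. This is an illustrative assumption about the technique, not the data structure used in `user_defined_plan.rs`; the helper name `top_k` and the `i64` element type are hypothetical.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical helper: returns the `k` largest values, largest first,
/// while holding at most `k` values in memory at any time.
fn top_k(values: impl IntoIterator<Item = i64>, k: usize) -> Vec<i64> {
    if k == 0 {
        return Vec::new();
    }
    // `Reverse` turns the max-heap into a min-heap, so the smallest
    // retained value sits at the top and is cheap to evict.
    let mut heap: BinaryHeap<Reverse<i64>> = BinaryHeap::with_capacity(k);
    for v in values {
        if heap.len() < k {
            heap.push(Reverse(v));
        } else if let Some(&Reverse(min)) = heap.peek() {
            // Replace the smallest retained value only if `v` beats it.
            if v > min {
                heap.pop();
                heap.push(Reverse(v));
            }
        }
    }
    // Ascending order of `Reverse<i64>` is descending order of the values.
    heap.into_sorted_vec()
        .into_iter()
        .map(|Reverse(v)| v)
        .collect()
}
```

For `n` input values this does `O(n log k)` work and keeps `O(k)` state, compared with `O(n log n)` work and `O(n)` state for a full sort followed by a limit.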