Skip to content

Commit 48b46b3

Browse files
authored
[FEAT] [New Query Plan] Add support for Projection and Coalesce, enable many tests (#1256)
This PR adds support for projection (`df.select()`, `df.exclude()`, `df.with_column()`) and coalescing (`df.into_partitions()`), and enables a bunch of tests that depended on these features. The main fixes that popped up once enabling the tests were: - Misc. input validation - Missing plan flattening for `df.repartition()` with unknown partition scheme (i.e. simple split). This PR is stacked on #1254 and #1252, so the final commit contains the actual diff: 6754d67
1 parent 34443aa commit 48b46b3

31 files changed

+271
-76
lines changed

daft/execution/rust_physical_plan_shim.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,17 @@ def tabular_scan(
4141
)
4242

4343

44+
def project(
45+
input: physical_plan.InProgressPhysicalPlan[PartitionT], projection: list[PyExpr]
46+
) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
47+
expr_projection = ExpressionsProjection([Expression._from_pyexpr(expr) for expr in projection])
48+
return physical_plan.pipeline_instruction(
49+
child_plan=input,
50+
pipeable_instruction=execution_step.Project(expr_projection),
51+
resource_request=ResourceRequest(), # TODO(Clark): Use real ResourceRequest.
52+
)
53+
54+
4455
def sort(
4556
input: physical_plan.InProgressPhysicalPlan[PartitionT],
4657
sort_by: list[PyExpr],

daft/logical/rust_logical_plan.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,23 @@ def project(
9595
projection: ExpressionsProjection,
9696
custom_resource_request: ResourceRequest = ResourceRequest(),
9797
) -> RustLogicalPlanBuilder:
98-
raise NotImplementedError("not implemented")
98+
if custom_resource_request != ResourceRequest():
99+
raise NotImplementedError("ResourceRequests not supported for new query planner")
100+
schema = projection.resolve_schema(self.schema())
101+
exprs = [expr._expr for expr in projection]
102+
builder = self._builder.project(exprs, schema._schema)
103+
return RustLogicalPlanBuilder(builder)
99104

100105
def filter(self, predicate: Expression) -> RustLogicalPlanBuilder:
106+
# TODO(Clark): Move this logic to Rust side after we've ported ExpressionsProjection.
107+
predicate_expr_proj = ExpressionsProjection([predicate])
108+
predicate_schema = predicate_expr_proj.resolve_schema(self.schema())
109+
for resolved_field, predicate_expr in zip(predicate_schema, predicate_expr_proj):
110+
resolved_type = resolved_field.dtype
111+
if resolved_type != DataType.bool():
112+
raise ValueError(
113+
f"Expected expression {predicate_expr} to resolve to type Boolean, but received: {resolved_type}"
114+
)
101115
builder = self._builder.filter(predicate._expr)
102116
return RustLogicalPlanBuilder(builder)
103117

@@ -137,7 +151,12 @@ def repartition(
137151
return RustLogicalPlanBuilder(builder)
138152

139153
def coalesce(self, num_partitions: int) -> RustLogicalPlanBuilder:
140-
raise NotImplementedError("not implemented")
154+
if num_partitions > self.num_partitions():
155+
raise ValueError(
156+
f"Coalesce can only reduce the number of partitions: {num_partitions} vs {self.num_partitions}"
157+
)
158+
builder = self._builder.coalesce(num_partitions)
159+
return RustLogicalPlanBuilder(builder)
141160

142161
def agg(
143162
self,

src/daft-plan/src/builder.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,25 @@ impl LogicalPlanBuilder {
7979
Ok(logical_plan_builder)
8080
}
8181

82+
pub fn project(
83+
&self,
84+
projection: Vec<PyExpr>,
85+
projected_schema: &PySchema,
86+
) -> PyResult<LogicalPlanBuilder> {
87+
let projection_exprs = projection
88+
.iter()
89+
.map(|e| e.clone().into())
90+
.collect::<Vec<Expr>>();
91+
let logical_plan: LogicalPlan = ops::Project::new(
92+
projection_exprs,
93+
projected_schema.clone().into(),
94+
self.plan.clone(),
95+
)
96+
.into();
97+
let logical_plan_builder = LogicalPlanBuilder::new(logical_plan.into());
98+
Ok(logical_plan_builder)
99+
}
100+
82101
pub fn filter(&self, predicate: &PyExpr) -> PyResult<LogicalPlanBuilder> {
83102
let logical_plan: LogicalPlan =
84103
ops::Filter::new(predicate.expr.clone(), self.plan.clone()).into();
@@ -125,6 +144,13 @@ impl LogicalPlanBuilder {
125144
Ok(logical_plan_builder)
126145
}
127146

147+
pub fn coalesce(&self, num_partitions: usize) -> PyResult<LogicalPlanBuilder> {
148+
let logical_plan: LogicalPlan =
149+
ops::Coalesce::new(num_partitions, self.plan.clone()).into();
150+
let logical_plan_builder = LogicalPlanBuilder::new(logical_plan.into());
151+
Ok(logical_plan_builder)
152+
}
153+
128154
pub fn distinct(&self) -> PyResult<LogicalPlanBuilder> {
129155
let logical_plan: LogicalPlan = ops::Distinct::new(self.plan.clone()).into();
130156
let logical_plan_builder = LogicalPlanBuilder::new(logical_plan.into());

src/daft-plan/src/logical_plan.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ use crate::{ops::*, PartitionScheme, PartitionSpec};
77
#[derive(Clone, Debug)]
88
pub enum LogicalPlan {
99
Source(Source),
10+
Project(Project),
1011
Filter(Filter),
1112
Limit(Limit),
1213
Sort(Sort),
1314
Repartition(Repartition),
15+
Coalesce(Coalesce),
1416
Distinct(Distinct),
1517
Aggregate(Aggregate),
1618
Concat(Concat),
@@ -21,10 +23,14 @@ impl LogicalPlan {
2123
pub fn schema(&self) -> SchemaRef {
2224
match self {
2325
Self::Source(Source { schema, .. }) => schema.clone(),
26+
Self::Project(Project {
27+
projected_schema, ..
28+
}) => projected_schema.clone(),
2429
Self::Filter(Filter { input, .. }) => input.schema(),
2530
Self::Limit(Limit { input, .. }) => input.schema(),
2631
Self::Sort(Sort { input, .. }) => input.schema(),
2732
Self::Repartition(Repartition { input, .. }) => input.schema(),
33+
Self::Coalesce(Coalesce { input, .. }) => input.schema(),
2834
Self::Distinct(Distinct { input, .. }) => input.schema(),
2935
Self::Aggregate(aggregate) => aggregate.schema(),
3036
Self::Concat(Concat { input, .. }) => input.schema(),
@@ -35,6 +41,7 @@ impl LogicalPlan {
3541
pub fn partition_spec(&self) -> Arc<PartitionSpec> {
3642
match self {
3743
Self::Source(Source { partition_spec, .. }) => partition_spec.clone(),
44+
Self::Project(Project { input, .. }) => input.partition_spec(),
3845
Self::Filter(Filter { input, .. }) => input.partition_spec(),
3946
Self::Limit(Limit { input, .. }) => input.partition_spec(),
4047
Self::Sort(Sort { input, sort_by, .. }) => PartitionSpec::new_internal(
@@ -54,6 +61,9 @@ impl LogicalPlan {
5461
Some(partition_by.clone()),
5562
)
5663
.into(),
64+
Self::Coalesce(Coalesce { num_to, .. }) => {
65+
PartitionSpec::new_internal(PartitionScheme::Unknown, *num_to, None).into()
66+
}
5767
Self::Distinct(Distinct { input, .. }) => input.partition_spec(),
5868
Self::Aggregate(Aggregate { input, .. }) => input.partition_spec(), // TODO
5969
Self::Concat(Concat { input, other }) => PartitionSpec::new_internal(
@@ -69,10 +79,12 @@ impl LogicalPlan {
6979
pub fn children(&self) -> Vec<&Self> {
7080
match self {
7181
Self::Source(..) => vec![],
82+
Self::Project(Project { input, .. }) => vec![input],
7283
Self::Filter(Filter { input, .. }) => vec![input],
7384
Self::Limit(Limit { input, .. }) => vec![input],
7485
Self::Sort(Sort { input, .. }) => vec![input],
7586
Self::Repartition(Repartition { input, .. }) => vec![input],
87+
Self::Coalesce(Coalesce { input, .. }) => vec![input],
7688
Self::Distinct(Distinct { input, .. }) => vec![input],
7789
Self::Aggregate(Aggregate { input, .. }) => vec![input],
7890
Self::Concat(Concat { input, other }) => vec![input, other],
@@ -83,10 +95,12 @@ impl LogicalPlan {
8395
pub fn multiline_display(&self) -> Vec<String> {
8496
match self {
8597
Self::Source(source) => source.multiline_display(),
98+
Self::Project(Project { projection, .. }) => vec![format!("Project: {projection:?}")],
8699
Self::Filter(Filter { predicate, .. }) => vec![format!("Filter: {predicate}")],
87100
Self::Limit(Limit { limit, .. }) => vec![format!("Limit: {limit}")],
88101
Self::Sort(sort) => sort.multiline_display(),
89102
Self::Repartition(repartition) => repartition.multiline_display(),
103+
Self::Coalesce(Coalesce { num_to, .. }) => vec![format!("Coalesce: {num_to}")],
90104
Self::Distinct(_) => vec!["Distinct".to_string()],
91105
Self::Aggregate(aggregate) => aggregate.multiline_display(),
92106
Self::Concat(_) => vec!["Concat".to_string()],
@@ -112,10 +126,12 @@ macro_rules! impl_from_data_struct_for_logical_plan {
112126
}
113127

114128
impl_from_data_struct_for_logical_plan!(Source);
129+
impl_from_data_struct_for_logical_plan!(Project);
115130
impl_from_data_struct_for_logical_plan!(Filter);
116131
impl_from_data_struct_for_logical_plan!(Limit);
117132
impl_from_data_struct_for_logical_plan!(Sort);
118133
impl_from_data_struct_for_logical_plan!(Repartition);
134+
impl_from_data_struct_for_logical_plan!(Coalesce);
119135
impl_from_data_struct_for_logical_plan!(Distinct);
120136
impl_from_data_struct_for_logical_plan!(Aggregate);
121137
impl_from_data_struct_for_logical_plan!(Concat);

src/daft-plan/src/ops/coalesce.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
use std::sync::Arc;
2+
3+
use crate::LogicalPlan;
4+
5+
#[derive(Clone, Debug)]
6+
pub struct Coalesce {
7+
// Number of partitions to coalesce to.
8+
pub num_to: usize,
9+
// Upstream node.
10+
pub input: Arc<LogicalPlan>,
11+
}
12+
13+
impl Coalesce {
14+
pub(crate) fn new(num_to: usize, input: Arc<LogicalPlan>) -> Self {
15+
Self { num_to, input }
16+
}
17+
}

src/daft-plan/src/ops/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
mod agg;
2+
mod coalesce;
23
mod concat;
34
mod distinct;
45
mod filter;
56
mod limit;
7+
mod project;
68
mod repartition;
79
mod sink;
810
mod sort;
911
mod source;
1012

1113
pub use agg::Aggregate;
14+
pub use coalesce::Coalesce;
1215
pub use concat::Concat;
1316
pub use distinct::Distinct;
1417
pub use filter::Filter;
1518
pub use limit::Limit;
19+
pub use project::Project;
1620
pub use repartition::Repartition;
1721
pub use sink::Sink;
1822
pub use sort::Sort;

src/daft-plan/src/ops/project.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
use std::sync::Arc;
2+
3+
use daft_core::schema::SchemaRef;
4+
use daft_dsl::Expr;
5+
6+
use crate::LogicalPlan;
7+
8+
#[derive(Clone, Debug)]
9+
pub struct Project {
10+
pub projection: Vec<Expr>,
11+
pub projected_schema: SchemaRef,
12+
// Upstream node.
13+
pub input: Arc<LogicalPlan>,
14+
}
15+
16+
impl Project {
17+
pub(crate) fn new(
18+
projection: Vec<Expr>,
19+
projected_schema: SchemaRef,
20+
input: Arc<LogicalPlan>,
21+
) -> Self {
22+
Self {
23+
projection,
24+
projected_schema,
25+
input,
26+
}
27+
}
28+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
use std::sync::Arc;
2+
3+
use crate::physical_plan::PhysicalPlan;
4+
5+
#[derive(Clone, Debug)]
6+
pub struct Flatten {
7+
// Upstream node.
8+
pub input: Arc<PhysicalPlan>,
9+
}
10+
11+
impl Flatten {
12+
pub(crate) fn new(input: Arc<PhysicalPlan>) -> Self {
13+
Self { input }
14+
}
15+
}

src/daft-plan/src/physical_ops/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ mod concat;
44
mod csv;
55
mod fanout;
66
mod filter;
7+
mod flatten;
78
#[cfg(feature = "python")]
89
mod in_memory;
910
mod json;
1011
mod limit;
1112
mod parquet;
13+
mod project;
1214
mod reduce;
1315
mod sort;
1416
mod split;
@@ -19,11 +21,13 @@ pub use concat::Concat;
1921
pub use csv::{TabularScanCsv, TabularWriteCsv};
2022
pub use fanout::{FanoutByHash, FanoutByRange, FanoutRandom};
2123
pub use filter::Filter;
24+
pub use flatten::Flatten;
2225
#[cfg(feature = "python")]
2326
pub use in_memory::InMemoryScan;
2427
pub use json::{TabularScanJson, TabularWriteJson};
2528
pub use limit::Limit;
2629
pub use parquet::{TabularScanParquet, TabularWriteParquet};
30+
pub use project::Project;
2731
pub use reduce::ReduceMerge;
2832
pub use sort::Sort;
2933
pub use split::Split;
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
use std::sync::Arc;
2+
3+
use daft_dsl::Expr;
4+
5+
use crate::physical_plan::PhysicalPlan;
6+
7+
#[derive(Clone, Debug)]
8+
pub struct Project {
9+
pub projection: Vec<Expr>,
10+
// Upstream node.
11+
pub input: Arc<PhysicalPlan>,
12+
}
13+
14+
impl Project {
15+
pub(crate) fn new(projection: Vec<Expr>, input: Arc<PhysicalPlan>) -> Self {
16+
Self { projection, input }
17+
}
18+
}

0 commit comments

Comments
 (0)