Skip to content
151 changes: 151 additions & 0 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use insta::assert_snapshot;
use object_store::local::LocalFileSystem;
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::sync::Arc;
use tempfile::TempDir;
use url::Url;
Expand Down Expand Up @@ -2996,6 +2997,156 @@ async fn test_count_wildcard_on_window() -> Result<()> {
Ok(())
}

#[tokio::test]
// Test with `repartition_sorts` disabled, causing a full resort of the data:
// even though one union input is declared pre-sorted, the optimizer coalesces
// partitions and re-sorts everything with a single SortExec before the final
// sorted aggregate.
async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false(
) -> Result<()> {
    assert_snapshot!(
        union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(false).await?,
        @r#"
    AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted
      SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
        CoalescePartitionsExec
          AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
            UnionExec
              DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
              DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
    "#);
    Ok(())
}

// Ignored because the expected plan below is not yet produced: EnforceSorting
// currently discards the union's pre-sorted ordering and inserts an extra
// SortExec (see the verbose-explain excerpt in the body for the actual plans).
#[ignore] // See https://github.com/apache/datafusion/issues/18380
#[tokio::test]
// Test with `repartition_sorts` enabled to preserve pre-sorted partitions and avoid resorting
async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true(
) -> Result<()> {
    assert_snapshot!(
        union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(true).await?,
        @r#"
    AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted
      SortPreservingMergeExec: [id@0 ASC NULLS LAST]
        AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted
          UnionExec
            DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
            SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
              DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
    "#);

    // 💥 Doesn't pass, and generates this plan:
    //
    // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted
    //   SortPreservingMergeExec: [id@0 ASC NULLS LAST]
    //     SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]
    //       AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
    //         UnionExec
    //           DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
    //           DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
    //
    // The excerpt below (from `EXPLAIN VERBOSE`) pinpoints where the ordering
    // is lost: the plan is still correct after EnforceDistribution, but
    // EnforceSorting drops `ordering_mode=Sorted` from the partial aggregate
    // and introduces the extra SortExec.
    //
    // === Excerpt from the verbose explain ===
    //
    // +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    // | plan_type                                                  | plan                                                                                                                                                                                                                                                                                                                                          |
    // +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    // | initial_physical_plan                                      | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted                                                                                                                                                                                                                                                                    |
    // |                                                            |   AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted                                                                                                                                                                                                                                                                |
    // |                                                            |     UnionExec                                                                                                                                                                                                                                                                                                                                 |
    // |                                                            |       DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet                                                                                                                                                                            |
    // |                                                            |       SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]                                                                                                                                                                                                                                                                     |
    // |                                                            |         DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet                                                                                                                                                                                                                 |
    // ...
    // | physical_plan after EnforceDistribution                    | OutputRequirementExec: order_by=[], dist_by=Unspecified                                                                                                                                                                                                                                                                                       |
    // |                                                            |   AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted                                                                                                                                                                                                                                                                  |
    // |                                                            |     SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]                                                                                                                                                                                                                                                                       |
    // |                                                            |       CoalescePartitionsExec                                                                                                                                                                                                                                                                                                                  |
    // |                                                            |         AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted                                                                                                                                                                                                                                                          |
    // |                                                            |           UnionExec                                                                                                                                                                                                                                                                                                                           |
    // |                                                            |             DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet                                                                                                                                                                      |
    // |                                                            |             SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]                                                                                                                                                                                                                                                               |
    // |                                                            |               DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet                                                                                                                                                                                                           |
    // |                                                            |                                                                                                                                                                                                                                                                                                                                               |
    // | physical_plan after CombinePartialFinalAggregate           | SAME TEXT AS ABOVE
    // |                                                            |                                                                                                                                                                                                                                                                                                                                               |
    // | physical_plan after EnforceSorting                         | OutputRequirementExec: order_by=[], dist_by=Unspecified                                                                                                                                                                                                                                                                                       |
    // |                                                            |   AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted                                                                                                                                                                                                                                                                  |
    // |                                                            |     SortPreservingMergeExec: [id@0 ASC NULLS LAST]                                                                                                                                                                                                                                                                                            |
    // |                                                            |       SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]                                                                                                                                                                                                                                                                      |
    // |                                                            |         AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]                                                                                                                                                                                                                                                                                |
    // |                                                            |           UnionExec                                                                                                                                                                                                                                                                                                                           |
    // |                                                            |             DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet                                                                                                                                                                      |
    // |                                                            |             DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet                                                                                                                                                                                                             |
    // ...
    // +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

    Ok(())
}

async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(
repartition_sorts: bool,
) -> Result<String> {
let config = SessionConfig::default()
.with_target_partitions(1)
.with_repartition_sorts(repartition_sorts);
let ctx = SessionContext::new_with_config(config);

let testdata = parquet_test_data();

// Register "sorted" table, that is sorted
ctx.register_parquet(
"sorted",
&format!("{testdata}/alltypes_tiny_pages.parquet"),
ParquetReadOptions::default()
.file_sort_order(vec![vec![col("id").sort(true, false)]]),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Sidenote: Interestingly, with nulls_first: true (L3074 too), even with the fixes from #9867, the plan includes an extra SortExec node that re-sorts with nulls last. I'm not sure whether that's on purpose, or if there's another issue)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know whether the file is actually sorted or you just add this function to trick the planner to plan this file as it is sorted?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file is not actually sorted no, but I was hoping this was a valid way of making the planner think it is, and plan accordingly.

Can this cause issues?

Copy link
Contributor

@NGA-TRAN NGA-TRAN Oct 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this cause issues?

Likely not but I am not % sure if we do anything special with parquet file.

)
.await?;

// Register "unsorted" table
ctx.register_parquet(
"unsorted",
&format!("{testdata}/alltypes_tiny_pages.parquet"),
ParquetReadOptions::default(),
)
.await?;

let source_sorted = ctx
.table("sorted")
.await
.unwrap()
.select(vec![col("id")])
.unwrap();

let source_unsorted = ctx
.table("unsorted")
.await
.unwrap()
.select(vec![col("id")])
.unwrap();

let source_unsorted_resorted =
source_unsorted.sort(vec![col("id").sort(true, false)])?;

let union = source_sorted.union(source_unsorted_resorted)?;

let agg = union.aggregate(vec![col("id")], vec![])?;

let df = agg;

// To be able to remove user specific paths from the plan, for stable assertions
let testdata_clean = Path::new(&testdata).canonicalize()?.display().to_string();
let testdata_clean = testdata_clean.strip_prefix("/").unwrap_or(&testdata_clean);

// Use displayable() rather than explain().collect() to avoid table formatting issues. We need
// to replace machine-specific paths with variable lengths, which breaks table alignment and
// causes snapshot mismatches.
let physical_plan = df.create_physical_plan().await?;
let displayable_plan = displayable(physical_plan.as_ref())
.indent(true)
.to_string()
.replace(testdata_clean, "{testdata}");

Ok(displayable_plan)
}

#[tokio::test]
async fn test_count_wildcard_on_aggregate() -> Result<()> {
let ctx = create_join_context()?;
Expand Down
Loading