diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 043f42b18c9f..a7393c72c9b3 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -45,6 +45,7 @@ use insta::assert_snapshot; use object_store::local::LocalFileSystem; use std::collections::HashMap; use std::fs; +use std::path::Path; use std::sync::Arc; use tempfile::TempDir; use url::Url; @@ -2996,6 +2997,156 @@ async fn test_count_wildcard_on_window() -> Result<()> { Ok(()) } +#[tokio::test] +// Test with `repartition_sorts` disabled, causing a full resort of the data +async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false( +) -> Result<()> { + assert_snapshot!( + union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(false).await?, + @r#" + AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted + SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] + CoalescePartitionsExec + AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] + UnionExec + DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet + "#); + Ok(()) +} + +#[ignore] // See https://github.com/apache/datafusion/issues/18380 +#[tokio::test] +// Test with `repartition_sorts` enabled to preserve pre-sorted partitions and avoid resorting +async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true( +) -> Result<()> { + assert_snapshot!( + union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(true).await?, + @r#" + AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted + SortPreservingMergeExec: [id@0 ASC NULLS LAST] + AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted + UnionExec + DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet + "#); + + // 💥 Doesn't pass, and generates this plan: + // + // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted + // SortPreservingMergeExec: [id@0 ASC NULLS LAST] + // SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true] + // AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] + // UnionExec + // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + // DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet + // + // + // === Excerpt from the verbose explain === + // + // +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + // | plan_type | plan | + // +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + // | initial_physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | + // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | + // | | UnionExec | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | + // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | + // ... + // | physical_plan after EnforceDistribution | OutputRequirementExec: order_by=[], dist_by=Unspecified | + // | | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | + // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | + // | | CoalescePartitionsExec | + // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | + // | | UnionExec | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | + // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | + // | | | + // | physical_plan after CombinePartialFinalAggregate | SAME TEXT AS ABOVE + // | | | + // | physical_plan after EnforceSorting | OutputRequirementExec: order_by=[], dist_by=Unspecified | + // | | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted | + // | | SortPreservingMergeExec: [id@0 ASC NULLS LAST] | + // | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true] | + // | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] | + // | | UnionExec | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet | + // | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet | + // ... + // +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + + Ok(()) +} + +async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl( + repartition_sorts: bool, +) -> Result { + let config = SessionConfig::default() + .with_target_partitions(1) + .with_repartition_sorts(repartition_sorts); + let ctx = SessionContext::new_with_config(config); + + let testdata = parquet_test_data(); + + // Register "sorted" table, that is sorted + ctx.register_parquet( + "sorted", + &format!("{testdata}/alltypes_tiny_pages.parquet"), + ParquetReadOptions::default() + .file_sort_order(vec![vec![col("id").sort(true, false)]]), + ) + .await?; + + // Register "unsorted" table + ctx.register_parquet( + "unsorted", + &format!("{testdata}/alltypes_tiny_pages.parquet"), + ParquetReadOptions::default(), + ) + .await?; + + let source_sorted = ctx + .table("sorted") + .await + .unwrap() + .select(vec![col("id")]) + .unwrap(); + + let source_unsorted = ctx + .table("unsorted") + .await + .unwrap() + .select(vec![col("id")]) + .unwrap(); + + let source_unsorted_resorted = + source_unsorted.sort(vec![col("id").sort(true, false)])?; + + let union = source_sorted.union(source_unsorted_resorted)?; + + let agg = union.aggregate(vec![col("id")], vec![])?; + + let df = agg; + + // To be able to remove user specific paths from the plan, for stable assertions + let testdata_clean = Path::new(&testdata).canonicalize()?.display().to_string(); + let testdata_clean = testdata_clean.strip_prefix("/").unwrap_or(&testdata_clean); + + // Use displayable() rather than explain().collect() to avoid table formatting issues. We need + // to replace machine-specific paths with variable lengths, which breaks table alignment and + // causes snapshot mismatches. + let physical_plan = df.create_physical_plan().await?; + let displayable_plan = displayable(physical_plan.as_ref()) + .indent(true) + .to_string() + .replace(testdata_clean, "{testdata}"); + + Ok(displayable_plan) +} + #[tokio::test] async fn test_count_wildcard_on_aggregate() -> Result<()> { let ctx = create_join_context()?; diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index 620259821871..e3a0eb7e1aa6 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -359,6 +359,94 @@ async fn test_union_inputs_different_sorted2() -> Result<()> { Ok(()) } +#[tokio::test] +// Test with `repartition_sorts` enabled to preserve pre-sorted partitions and avoid resorting +async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true( +) -> Result<()> { + assert_snapshot!( + union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(true).await?, + @r" + Input Plan: + OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition + CoalescePartitionsExec + UnionExec + SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet + DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC], file_type=parquet + + Optimized Plan: + OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition + SortPreservingMergeExec: [nullable_col@0 ASC] + UnionExec + SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet + DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC], file_type=parquet + "); + Ok(()) +} + +#[tokio::test] +// Test with `repartition_sorts` disabled, causing a full resort of the data +async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false( +) -> Result<()> { + assert_snapshot!( + union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(false).await?, + @r" + Input Plan: + OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition + CoalescePartitionsExec + UnionExec + SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false] + DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet + DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC], file_type=parquet + + Optimized Plan: + OutputRequirementExec: order_by=[(nullable_col@0, asc)], dist_by=SinglePartition + SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false] + CoalescePartitionsExec + UnionExec + DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], file_type=parquet + DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC], file_type=parquet + "); + Ok(()) +} + +async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl( + repartition_sorts: bool, +) -> Result { + let schema = create_test_schema()?; + + // Source 1, will be sorted explicitly (on `nullable_col`) + let source1 = parquet_exec(schema.clone()); + let ordering1 = [sort_expr("nullable_col", &schema)].into(); + let sort1 = sort_exec(ordering1, source1.clone()); + + // Source 2, pre-sorted (on `nullable_col`) + let parquet_ordering: LexOrdering = [sort_expr("nullable_col", &schema)].into(); + let source2 = parquet_exec_with_sort(schema.clone(), vec![parquet_ordering.clone()]); + + let union = union_exec(vec![sort1, source2]); + + let coalesced = coalesce_partitions_exec(union); + + // Required sorted / single partitioned output + let requirement = [PhysicalSortRequirement::new( + col("nullable_col", &schema)?, + Some(SortOptions::new(false, true)), + )] + .into(); + let physical_plan = Arc::new(OutputRequirementExec::new( + coalesced, + Some(OrderingRequirements::new(requirement)), + Distribution::SinglePartition, + None, + )); + + let test = + EnforceSortingTest::new(physical_plan).with_repartition_sorts(repartition_sorts); + Ok(test.run()) +} + #[tokio::test] async fn test_union_inputs_different_sorted3() -> Result<()> { let schema = create_test_schema()?;