alamb opened a new issue, #11675: URL: https://github.com/apache/datafusion/issues/11675
### Describe the bug

The `SanityChecker` added in https://github.com/apache/datafusion/pull/11196 from @mustafasrepo is triggering on some of our plans.

I believe the SanityChecker is correctly rejecting a plan that doesn't satisfy the sort order requirements (aka it found a real bug)

Specifically we have a plan like this

```text
                                                          Extension node output is
┌────────────────────────────────────────────┐  ─ ─ ─ ─   resorted which is needed
│                  SortExec                  │        │   because the input sort order
│        expr = [TIME ASC NULLS LAST]        │ ◀ ─ ─ ─    is different
└────────────────────────────────────────────┘
                       ▲
                       │
┌────────────────────────────────────────────┐
│              <ExtensionNode>,              │
│     required_input_order = [time ASC]      │
│        preserves_input_order = true        │
└────────────────────────────────────────────┘
                       ▲
                       │
┌────────────────────────────────────────────┐
│            SortPreservingMerge             │
│               expr = [TIME]                │
└────────────────────────────────────────────┘
```

But the PushdownSorts optimizer pushes the sort through the ExtensionNode, which violates the stated required input order:

```text
                                                          this sort has been pushed
┌────────────────────────────────────────────┐  ─ ─ ─ ─   through the extension node,
│              <ExtensionNode>,              │        │   even though doing so
│     required_input_order = [time ASC]      │            violates the stated sort
│        preserves_input_order = true        │        │   requirements of
└────────────────────────────────────────────┘            the extension node
                       ▲                              │
                       │
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓        │
┃                  SortExec                  ┃
┃        expr = [TIME ASC NULLS LAST]        ┃ ◀─ ─ ─ ┘
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
                       ▲
                       │
┌────────────────────────────────────────────┐
│            SortPreservingMerge             │
│               expr = [TIME]                │
└────────────────────────────────────────────┘
```

Here is what the actual plan looks like. This is the output of the enforce sorting pass.
Note this is not the whole plan; this fragment actually feeds into a WindowExec node.

```
SortExec: expr=[time@0 ASC NULLS LAST],  <---- this is here because the node above it requires a different sort order
  GapFillExec: group_expr=[time@0], aggr_expr=[avg(diskio.writes)@1], stride=IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 7000000000 }, time_range=Included("123000000000")..Included("210000000000")
    SortExec: expr=[time@0 ASC], preserve_partitioning=[false]
      CoalescePartitionsExec
        AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[avg(diskio.writes)]
          RepartitionExec: partitioning=Hash([time@0], 16), input_partitions=16
            AggregateExec: mode=Partial, gby=[date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 7000000000 }, time@0, 0) as time], aggr=[avg(diskio.writes)]
              RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                FilterExec: time@0 >= 123000000000 AND time@0 <= 210000000000 AND writes@1 IS NOT NULL
                  ParquetExec: file_groups={1 group: [[1/2/87ebd74b005047ba79ad38e5e3b2ae2ae0e0d722ccd6326d86947924755cd10f/92bd4a7f-2c80-4883-8bbe-f4efafd5cc68.parquet]]}, projection=[time, writes], predicate=time@2 >= 123000000000 A
```

However, sort_pushdown incorrectly pushes the sort below the `GapFillExec`: the pushed-through sort doesn't satisfy the `GapFillExec`'s required input order.

```
GapFillExec: group_expr=[time@0], aggr_expr=[avg(diskio.writes)@1], stride=IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 7000000000 }, time_range=Included("123000000000")..Included("210000000000")
  SortExec: expr=[time@0 ASC NULLS LAST]  <--- This was pushed through incorrectly
    SortPreservingMergeExec: [time@0 ASC]
      SortExec: expr=[time@0 ASC], preserve_partitioning=[true]
        AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[avg(diskio.writes)]
          CoalesceBatchesExec: target_batch_size=8192
            RepartitionExec: partitioning=Hash([time@0], 16), input_partitions=16
              AggregateExec: mode=Partial, gby=[date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 7000000000 }, time@0, 0) as time], aggr=[avg(diskio.writes)]
                RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                  CoalesceBatchesExec: target_batch_size=8192
                    FilterExec: time@0 >= 123000000000 AND time@0 <= 210000000000 AND writes@1 IS NOT NULL
                      ParquetExec: file_groups={1 group: [[1/2/87ebd74b005047ba79ad38e5e3b2ae2ae0e0d722ccd6326d86947924755cd10f/92bd4a7f-2c80-4883-8bbe-f4efafd5cc68.parquet]]}, projection=[time, writes], predicate=time@2 >= 123000000000 AND time@2 <= 210000000000 AND writes@3 IS NOT NULL, pruning_predicate=CASE WHEN time_null_count@1 = time_row_count@2 THEN false ELSE time_max@0 >= 123000000000 END AND CASE WHEN time_null_count@1 = time_row_count@2 THEN false ELSE time_min@3 <= 210000000000 END AND writes_null_count@5 != writes_row_count@4, required_guarantees=[]
```

### To Reproduce

I am still working on a DataFusion only reproducer.
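
Until a DataFusion-only reproducer exists, the condition at the heart of this report can be modeled in isolation. The following is a minimal sketch and not DataFusion code: `SortKey`, `Node`, `satisfies`, and `can_push_sort_below` are hypothetical names invented for illustration. It also assumes that `ASC` in the plan output means nulls first, while `ASC NULLS LAST` means nulls last. It only illustrates the kind of guard one would expect before a sort is moved below a node that declares a required input order.

```rust
/// Hypothetical, simplified stand-in for a physical sort expression.
#[derive(Debug, Clone, PartialEq, Eq)]
struct SortKey {
    column: &'static str,
    descending: bool,
    nulls_first: bool,
}

/// Toy model of a single-input operator (not a DataFusion type).
struct Node {
    required_input_order: Vec<SortKey>,
    maintains_input_order: bool,
}

/// True when `provided` starts with every key of `required`, in order,
/// including matching direction and null placement.
fn satisfies(provided: &[SortKey], required: &[SortKey]) -> bool {
    provided.len() >= required.len()
        && provided.iter().zip(required).all(|(p, r)| p == r)
}

/// A sort may move below `node` only if the node keeps its input order
/// and the sort's ordering still satisfies the node's requirement.
fn can_push_sort_below(node: &Node, sort: &[SortKey]) -> bool {
    node.maintains_input_order && satisfies(sort, &node.required_input_order)
}

fn main() {
    // Assumption: plain `ASC` in the plan display means nulls first.
    let time_asc = SortKey { column: "time", descending: false, nulls_first: true };
    let time_asc_nulls_last = SortKey { nulls_first: false, ..time_asc.clone() };

    // Stand-in for the extension node in this report.
    let gap_fill = Node {
        required_input_order: vec![time_asc.clone()],
        maintains_input_order: true,
    };

    // The sort from the report differs in null placement, so it must stay above.
    assert!(!can_push_sort_below(&gap_fill, &[time_asc_nulls_last]));
    // A sort that matches the requirement could be pushed through.
    assert!(can_push_sort_below(&gap_fill, &[time_asc]));
}
```

Under these assumptions, the `[time ASC NULLS LAST]` sort from the plans above fails the guard, while a `[time ASC]` sort would pass.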

### Expected behavior

The expected behavior is that sorts are not pushed down when doing so would violate the `required_input_ordering`.

### Additional context

We found another issue: https://github.com/apache/datafusion/issues/11492

I found that if I changed gapfill to return `false` for `maintains_input_order`, the query passes:

```rust
fn maintains_input_order(&self) -> Vec<bool> {
    vec![false]
}
```

However, the node actually does preserve the input order, so this is just a workaround.
