alamb opened a new issue, #11675:
URL: https://github.com/apache/datafusion/issues/11675

   ### Describe the bug
   
   The `SanityChecker` added in https://github.com/apache/datafusion/pull/11196 from @mustafasrepo is triggering on some of our plans. I believe the `SanityChecker` is correctly rejecting a plan that doesn't satisfy the sort order requirements (that is, it found a real bug).
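   The check involved can be illustrated with a minimal, self-contained Rust sketch of ordering satisfaction. It is a simplified model, not DataFusion's API: `SortKey` and `ordering_satisfies` are hypothetical names, and the real checker also takes equivalence properties into account. The sketch assumes, as in DataFusion's plan display, that a bare `ASC` means `ASC NULLS FIRST`. It treats a required ordering as satisfied when it is a prefix of the provided ordering with identical column, direction, and null placement.

   ```rust
   /// One sort key: a column plus direction and null placement.
   /// Hypothetical simplified model, not DataFusion's `PhysicalSortExpr`.
   #[derive(Debug, Clone, PartialEq, Eq)]
   pub struct SortKey {
       pub column: String,
       pub descending: bool,
       pub nulls_first: bool,
   }

   impl SortKey {
       /// `time ASC` in the plan display, assumed to mean ASC NULLS FIRST.
       pub fn asc(column: &str) -> Self {
           SortKey { column: column.to_string(), descending: false, nulls_first: true }
       }

       /// `time ASC NULLS LAST`.
       pub fn asc_nulls_last(column: &str) -> Self {
           SortKey { column: column.to_string(), descending: false, nulls_first: false }
       }
   }

   /// Returns true if rows sorted by `provided` also satisfy `required`,
   /// i.e. `required` is a prefix of `provided` (same columns, direction,
   /// and null placement, in the same order).
   pub fn ordering_satisfies(provided: &[SortKey], required: &[SortKey]) -> bool {
       required.len() <= provided.len()
           && required.iter().zip(provided.iter()).all(|(r, p)| r == p)
   }
   ```

   Under this model, input sorted by `time ASC NULLS LAST` does not satisfy a `time ASC` requirement, because the null placement differs.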
   
   Specifically, we have a plan like this:
   
   ```text
                                                                    Extension node output is
   ┌────────────────────────────────────────────┐          ─ ─ ─ ─  resorted which is needed
   │                  SortExec                  │         │         because the input sort order
   │        expr = [TIME ASC NULLS LAST]        │  ◀ ─ ─ ─          is different
   └────────────────────────────────────────────┘
                          ▲
                          │
   ┌────────────────────────────────────────────┐
   │              <ExtensionNode>,              │
   │     required_input_order = [time ASC]      │
   │        preserves_input_order = true        │
   └────────────────────────────────────────────┘
                          ▲
                          │
   ┌────────────────────────────────────────────┐
   │            SortPreservingMerge             │
   │               expr = [TIME]                │
   └────────────────────────────────────────────┘
   ```
   
   But the PushdownSorts optimizer pushes the sort through the `ExtensionNode`, which violates the node's stated required input order:
   
   ```text
                                                                    this sort has been pushed
   ┌────────────────────────────────────────────┐          ─ ─ ─ ─  through the extension node,
   │              <ExtensionNode>,              │         │         even though doing so
   │     required_input_order = [time ASC]      │                   violates the stated sort
   │        preserves_input_order = true        │         │         requirements of
   └────────────────────────────────────────────┘                   the extension node
                          ▲                               │
                          │
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓         │
   ┃                  SortExec                  ┃
   ┃        expr = [TIME ASC NULLS LAST]        ┃  ◀─ ─ ─ ┘
   ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
                          ▲
                          │
   ┌────────────────────────────────────────────┐
   │            SortPreservingMerge             │
   │               expr = [TIME]                │
   └────────────────────────────────────────────┘
   ```
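   The two diagrams above can be modeled as a tiny plan tree and validated bottom-up, roughly in the spirit of the `SanityChecker`. This Rust sketch is hypothetical and heavily simplified. `Key`, `Node`, `output_ordering`, and `plan_is_valid` are illustrative names. Only ascending keys and the three node kinds from the diagrams are modeled, and a bare `ASC` is assumed to mean nulls first.

   ```rust
   /// Ascending sort key; `nulls_first` separates `ASC` (assumed NULLS FIRST)
   /// from `ASC NULLS LAST`.
   #[derive(Debug, Clone, PartialEq, Eq)]
   pub struct Key {
       pub column: &'static str,
       pub nulls_first: bool,
   }

   /// Hypothetical, minimal model of the nodes in the diagrams.
   pub enum Node {
       /// Leaf that already produces sorted data (the SortPreservingMerge).
       Merge { ordering: Vec<Key> },
       /// Re-sorts its input into `ordering`.
       Sort { ordering: Vec<Key>, input: Box<Node> },
       /// Extension node: requires `required` on its input and passes the
       /// input order through unchanged (preserves_input_order = true).
       Extension { required: Vec<Key>, input: Box<Node> },
   }

   /// Ordering of the rows a node emits.
   pub fn output_ordering(node: &Node) -> Vec<Key> {
       match node {
           Node::Merge { ordering } | Node::Sort { ordering, .. } => ordering.clone(),
           Node::Extension { input, .. } => output_ordering(input),
       }
   }

   /// True if `required` is a prefix of `provided`.
   fn satisfies(provided: &[Key], required: &[Key]) -> bool {
       required.len() <= provided.len()
           && required.iter().zip(provided.iter()).all(|(r, p)| r == p)
   }

   /// Walks the tree and checks every node's input-ordering requirement.
   pub fn plan_is_valid(node: &Node) -> bool {
       match node {
           Node::Merge { .. } => true,
           Node::Sort { input, .. } => plan_is_valid(input),
           Node::Extension { required, input } => {
               satisfies(&output_ordering(input), required) && plan_is_valid(input)
           }
       }
   }
   ```

   Building the first diagram (sort above the extension node) passes `plan_is_valid`. The second one (sort pushed below it) fails, because the extension node then receives `time ASC NULLS LAST` instead of `time ASC`.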
   
   
   Here is what the actual plan looks like. 
   
   This is the output of the `EnforceSorting` pass. Note that this is not the whole plan; this fragment actually feeds into a WindowExec node.
   ```
   SortExec: expr=[time@0 ASC NULLS LAST], <---- this is here because the node above it requires a different sort order
     GapFillExec: group_expr=[time@0], aggr_expr=[avg(diskio.writes)@1], stride=IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 7000000000 }, time_range=Included("123000000000")..Included("210000000000")
       SortExec: expr=[time@0 ASC], preserve_partitioning=[false]
         CoalescePartitionsExec
           AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[avg(diskio.writes)]
             RepartitionExec: partitioning=Hash([time@0], 16), input_partitions=16
               AggregateExec: mode=Partial, gby=[date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 7000000000 }, time@0, 0) as time], aggr=[avg(diskio.writes)]
                 RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                   FilterExec: time@0 >= 123000000000 AND time@0 <= 210000000000 AND writes@1 IS NOT NULL
                     ParquetExec: file_groups={1 group: [[1/2/87ebd74b005047ba79ad38e5e3b2ae2ae0e0d722ccd6326d86947924755cd10f/92bd4a7f-2c80-4883-8bbe-f4efafd5cc68.parquet]]}, projection=[time, writes], predicate=time@2 >= 123000000000 A
   ```
   
   However, `sort_pushdown` incorrectly pushes the sort below the `GapFillExec`, even though the pushed-down sort (`time@0 ASC NULLS LAST`) does not satisfy the node's required input order (`time@0 ASC`):
   
   ```
   GapFillExec: group_expr=[time@0], aggr_expr=[avg(diskio.writes)@1], stride=IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 7000000000 }, time_range=Included("123000000000")..Included("210000000000")
     SortExec: expr=[time@0 ASC NULLS LAST] <--- This was pushed through incorrectly
       SortPreservingMergeExec: [time@0 ASC]
         SortExec: expr=[time@0 ASC], preserve_partitioning=[true]
           AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[avg(diskio.writes)]
             CoalesceBatchesExec: target_batch_size=8192
               RepartitionExec: partitioning=Hash([time@0], 16), input_partitions=16
                 AggregateExec: mode=Partial, gby=[date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 7000000000 }, time@0, 0) as time], aggr=[avg(diskio.writes)]
                   RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                     CoalesceBatchesExec: target_batch_size=8192
                       FilterExec: time@0 >= 123000000000 AND time@0 <= 210000000000 AND writes@1 IS NOT NULL
                         ParquetExec: file_groups={1 group: [[1/2/87ebd74b005047ba79ad38e5e3b2ae2ae0e0d722ccd6326d86947924755cd10f/92bd4a7f-2c80-4883-8bbe-f4efafd5cc68.parquet]]}, projection=[time, writes], predicate=time@2 >= 123000000000 AND time@2 <= 210000000000 AND writes@3 IS NOT NULL, pruning_predicate=CASE WHEN time_null_count@1 = time_row_count@2 THEN false ELSE time_max@0 >= 123000000000 END AND CASE WHEN time_null_count@1 = time_row_count@2 THEN false ELSE time_min@3 <= 210000000000 END AND writes_null_count@5 != writes_row_count@4, required_guarantees=[]
   ```
   
   
   ### To Reproduce
   
   
   
   I am still working on a DataFusion-only reproducer.
   
   
   ### Expected behavior
   
   The expected behavior is that sorts are not pushed down when doing so would violate the node's `required_input_ordering`.
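   One way to state the expected behavior is as a guard that sort pushdown consults before moving a sort below a node. The Rust sketch below is hypothetical and does not reflect how `sort_pushdown` is actually structured. `NodeProps` and `can_push_sort_below` are illustrative names whose fields loosely mirror `maintains_input_order` and `required_input_ordering`, simplified to a single input.

   ```rust
   /// Ascending sort key; `nulls_first` separates `ASC` from `ASC NULLS LAST`.
   #[derive(Debug, Clone, PartialEq, Eq)]
   pub struct Key {
       pub column: String,
       pub nulls_first: bool,
   }

   /// Hypothetical summary of the child properties that matter for pushdown.
   pub struct NodeProps {
       pub maintains_input_order: bool,
       pub required_input_ordering: Option<Vec<Key>>,
   }

   /// True if `required` is a prefix of `provided`.
   fn satisfies(provided: &[Key], required: &[Key]) -> bool {
       required.len() <= provided.len()
           && required.iter().zip(provided.iter()).all(|(r, p)| r == p)
   }

   /// Decides whether a sort producing `pushed` may be moved below `node`.
   pub fn can_push_sort_below(node: &NodeProps, pushed: &[Key]) -> bool {
       if !node.maintains_input_order {
           // The node does not pass its input order through, so a sort
           // below it would not order the node's output.
           return false;
       }
       match &node.required_input_ordering {
           // No requirement on the input: any pushed ordering is acceptable.
           None => true,
           // The new input ordering must still meet the node's own requirement.
           Some(required) => satisfies(pushed, required),
       }
   }
   ```

   With this guard, a `time ASC NULLS LAST` sort stays above a node that requires `time ASC`. Reporting `maintains_input_order = false` also blocks the pushdown, which is consistent with the workaround under Additional context, at the cost of misdescribing the node.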
   
   ### Additional context
   
   We also found another issue: https://github.com/apache/datafusion/issues/11492
   
   I found that if I change `GapFillExec` to return `false` from `maintains_input_order`, the query passes:
   
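   ```rust
       // Workaround in `impl ExecutionPlan for GapFillExec`: report that the
       // output does not preserve the input order (not actually true)
       fn maintains_input_order(&self) -> Vec<bool> {
           vec![false]
       }
   ```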
   However, the node actually does preserve its input order, so this is only a workaround.
   

