nuno-faria opened a new issue, #16638:
URL: https://github.com/apache/datafusion/issues/16638

   ### Describe the bug
   
   The `SortExec` physical operator is incorrectly pushed down past a `LEFT ANTI JOIN`, which produces incorrect results.
   Disabling the `EnforceSorting` physical optimizer rule results in a correct plan.
   
   ### To Reproduce
   
   ```rust
   use datafusion::error::Result;
   use datafusion::prelude::SessionContext;
   
   #[tokio::main]
   async fn main() -> Result<()> {
       let ctx = SessionContext::new();
   
       ctx.sql("create table t1 (k int, v int);")
           .await?
           .collect()
           .await?;
       ctx.sql("create table t2 (k int, v int);")
           .await?
           .collect()
           .await?;
   
       ctx.sql("insert into t1 select i as k, i as v from generate_series(1, 10000) t(i);")
           .await?
           .collect()
           .await?;
       ctx.sql("insert into t2 values (1, 1);")
           .await?
           .collect()
           .await?;
   
       let df = ctx
           .sql("select * from t1 left anti join t2 on t1.k = t2.k order by t1.k limit 2;")
           .await?;
       df.clone().explain(false, false)?.show().await?;
       df.show().await?;
   
       Ok(())
   }
   ```
   
   Which returns:
   ```txt
   +---------------+----------------------------------------------------------------------------------------+
   | plan_type     | plan                                                                                   |
   +---------------+----------------------------------------------------------------------------------------+
   | logical_plan  | Sort: t1.k ASC NULLS LAST, fetch=2                                                     |
   |               |   LeftAnti Join: t1.k = t2.k                                                           |
   |               |     TableScan: t1 projection=[k, v]                                                    |
   |               |     TableScan: t2 projection=[k]                                                       |
   | physical_plan | SortPreservingMergeExec: [k@0 ASC NULLS LAST], fetch=2                                 |
   |               |   CoalesceBatchesExec: target_batch_size=8192, fetch=2                                 |
   |               |     HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)]               |
   |               |       DataSourceExec: partitions=1, partition_sizes=[1]                                |
   |               |       SortExec: TopK(fetch=2), expr=[k@0 ASC NULLS LAST], preserve_partitioning=[true] |
   |               |         RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1          |
   |               |           DataSourceExec: partitions=1, partition_sizes=[2]                            |
   |               |                                                                                        |
   +---------------+----------------------------------------------------------------------------------------+
   +------+------+
   | k    | v    |
   +------+------+
   | 2    | 2    |
   | 8193 | 8193 |
   +------+------+
   ```
   
   After disabling the `EnforceSorting` rule:
   ```txt
   +---------------+-----------------------------------------------------------------------------------+
   | plan_type     | plan                                                                              |
   +---------------+-----------------------------------------------------------------------------------+
   | logical_plan  | Sort: t1.k ASC NULLS LAST, fetch=2                                                |
   |               |   LeftAnti Join: t1.k = t2.k                                                      |
   |               |     TableScan: t1 projection=[k, v]                                               |
   |               |     TableScan: t2 projection=[k]                                                  |
   | physical_plan | SortExec: TopK(fetch=2), expr=[k@0 ASC NULLS LAST], preserve_partitioning=[false] |
   |               |   CoalescePartitionsExec                                                          |
   |               |     CoalesceBatchesExec: target_batch_size=8192                                   |
   |               |       HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)]        |
   |               |         DataSourceExec: partitions=1, partition_sizes=[1]                         |
   |               |         RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1     |
   |               |           DataSourceExec: partitions=1, partition_sizes=[2]                       |
   |               |                                                                                   |
   +---------------+-----------------------------------------------------------------------------------+
   +---+---+
   | k | v |
   +---+---+
   | 2 | 2 |
   | 3 | 3 |
   +---+---+
   ```
   
   ### Expected behavior
   
   The sort (with its `fetch=2` limit) should not be pushed down below the join in this case.
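
To illustrate why the pushed-down plan can return `2` and `8193`, here is a simplified, standard-library-only model of the two plan shapes. It is a sketch, not DataFusion code: it assumes `t1` is read in batches of 8192 rows and that each batch ends up in its own partition, and all function names are hypothetical.

```rust
// Simplified model of the two plan shapes; this is NOT DataFusion code.
// Assumption: t1 is scanned in batches of 8192 rows and each batch lands in
// its own partition, so one chunk below stands in for one partition.

const BATCH_SIZE: usize = 8192;
const FETCH: usize = 2;

/// Keys of t1: 1..=10000.
fn t1_keys() -> Vec<i64> {
    (1..=10_000).collect()
}

/// LEFT ANTI JOIN: keep only the keys that have no match in `t2`.
fn anti_join(keys: Vec<i64>, t2: &[i64]) -> Vec<i64> {
    keys.into_iter().filter(|k| !t2.contains(k)).collect()
}

/// Sort ascending and keep the first `fetch` keys.
fn top_k(mut keys: Vec<i64>, fetch: usize) -> Vec<i64> {
    keys.sort();
    keys.truncate(fetch);
    keys
}

/// Problematic shape: per-partition TopK below the join, then a merge with fetch.
fn pushed_down_plan() -> Vec<i64> {
    let t2: [i64; 1] = [1];
    let mut merged = Vec::new();
    for partition in t1_keys().chunks(BATCH_SIZE) {
        // The limit is applied before the join has removed any rows.
        let limited = top_k(partition.to_vec(), FETCH);
        merged.extend(anti_join(limited, &t2));
    }
    top_k(merged, FETCH)
}

/// Expected shape: join first, then TopK over the join output.
fn correct_plan() -> Vec<i64> {
    top_k(anti_join(t1_keys(), &[1_i64]), FETCH)
}

fn main() {
    println!("pushed down: {:?}", pushed_down_plan());
    println!("correct:     {:?}", correct_plan());
}
```

In this model the first partition keeps `[1, 2]` and the second keeps `[8193, 8194]`. The join then drops `1`, so the merge sees `[2, 8193, 8194]` and the row with `k = 3` has already been discarded by the limit.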
   
   ### Additional context
   
   - Tested from main (7cdac33dabbca2c06fac11cf6d2995dbc490c4bd)

