xudong963 opened a new issue, #14150:
URL: https://github.com/apache/datafusion/issues/14150

   ### Describe the bug
   
   For a top-k query such as `select * from aggregate_test_100 ORDER BY c13 
limit 5;`, applying `EnforceDistribution` twice generates an invalid plan and 
produces a wrong result.
   
   The root cause is that the fetch of the limit is lost during the second 
`EnforceDistribution` pass.
   
   ### To Reproduce
   
   Here is an example that reproduces the bug:
   
   ```rust
   use std::sync::Arc;
   use futures::StreamExt;
   use datafusion::prelude::*;
   use datafusion::physical_optimizer::{
       coalesce_batches::CoalesceBatches,
       enforce_distribution::EnforceDistribution,
       output_requirements::OutputRequirements,
       PhysicalOptimizerRule,
   };
   use datafusion::error::Result;
   use datafusion::execution::SessionStateBuilder;
   use datafusion::physical_optimizer::enforce_sorting::EnforceSorting;
   use datafusion::physical_optimizer::limit_pushdown::LimitPushdown;
   use datafusion::physical_optimizer::projection_pushdown::ProjectionPushdown;
   use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
   use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
   
   #[tokio::main]
   async fn main() -> Result<()> {
       // Create a configuration
       let config = SessionConfig::new();
       let ctx = SessionContext::new_with_config(config);
   
       // Create table schema and data
       // To reproduce to bug: the LOCATION should contain more than one 
aggregate_test_100.csv
       let sql = "CREATE EXTERNAL TABLE aggregate_test_100 (
           c1  VARCHAR NOT NULL,
           c2  TINYINT NOT NULL,
           c3  SMALLINT NOT NULL,
           c4  SMALLINT,
           c5  INT,
           c6  BIGINT NOT NULL,
           c7  SMALLINT NOT NULL,
           c8  INT NOT NULL,
           c9  BIGINT UNSIGNED NOT NULL,
           c10 VARCHAR NOT NULL,
           c11 FLOAT NOT NULL,
           c12 DOUBLE NOT NULL,
           c13 VARCHAR NOT NULL
       )
       STORED AS CSV
       LOCATION './testing/data/csv/' 
       OPTIONS ('format.has_header' 'true')";
   
       ctx.sql(sql).await?;
   
       let df = ctx.sql("SELECT * FROM aggregate_test_100 ORDER BY c13 LIMIT 
5").await?;
       let logical_plan = df.logical_plan().clone();
       let analyzed_logical_plan = ctx.state().analyzer().execute_and_check(
           logical_plan,
           ctx.state().config_options(),
           |_, _| (),
       )?;
   
       let optimized_logical_plan = ctx.state().optimizer().optimize(
           analyzed_logical_plan,
           &ctx.state(),
           |_, _| (),
       )?;
   
       let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
           // If there is a output requirement of the query, make sure that
           // this information is not lost across different rules during 
optimization.
           Arc::new(OutputRequirements::new_add_mode()),
           // The EnforceDistribution rule is for adding essential 
repartitioning to satisfy distribution
           // requirements. Please make sure that the whole plan tree is 
determined before this rule.
           // This rule increases parallelism if doing so is beneficial to the 
physical plan; i.e. at
           // least one of the operators in the plan benefits from increased 
parallelism.
           Arc::new(EnforceDistribution::new()),
           Arc::new(EnforceSorting::new()),
           // TODO: `try_embed_to_hash_join` in the ProjectionPushdown rule 
would be block by the CoalesceBatches, so add it before CoalesceBatches. Maybe 
optimize it in the future.
           Arc::new(ProjectionPushdown::new()),
           // The CoalesceBatches rule will not influence the distribution and 
ordering of the
           // whole plan tree. Therefore, to avoid influencing other rules, it 
should run last.
           Arc::new(CoalesceBatches::new()),
           Arc::new(EnforceDistribution::new()), // -- Add enforce distribution 
rule again
           // Remove the ancillary output requirement operator since we are 
done with the planning
           // phase.
           Arc::new(OutputRequirements::new_remove_mode()),
           Arc::new(ProjectionPushdown::new()),
           // The LimitPushdown rule tries to push limits down as far as 
possible,
           // replacing operators with fetching variants, or adding limits
           // past operators that support limit pushdown.
           Arc::new(LimitPushdown::new()),
           // The SanityCheckPlan rule checks whether the order and
           // distribution requirements of each node in the plan
           // is satisfied. It will also reject non-runnable query
           // plans that use pipeline-breaking operators on infinite
           // input(s). The rule generates a diagnostic error
           // message for invalid plans. It makes no changes to the
           // given query plan; i.e. it only acts as a final
           // gatekeeping rule.
           Arc::new(SanityCheckPlan::new()),
       ];
   
       // 2. Generate initial physical plan
       let planner = DefaultPhysicalPlanner::default();
       let session_state = SessionStateBuilder::new().
           with_config(ctx.copied_config()).with_default_features().
           with_physical_optimizer_rules(optimizers).build();
       let optimized_physical_plan = 
planner.create_physical_plan(&optimized_logical_plan, &session_state).await?;
   
       let mut results = optimized_physical_plan.execute(0, 
ctx.task_ctx().clone()).unwrap();
   
       let batch = results.next().await.unwrap()?;
       dbg!(batch.num_rows()); // 10 rows: unexpected result
       Ok(())
   }
   ```
   
   ### Expected behavior
   
   A valid plan and a correct result should be generated, as the documentation describes: 
https://github.com/apache/datafusion/blob/main/datafusion/core/src/physical_optimizer/enforce_distribution.rs#L159-L168
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to