xudong963 commented on code in PR #14207: URL: https://github.com/apache/datafusion/pull/14207#discussion_r1968751449
########## datafusion/core/tests/physical_optimizer/enforce_distribution.rs: ########## @@ -3154,3 +3164,104 @@ fn optimize_away_unnecessary_repartition2() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn apply_enforce_distribution_multiple_times() -> Result<()> { + // Create a configuration + let config = SessionConfig::new(); + let ctx = SessionContext::new_with_config(config); + let testdata = datafusion::test_util::arrow_test_data(); + let csv_file = format!("{testdata}/csv/aggregate_test_100.csv"); + // Create table schema and data + let sql = format!( + "CREATE EXTERNAL TABLE aggregate_test_100 ( + c1 VARCHAR NOT NULL, + c2 TINYINT NOT NULL, + c3 SMALLINT NOT NULL, + c4 SMALLINT, + c5 INT, + c6 BIGINT NOT NULL, + c7 SMALLINT NOT NULL, + c8 INT NOT NULL, + c9 BIGINT UNSIGNED NOT NULL, + c10 VARCHAR NOT NULL, + c11 FLOAT NOT NULL, + c12 DOUBLE NOT NULL, + c13 VARCHAR NOT NULL + ) + STORED AS CSV + LOCATION '{csv_file}' + OPTIONS ('format.has_header' 'true')" + ); + + ctx.sql(sql.as_str()).await?; + + let df = ctx.sql("SELECT * FROM(SELECT * FROM aggregate_test_100 UNION ALL SELECT * FROM aggregate_test_100) ORDER BY c13 LIMIT 5").await?; + let logical_plan = df.logical_plan().clone(); + let analyzed_logical_plan = ctx.state().analyzer().execute_and_check( + logical_plan, + ctx.state().config_options(), + |_, _| (), + )?; + let optimized_logical_plan = ctx.state().optimizer().optimize( + analyzed_logical_plan, + &ctx.state(), + |_, _| (), + )?; + + let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![ + Arc::new(OutputRequirements::new_add_mode()), + Arc::new(EnforceDistribution::new()), + Arc::new(EnforceSorting::new()), + Arc::new(ProjectionPushdown::new()), + Arc::new(CoalesceBatches::new()), + Arc::new(EnforceDistribution::new()), // -- Add enforce distribution rule again + // The second `EnforceDistribution` should be run before removing `OutputRequirements` to reproduce the bug. + Arc::new(OutputRequirements::new_remove_mode()), + Arc::new(ProjectionPushdown::new()), + Arc::new(LimitPushdown::new()), + Arc::new(SanityCheckPlan::new()), Review Comment: https://github.com/apache/datafusion/pull/14207/commits/ffb1eb3d3545ff367bbfac715cb005651f433848 done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org