xudong963 commented on code in PR #14207:
URL: https://github.com/apache/datafusion/pull/14207#discussion_r1968751449


##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -3154,3 +3164,104 @@ fn optimize_away_unnecessary_repartition2() -> 
Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn apply_enforce_distribution_multiple_times() -> Result<()> {
+    // Create a configuration
+    let config = SessionConfig::new();
+    let ctx = SessionContext::new_with_config(config);
+    let testdata = datafusion::test_util::arrow_test_data();
+    let csv_file = format!("{testdata}/csv/aggregate_test_100.csv");
+    // Create table schema and data
+    let sql = format!(
+        "CREATE EXTERNAL TABLE aggregate_test_100 (
+        c1  VARCHAR NOT NULL,
+        c2  TINYINT NOT NULL,
+        c3  SMALLINT NOT NULL,
+        c4  SMALLINT,
+        c5  INT,
+        c6  BIGINT NOT NULL,
+        c7  SMALLINT NOT NULL,
+        c8  INT NOT NULL,
+        c9  BIGINT UNSIGNED NOT NULL,
+        c10 VARCHAR NOT NULL,
+        c11 FLOAT NOT NULL,
+        c12 DOUBLE NOT NULL,
+        c13 VARCHAR NOT NULL
+    )
+    STORED AS CSV
+    LOCATION '{csv_file}'
+    OPTIONS ('format.has_header' 'true')"
+    );
+
+    ctx.sql(sql.as_str()).await?;
+
+    let df = ctx.sql("SELECT * FROM(SELECT * FROM aggregate_test_100 UNION ALL 
SELECT * FROM aggregate_test_100) ORDER BY c13 LIMIT 5").await?;
+    let logical_plan = df.logical_plan().clone();
+    let analyzed_logical_plan = ctx.state().analyzer().execute_and_check(
+        logical_plan,
+        ctx.state().config_options(),
+        |_, _| (),
+    )?;
+    let optimized_logical_plan = ctx.state().optimizer().optimize(
+        analyzed_logical_plan,
+        &ctx.state(),
+        |_, _| (),
+    )?;
+
+    let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
+        Arc::new(OutputRequirements::new_add_mode()),
+        Arc::new(EnforceDistribution::new()),
+        Arc::new(EnforceSorting::new()),
+        Arc::new(ProjectionPushdown::new()),
+        Arc::new(CoalesceBatches::new()),
+        Arc::new(EnforceDistribution::new()), // -- Add enforce distribution 
rule again
+        // The second `EnforceDistribution` should be run before removing 
`OutputRequirements` to reproduce the bug.
+        Arc::new(OutputRequirements::new_remove_mode()),
+        Arc::new(ProjectionPushdown::new()),
+        Arc::new(LimitPushdown::new()),
+        Arc::new(SanityCheckPlan::new()),

Review Comment:
   
Done in https://github.com/apache/datafusion/pull/14207/commits/ffb1eb3d3545ff367bbfac715cb005651f433848
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to