xudong963 commented on code in PR #14207:
URL: https://github.com/apache/datafusion/pull/14207#discussion_r1959171412


##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -3172,3 +3181,78 @@ fn optimize_away_unnecessary_repartition2() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn apply_enforce_distribution_multiple_times() -> Result<()> {
+    // Create a configuration
+    let config = SessionConfig::new();
+    let ctx = SessionContext::new_with_config(config);
+
+    // Create table schema and data
+    let sql = "CREATE EXTERNAL TABLE aggregate_test_100 (
+        c1  VARCHAR NOT NULL,
+        c2  TINYINT NOT NULL,
+        c3  SMALLINT NOT NULL,
+        c4  SMALLINT,
+        c5  INT,
+        c6  BIGINT NOT NULL,
+        c7  SMALLINT NOT NULL,
+        c8  INT NOT NULL,
+        c9  BIGINT UNSIGNED NOT NULL,
+        c10 VARCHAR NOT NULL,
+        c11 FLOAT NOT NULL,
+        c12 DOUBLE NOT NULL,
+        c13 VARCHAR NOT NULL
+    )
+    STORED AS CSV
+    LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+    OPTIONS ('format.has_header' 'true')";
+
+    ctx.sql(sql).await?;
+
+    let df = ctx.sql("SELECT * FROM(SELECT * FROM aggregate_test_100 UNION ALL SELECT * FROM aggregate_test_100) ORDER BY c13 LIMIT 5").await?;
+    let logical_plan = df.logical_plan().clone();
+    let analyzed_logical_plan = ctx.state().analyzer().execute_and_check(
+        logical_plan,
+        ctx.state().config_options(),
+        |_, _| (),
+    )?;
+
+    let optimized_logical_plan = ctx.state().optimizer().optimize(
+        analyzed_logical_plan,
+        &ctx.state(),
+        |_, _| (),
+    )?;
+
+    let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![

Review Comment:
   The second `EnforceDistribution` should be run before removing `OutputRequirements`.
   
   You can try this one:
   ```rust
   let planner = DefaultPhysicalPlanner::default();
   let session_state = SessionStateBuilder::new()
       .with_config(ctx.copied_config())
       .with_default_features()
       .with_physical_optimizer_rule(Arc::new(OutputRequirements::new_add_mode()))
       .with_physical_optimizer_rule(Arc::new(EnforceDistribution::new()))
       .with_physical_optimizer_rule(Arc::new(OutputRequirements::new_remove_mode())) // -- Add enforce distribution rule again
       .build();
   let optimized_physical_plan = planner
       .create_physical_plan(&optimized_logical_plan, &session_state)
       .await?;
   ```
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to