alamb commented on code in PR #14207:
URL: https://github.com/apache/datafusion/pull/14207#discussion_r1934089417


##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -932,12 +932,16 @@ fn add_hash_on_top(
 /// # Arguments
 ///
 /// * `input`: Current node.
+/// * `fetch`: Possible fetch value

Review Comment:
   I think it would help here to describe what is happening with the `fetch` argument as well and when it is updated.
   
   Maybe something like the following:
   
   ```suggestion
   /// * `fetch`: Possible fetch value. If a `SortPreservingMerge` is created
   ///   its fetch is set to this value and `fetch` is set to `None`
   ```



##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -3173,3 +3182,78 @@ fn optimize_away_unnecessary_repartition2() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn apply_enforce_distribution_multiple_times() -> Result<()> {
+    // Create a configuration
+    let config = SessionConfig::new();
+    let ctx = SessionContext::new_with_config(config);
+
+    // Create table schema and data
+    let sql = "CREATE EXTERNAL TABLE aggregate_test_100 (
+        c1  VARCHAR NOT NULL,
+        c2  TINYINT NOT NULL,
+        c3  SMALLINT NOT NULL,
+        c4  SMALLINT,
+        c5  INT,
+        c6  BIGINT NOT NULL,
+        c7  SMALLINT NOT NULL,
+        c8  INT NOT NULL,
+        c9  BIGINT UNSIGNED NOT NULL,
+        c10 VARCHAR NOT NULL,
+        c11 FLOAT NOT NULL,
+        c12 DOUBLE NOT NULL,
+        c13 VARCHAR NOT NULL
+    )
+    STORED AS CSV
+    LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+    OPTIONS ('format.has_header' 'true')";
+
+    ctx.sql(sql).await?;
+
+    let df = ctx.sql("SELECT * FROM(SELECT * FROM aggregate_test_100 UNION ALL SELECT * FROM aggregate_test_100) ORDER BY c13 LIMIT 5").await?;
+    let logical_plan = df.logical_plan().clone();
+    let analyzed_logical_plan = ctx.state().analyzer().execute_and_check(
+        logical_plan,
+        ctx.state().config_options(),
+        |_, _| (),
+    )?;
+
+    let optimized_logical_plan = ctx.state().optimizer().optimize(
+        analyzed_logical_plan,
+        &ctx.state(),
+        |_, _| (),
+    )?;
+
+    let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
+        Arc::new(OutputRequirements::new_add_mode()),
+        Arc::new(EnforceDistribution::new()),
+        Arc::new(EnforceSorting::new()),
+        Arc::new(ProjectionPushdown::new()),
+        Arc::new(CoalesceBatches::new()),
+        Arc::new(EnforceDistribution::new()), // -- Add enforce distribution rule again
+        Arc::new(OutputRequirements::new_remove_mode()),
+        Arc::new(ProjectionPushdown::new()),
+        Arc::new(LimitPushdown::new()),
+        Arc::new(SanityCheckPlan::new()),
+    ];
+
+    let planner = DefaultPhysicalPlanner::default();
+    let session_state = SessionStateBuilder::new()
+        .with_config(ctx.copied_config())
+        .with_default_features()
+        .with_physical_optimizer_rules(optimizers)
+        .build();
+    let optimized_physical_plan = planner
+        .create_physical_plan(&optimized_logical_plan, &session_state)
+        .await?;
+
+    let mut results = optimized_physical_plan
+        .execute(0, ctx.task_ctx().clone())
+        .unwrap();
+
+    let batch = results.next().await.unwrap()?;
+    // With the fix of https://github.com/apache/datafusion/pull/14207, the number of rows will be 10

Review Comment:
   ```suggestion
       // Without the fix of https://github.com/apache/datafusion/pull/14207, the number of rows will be 10
   ```



##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1362,6 +1383,21 @@ pub fn ensure_distribution(
         plan.with_new_children(children_plans)?
     };
 
+    // If `fetch` was not consumed, it means that there was `SortPreservingMergeExec` with fetch before
+    // It was removed by `remove_dist_changing_operators`
+    // and we need to add it back.
+    if fetch.is_some() {
+        plan = Arc::new(
+            SortPreservingMergeExec::new(
+                plan.output_ordering()
+                    .unwrap_or(&LexOrdering::default())
+                    .clone(),
+                plan,
+            )
+            .with_fetch(fetch.take()),
+        )

Review Comment:
   Why does this add back a SortPreservingMerge without sort exprs? Wouldn't it be better to use a `GlobalLimitExec` or something?



##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -3172,3 +3181,78 @@ fn optimize_away_unnecessary_repartition2() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn apply_enforce_distribution_multiple_times() -> Result<()> {

Review Comment:
   I think it is nice to have this "end to end" style test, but given the amount of code changed I think it is important to have more "unit style" tests; otherwise it is hard to understand how general this fix is (or if it just works for the specified query)
   
   I wonder if you could construct some cases using the same framework as the tests above? Aka make a plan and then run EnforceDistribution twice on it and ensure the plans are ok?
   
   Or perhaps you can update the `assert_optimized!` macro to ensure that running `EnforceDistribution` twice doesn't change the plan again
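   
   A minimal sketch of such an idempotence check, assuming the `PhysicalOptimizerRule::optimize` API and a comparison of the rendered plan strings; the helper name and exact crate paths are assumptions:
   
   ```rust
   use std::sync::Arc;
   
   use datafusion_common::{config::ConfigOptions, Result};
   use datafusion_physical_optimizer::enforce_distribution::EnforceDistribution;
   use datafusion_physical_optimizer::PhysicalOptimizerRule;
   use datafusion_physical_plan::{displayable, ExecutionPlan};
   
   /// Hypothetical helper: applies `EnforceDistribution` twice and asserts the
   /// second pass is a no-op (i.e. the rule is idempotent on this plan).
   fn assert_enforce_distribution_idempotent(plan: Arc<dyn ExecutionPlan>) -> Result<()> {
       let config = ConfigOptions::new();
       let once = EnforceDistribution::new().optimize(plan, &config)?;
       let twice = EnforceDistribution::new().optimize(Arc::clone(&once), &config)?;
       assert_eq!(
           displayable(once.as_ref()).indent(true).to_string(),
           displayable(twice.as_ref()).indent(true).to_string(),
           "running EnforceDistribution a second time changed the plan",
       );
       Ok(())
   }
   ```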



##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1020,23 +1033,26 @@ fn remove_dist_changing_operators(
 /// ```
 fn replace_order_preserving_variants(
     mut context: DistributionContext,
-) -> Result<DistributionContext> {
-    context.children = context
-        .children
-        .into_iter()
-        .map(|child| {
-            if child.data {
-                replace_order_preserving_variants(child)
-            } else {
-                Ok(child)
-            }
-        })
-        .collect::<Result<Vec<_>>>()?;
+) -> Result<(DistributionContext, Option<usize>)> {
+    let mut children = vec![];
+    let mut fetch = None;
+    for child in context.children.into_iter() {

Review Comment:
   Since the `DistributionContext` is already passed through most of the functions in this code, I wonder if you considered adding a `fetch` field, like
   
   ```rust
   struct DistributionContext {
     ...
     /// Limit which must be applied to any sort preserving merge that is created
     fetch: Option<usize>
   }
   ```
   
   🤔 
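   
   As a rough sketch of that idea (assuming `DistributionContext` stays a `PlanContext` alias; the struct and field names below are hypothetical, not the PR's code):
   
   ```rust
   // Hypothetical sketch. Assumes: use datafusion_physical_plan::tree_node::PlanContext;
   #[derive(Debug, Clone, Default)]
   struct DistributionData {
       /// Whether the child connection is order-preserving
       /// (the role played by the current `bool` payload).
       order_preserving: bool,
       /// Limit which must be applied to any sort preserving merge that is created.
       fetch: Option<usize>,
   }
   
   type DistributionContext = PlanContext<DistributionData>;
   ```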



##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -3172,3 +3181,78 @@ fn optimize_away_unnecessary_repartition2() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn apply_enforce_distribution_multiple_times() -> Result<()> {
+    // Create a configuration
+    let config = SessionConfig::new();
+    let ctx = SessionContext::new_with_config(config);
+
+    // Create table schema and data
+    let sql = "CREATE EXTERNAL TABLE aggregate_test_100 (
+        c1  VARCHAR NOT NULL,
+        c2  TINYINT NOT NULL,
+        c3  SMALLINT NOT NULL,
+        c4  SMALLINT,
+        c5  INT,
+        c6  BIGINT NOT NULL,
+        c7  SMALLINT NOT NULL,
+        c8  INT NOT NULL,
+        c9  BIGINT UNSIGNED NOT NULL,
+        c10 VARCHAR NOT NULL,
+        c11 FLOAT NOT NULL,
+        c12 DOUBLE NOT NULL,
+        c13 VARCHAR NOT NULL
+    )
+    STORED AS CSV
+    LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+    OPTIONS ('format.has_header' 'true')";
+
+    ctx.sql(sql).await?;
+
+    let df = ctx.sql("SELECT * FROM(SELECT * FROM aggregate_test_100 UNION ALL SELECT * FROM aggregate_test_100) ORDER BY c13 LIMIT 5").await?;
+    let logical_plan = df.logical_plan().clone();
+    let analyzed_logical_plan = ctx.state().analyzer().execute_and_check(
+        logical_plan,
+        ctx.state().config_options(),
+        |_, _| (),
+    )?;
+
+    let optimized_logical_plan = ctx.state().optimizer().optimize(
+        analyzed_logical_plan,
+        &ctx.state(),
+        |_, _| (),
+    )?;
+
+    let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![

Review Comment:
   I am somewhat worried about this test being brittle -- it seems like it requires a very specific sequence of optimizer passes. And I worry that if the default sequence of passes is changed then this test might no longer cover the issue
   
   I actually tried to reproduce the results by just adding `EnforceDistribution` at the end of the default list of optimizers and the issue did not manifest itself 🤔 
   
   ```rust
       let planner = DefaultPhysicalPlanner::default();
       let session_state = SessionStateBuilder::new()
           .with_config(ctx.copied_config())
           .with_default_features()
           .with_physical_optimizer_rule(Arc::new(EnforceDistribution::new())) // -- Add enforce distribution rule again
           .build();
       let optimized_physical_plan = planner
           .create_physical_plan(&optimized_logical_plan, &session_state)
           .await?;
   ```



##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -3172,3 +3181,78 @@ fn optimize_away_unnecessary_repartition2() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn apply_enforce_distribution_multiple_times() -> Result<()> {
+    // Create a configuration
+    let config = SessionConfig::new();
+    let ctx = SessionContext::new_with_config(config);
+
+    // Create table schema and data
+    let sql = "CREATE EXTERNAL TABLE aggregate_test_100 (
+        c1  VARCHAR NOT NULL,
+        c2  TINYINT NOT NULL,
+        c3  SMALLINT NOT NULL,
+        c4  SMALLINT,
+        c5  INT,
+        c6  BIGINT NOT NULL,
+        c7  SMALLINT NOT NULL,
+        c8  INT NOT NULL,
+        c9  BIGINT UNSIGNED NOT NULL,
+        c10 VARCHAR NOT NULL,
+        c11 FLOAT NOT NULL,
+        c12 DOUBLE NOT NULL,
+        c13 VARCHAR NOT NULL
+    )
+    STORED AS CSV
+    LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+    OPTIONS ('format.has_header' 'true')";
+
+    ctx.sql(sql).await?;
+
+    let df = ctx.sql("SELECT * FROM(SELECT * FROM aggregate_test_100 UNION ALL SELECT * FROM aggregate_test_100) ORDER BY c13 LIMIT 5").await?;
+    let logical_plan = df.logical_plan().clone();
+    let analyzed_logical_plan = ctx.state().analyzer().execute_and_check(
+        logical_plan,
+        ctx.state().config_options(),
+        |_, _| (),
+    )?;
+
+    let optimized_logical_plan = ctx.state().optimizer().optimize(
+        analyzed_logical_plan,
+        &ctx.state(),
+        |_, _| (),
+    )?;
+
+    let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
+        Arc::new(OutputRequirements::new_add_mode()),
+        Arc::new(EnforceDistribution::new()),
+        Arc::new(EnforceSorting::new()),
+        Arc::new(ProjectionPushdown::new()),
+        Arc::new(CoalesceBatches::new()),
+        Arc::new(EnforceDistribution::new()), // -- Add enforce distribution rule again
+        Arc::new(OutputRequirements::new_remove_mode()),
+        Arc::new(ProjectionPushdown::new()),
+        Arc::new(LimitPushdown::new()),
+        Arc::new(SanityCheckPlan::new()),
+    ];
+
+    let planner = DefaultPhysicalPlanner::default();
+    let session_state = SessionStateBuilder::new()
+        .with_config(ctx.copied_config())
+        .with_default_features()
+        .with_physical_optimizer_rules(optimizers)
+        .build();
+    let optimized_physical_plan = planner
+        .create_physical_plan(&optimized_logical_plan, &session_state)
+        .await?;
+
+    let mut results = optimized_physical_plan
+        .execute(0, ctx.task_ctx().clone())
+        .unwrap();
+
+    let batch = results.next().await.unwrap()?;

Review Comment:
   I verified that this test does fail without the code in this PR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
