alamb commented on code in PR #17900:
URL: https://github.com/apache/datafusion/pull/17900#discussion_r2402155131


##########
datafusion/core/tests/physical_optimizer/enforce_sorting.rs:
##########
@@ -193,19 +289,28 @@ async fn test_remove_unnecessary_sort5() -> Result<()> {
     let join = hash_join_exec(left_input, right_input, on, None, 
&JoinType::Inner)?;
     let physical_plan = sort_exec([sort_expr("a", &join.schema())].into(), 
join);
 
+    let test = EnforceSortingTest {

Review Comment:
   You can see that the plans did not change by turning on the whitespace-blind 
diff: https://github.com/apache/datafusion/pull/17900/files?w=1



##########
datafusion/core/tests/physical_optimizer/enforce_sorting.rs:
##########
@@ -91,6 +91,102 @@ fn csv_exec_sorted(
     DataSourceExec::from_data_source(config)
 }
 
+/// Runs the sort enforcement optimizer and asserts the plan
+/// against the original and expected plans
+struct EnforceSortingTest {
+    plan: Arc<dyn ExecutionPlan>,
+    repartition_sorts: bool,
+}
+
+impl EnforceSortingTest {
+    /// Runs the enforce sorting test and returns a string with the input and
+    /// optimized plan as strings for snapshot comparison using insta
+    fn run(&self) -> String {
+        let mut config = ConfigOptions::new();
+        config.optimizer.repartition_sorts = self.repartition_sorts;
+
+        // This file has 4 rules that use tree node, apply these rules as in 
the
+        // EnforceSorting::optimize implementation
+        // After these operations tree nodes should be in a consistent state.
+        // This code block makes sure that these rules doesn't violate tree 
node integrity.
+        {
+            let plan_requirements =
+                PlanWithCorrespondingSort::new_default(Arc::clone(&self.plan));
+            let adjusted = plan_requirements
+                .transform_up(ensure_sorting)
+                .data()
+                .and_then(check_integrity)
+                .expect("check_integrity failed after ensure_sorting");
+            // TODO: End state payloads will be checked here.
+
+            let new_plan = if config.optimizer.repartition_sorts {
+                let plan_with_coalesce_partitions =
+                    
PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan);
+                let parallel = plan_with_coalesce_partitions
+                    .transform_up(parallelize_sorts)
+                    .data()
+                    .and_then(check_integrity)
+                    .expect("check_integrity failed after parallelize_sorts");
+                // TODO: End state payloads will be checked here.
+                parallel.plan
+            } else {
+                adjusted.plan
+            };
+
+            let plan_with_pipeline_fixer =
+                OrderPreservationContext::new_default(new_plan);
+            let updated_plan = plan_with_pipeline_fixer
+                .transform_up(|plan_with_pipeline_fixer| {
+                    replace_with_order_preserving_variants(
+                        plan_with_pipeline_fixer,
+                        false,
+                        true,
+                        &config,
+                    )
+                })
+                .data()
+                .and_then(check_integrity)
+                .expect(
+                    "check_integrity failed after 
replace_with_order_preserving_variants",
+                );
+            // TODO: End state payloads will be checked here.
+
+            let mut sort_pushdown = 
SortPushDown::new_default(updated_plan.plan);
+            assign_initial_requirements(&mut sort_pushdown);
+            check_integrity(
+                pushdown_sorts(sort_pushdown).expect("pushdown_sorts failed"),
+            )
+            .expect("check_integrity failed after pushdown_sorts");
+            // TODO: End state payloads will be checked here.
+        }
+        let input_plan_string = 
displayable(self.plan.as_ref()).indent(true).to_string();
+
+        // Run the actual optimizer
+        let optimized_physical_plan = EnforceSorting::new()
+            .optimize(Arc::clone(&self.plan), &config)
+            .expect("enforce_sorting failed");
+
+        // Get string representation of the plan
+        let optimized_plan_string = 
displayable(optimized_physical_plan.as_ref())
+            .indent(true)
+            .to_string();
+
+        let expected_input: Vec<&str> = 
input_plan_string.trim().lines().collect();
+        let expected_optimized: Vec<&str> 
=optimized_plan_string.trim().lines().collect();
+
+        // return a string with both input and optimized plan
+        format!(

Review Comment:
   This somewhat crazy formatting is designed to make it easier to review the 
diff on GitHub.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to