alamb commented on code in PR #14673:
URL: https://github.com/apache/datafusion/pull/14673#discussion_r1956781210


##########
datafusion/physical-optimizer/src/enforce_sorting/mod.rs:
##########
@@ -151,10 +165,51 @@ fn update_coalesce_ctx_children(
     };
 }
 
-/// The boolean flag `repartition_sorts` defined in the config indicates
-/// whether we elect to transform [`CoalescePartitionsExec`] + [`SortExec`] cascades
-/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades, which enables us to
-/// perform sorting in parallel.
+/// If `repartition_sorts` is enabled,

Review Comment:
   😍 



##########
datafusion/physical-optimizer/src/enforce_sorting/mod.rs:
##########
@@ -151,10 +165,51 @@ fn update_coalesce_ctx_children(
     };
 }
 
-/// The boolean flag `repartition_sorts` defined in the config indicates
-/// whether we elect to transform [`CoalescePartitionsExec`] + [`SortExec`] cascades
-/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades, which enables us to
-/// perform sorting in parallel.
+/// If `repartition_sorts` is enabled,
+/// then transform [`CoalescePartitionsExec`] + [`SortExec`] cascades
+/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades.
+///
+/// The [`CoalescePartitionsExec`] + [`SortExec`] cascades
+/// combine the partitions first, and then sort:
+///
+///   ┌ ─ ─ ─ ─ ─ ┐
+///    ┌─┬─┬─┐
+///   ││B│A│D│... ├──┐
+///    └─┴─┴─┘       │
+///   └ ─ ─ ─ ─ ─ ┘  │  ┌────────────────────────┐   ┌ ─ ─ ─ ─ ─ ─ ┐   ┌────────┐    ┌ ─ ─ ─ ─ ─ ─ ─ ┐
+///    Partition 1   │  │        Coalesce        │    ┌─┬─┬─┬─┬─┐      │        │     ┌─┬─┬─┬─┬─┐
+///                  ├──▶(no ordering guarantees)│──▶││B│E│A│D│C│...───▶  Sort  ├───▶││A│B│C│D│E│... │
+///                  │  │                        │    └─┴─┴─┴─┴─┘      │        │     └─┴─┴─┴─┴─┘
+///   ┌ ─ ─ ─ ─ ─ ┐  │  └────────────────────────┘   └ ─ ─ ─ ─ ─ ─ ┘   └────────┘    └ ─ ─ ─ ─ ─ ─ ─ ┘
+///    ┌─┬─┐         │                                 Partition                       Partition
+///   ││E│C│ ...  ├──┘
+///    └─┴─┘
+///   └ ─ ─ ─ ─ ─ ┘
+///    Partition 2
+///

Review Comment:
   ```suggestion
   ///
   /// ```
   ```



##########
datafusion/physical-optimizer/src/enforce_sorting/mod.rs:
##########
@@ -151,10 +165,51 @@ fn update_coalesce_ctx_children(
     };
 }
 
-/// The boolean flag `repartition_sorts` defined in the config indicates
-/// whether we elect to transform [`CoalescePartitionsExec`] + [`SortExec`] cascades
-/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades, which enables us to
-/// perform sorting in parallel.
+/// If `repartition_sorts` is enabled,
+/// then transform [`CoalescePartitionsExec`] + [`SortExec`] cascades
+/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades.
+///
+/// The [`CoalescePartitionsExec`] + [`SortExec`] cascades
+/// combine the partitions first, and then sort:
+///
+///   ┌ ─ ─ ─ ─ ─ ┐

Review Comment:
   If you add the backticks, this will render more nicely in the output of `cargo doc`:
   
   ```suggestion
   /// ```text
   ///   ┌ ─ ─ ─ ─ ─ ┐
   ```
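To catch the problem this suggestion fixes before `cargo test --doc` does, here is a minimal sketch of a checker. `untagged_doc_fences` is a hypothetical helper, not a DataFusion or rustdoc API. It assumes the standard rustdoc behavior that a fenced block with no info string is compiled as a Rust doctest, while a `text` block is rendered verbatim, and it reports `///` lines that open a fence without an info string.

```rust
/// Hypothetical helper (not a DataFusion or rustdoc API): returns the
/// 1-based line numbers of `///` doc lines that open a fenced block with no
/// info string. rustdoc treats such a block as Rust and runs it as a doctest,
/// so an ASCII diagram inside it would fail `cargo test --doc`.
fn untagged_doc_fences(src: &str) -> Vec<usize> {
    // Built at runtime so this snippet contains no literal fence marker.
    let fence = "`".repeat(3);
    let mut inside_block = false;
    let mut hits = Vec::new();
    for (idx, line) in src.lines().enumerate() {
        // Only consider outer doc comments; skip everything else.
        let Some(doc) = line.trim_start().strip_prefix("///") else {
            continue;
        };
        if let Some(info) = doc.trim().strip_prefix(fence.as_str()) {
            if inside_block {
                // A fence inside an open block closes it.
                inside_block = false;
            } else {
                // An opening fence: flag it when the info string is empty.
                inside_block = true;
                if info.trim().is_empty() {
                    hits.push(idx + 1);
                }
            }
        }
    }
    hits
}
```

A bare opening fence is flagged, while one opened with a `text` info string is not; the sketch ignores edge cases such as `////` comments or fences longer than three backticks.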



##########
datafusion/physical-optimizer/src/enforce_sorting/mod.rs:
##########
@@ -151,10 +165,51 @@ fn update_coalesce_ctx_children(
     };
 }
 
-/// The boolean flag `repartition_sorts` defined in the config indicates
-/// whether we elect to transform [`CoalescePartitionsExec`] + [`SortExec`] cascades
-/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades, which enables us to
-/// perform sorting in parallel.
+/// If `repartition_sorts` is enabled,

Review Comment:
   This is great -- thank you @wiedld
   
   I am not sure it will be rendered as a doc if it is listed on the `impl` -- perhaps it would be better here:
   
   
https://github.com/apache/datafusion/blob/a104661a020b895eb155af12575bafe693b8edaf/datafusion/physical-optimizer/src/enforce_sorting/mod.rs#L129-L128
   
   



##########
datafusion/physical-optimizer/src/enforce_sorting/mod.rs:
##########
@@ -151,10 +165,51 @@ fn update_coalesce_ctx_children(
     };
 }
 
-/// The boolean flag `repartition_sorts` defined in the config indicates
-/// whether we elect to transform [`CoalescePartitionsExec`] + [`SortExec`] cascades
-/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades, which enables us to
-/// perform sorting in parallel.
+/// If `repartition_sorts` is enabled,
+/// then transform [`CoalescePartitionsExec`] + [`SortExec`] cascades
+/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades.
+///
+/// The [`CoalescePartitionsExec`] + [`SortExec`] cascades
+/// combine the partitions first, and then sort:
+///
+///   ┌ ─ ─ ─ ─ ─ ┐
+///    ┌─┬─┬─┐
+///   ││B│A│D│... ├──┐
+///    └─┴─┴─┘       │
+///   └ ─ ─ ─ ─ ─ ┘  │  ┌────────────────────────┐   ┌ ─ ─ ─ ─ ─ ─ ┐   ┌────────┐    ┌ ─ ─ ─ ─ ─ ─ ─ ┐
+///    Partition 1   │  │        Coalesce        │    ┌─┬─┬─┬─┬─┐      │        │     ┌─┬─┬─┬─┬─┐
+///                  ├──▶(no ordering guarantees)│──▶││B│E│A│D│C│...───▶  Sort  ├───▶││A│B│C│D│E│... │
+///                  │  │                        │    └─┴─┴─┴─┴─┘      │        │     └─┴─┴─┴─┴─┘
+///   ┌ ─ ─ ─ ─ ─ ┐  │  └────────────────────────┘   └ ─ ─ ─ ─ ─ ─ ┘   └────────┘    └ ─ ─ ─ ─ ─ ─ ─ ┘
+///    ┌─┬─┐         │                                 Partition                       Partition
+///   ││E│C│ ...  ├──┘
+///    └─┴─┘
+///   └ ─ ─ ─ ─ ─ ┘
+///    Partition 2
+///
+///
+///
+/// The [`SortExec`] + [`SortPreservingMergeExec`] cascades
+/// sort each partition first, then merge the partitions while retaining the sort order:
+///
+///   ┌ ─ ─ ─ ─ ─ ┐   ┌────────┐   ┌ ─ ─ ─ ─ ─ ┐

Review Comment:
   ```suggestion
   /// ```text
   ///   ┌ ─ ─ ─ ─ ─ ┐   ┌────────┐   ┌ ─ ─ ─ ─ ─ ┐
   ```
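To make the trade-off between the two diagrams concrete, here is a minimal sketch that models each partition as a plain `Vec`. This is an illustrative assumption: it is not how DataFusion's `CoalescePartitionsExec`, `SortExec`, or `SortPreservingMergeExec` are implemented, and both function names are hypothetical. `coalesce_then_sort` concatenates first and sorts once. `sort_then_merge` sorts each partition on its own scoped thread, then performs a k-way merge with a min-heap that holds the current head of each sorted run.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::thread;

/// Hypothetical model of CoalescePartitionsExec + SortExec: combine all
/// partitions (no ordering guarantees), then run one sort over everything.
fn coalesce_then_sort<T: Ord + Clone>(partitions: &[Vec<T>]) -> Vec<T> {
    let mut all: Vec<T> = partitions.concat();
    all.sort();
    all
}

/// Hypothetical model of SortExec + SortPreservingMergeExec: sort every
/// partition independently, then k-way merge the sorted runs.
fn sort_then_merge<T: Ord + Clone + Send + Sync>(partitions: &[Vec<T>]) -> Vec<T> {
    // Phase 1: one scoped thread per partition sorts its own copy.
    let sorted: Vec<Vec<T>> = thread::scope(|s| {
        let handles: Vec<_> = partitions
            .iter()
            .map(|p| {
                s.spawn(move || {
                    let mut run = p.clone();
                    run.sort();
                    run
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });

    // Phase 2: min-heap of (current head value, partition index).
    let mut next_idx = vec![0usize; sorted.len()];
    let mut heap = BinaryHeap::new();
    for (p, run) in sorted.iter().enumerate() {
        if let Some(head) = run.first() {
            heap.push(Reverse((head.clone(), p)));
        }
    }

    let mut merged = Vec::with_capacity(sorted.iter().map(Vec::len).sum());
    while let Some(Reverse((value, p))) = heap.pop() {
        merged.push(value);
        next_idx[p] += 1;
        // Refill from the partition just consumed, if it has more rows.
        if let Some(next) = sorted[p].get(next_idx[p]) {
            heap.push(Reverse((next.clone(), p)));
        }
    }
    merged
}
```

For the partitions in the diagrams (`B A D` and `E C`), both functions return `A B C D E`. In this sketch the per-partition sorts are independent and can run concurrently, and the merge only compares the current head of each partition (O(N log P) for N rows across P partitions), whereas the coalesce variant performs one sort over all N rows on a single thread.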



##########
datafusion/physical-plan/src/tree_node.rs:
##########
@@ -39,8 +39,14 @@ impl DynTreeNode for dyn ExecutionPlan {
     }
 }
 
-/// A node object beneficial for writing optimizer rules, encapsulating an [`ExecutionPlan`] node with a payload.
-/// Since there are two ways to access child plans—directly from the plan and through child nodes—it's recommended
+/// A node context object beneficial for writing optimizer rules.
+/// This context encapsulates an [`ExecutionPlan`] node with a payload.
+///
+/// Since each wrapped node has its children within both the [`PlanContext.plan.children()`],

Review Comment:
   👍
   
   this is indeed quite subtle -- it would be awesome if we could find some way to make it harder to forget to call `update_plan_from_children`
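One possible shape for making the sync harder to forget, sketched with hypothetical `Plan` and `Ctx` types. These are stand-ins, not DataFusion's `ExecutionPlan` or `PlanContext`, and not necessarily what the linked suggestion proposes. The idea is to keep `children` private (inside a module, since Rust privacy is per module) so that the only way to change them is a method that rebuilds `plan` from the child plans before returning.

```rust
mod ctx {
    use std::sync::Arc;

    /// Hypothetical immutable plan node (stand-in for an `ExecutionPlan`).
    #[derive(Debug)]
    pub struct Plan {
        pub name: String,
        pub children: Vec<Arc<Plan>>,
    }

    impl Plan {
        pub fn new(name: &str, children: Vec<Arc<Plan>>) -> Arc<Plan> {
            Arc::new(Plan { name: name.to_string(), children })
        }
    }

    /// Hypothetical context node. `plan` and `children` are private, so code
    /// outside this module cannot change children without going through
    /// `map_children`, which re-syncs `plan` every time.
    pub struct Ctx<T> {
        plan: Arc<Plan>,
        pub data: T,
        children: Vec<Ctx<T>>,
    }

    impl<T: Default> Ctx<T> {
        /// Builds a context tree mirroring `plan`, with default `data`.
        pub fn new_default(plan: Arc<Plan>) -> Self {
            let children: Vec<Ctx<T>> =
                plan.children.iter().cloned().map(Ctx::new_default).collect();
            Ctx { plan, data: T::default(), children }
        }
    }

    impl<T> Ctx<T> {
        pub fn plan(&self) -> &Arc<Plan> {
            &self.plan
        }

        pub fn children(&self) -> &[Ctx<T>] {
            &self.children
        }

        /// The only way to modify children: apply `f` to each child, then
        /// rebuild `plan` if any child plan changed.
        pub fn map_children(mut self, f: impl FnMut(Ctx<T>) -> Ctx<T>) -> Self {
            self.children = self.children.into_iter().map(f).collect();
            self.sync_plan();
            self
        }

        fn sync_plan(&mut self) {
            let unchanged = self.plan.children.len() == self.children.len()
                && self
                    .plan
                    .children
                    .iter()
                    .zip(&self.children)
                    .all(|(old, c)| Arc::ptr_eq(old, &c.plan));
            if !unchanged {
                self.plan = Plan::new(
                    &self.plan.name,
                    self.children.iter().map(|c| Arc::clone(&c.plan)).collect(),
                );
            }
        }
    }
}

use ctx::{Ctx, Plan};
```

Because `plan` is only rebuilt when a child's `Arc` actually differs, updates that touch only `data` leave the plan untouched. This matches the observation in the TODO in `update_sort_ctx_children` that setting `data` alone does not mutate the plans.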



##########
datafusion/physical-optimizer/src/enforce_sorting/mod.rs:
##########
@@ -151,10 +165,51 @@ fn update_coalesce_ctx_children(
     };
 }
 
-/// The boolean flag `repartition_sorts` defined in the config indicates
-/// whether we elect to transform [`CoalescePartitionsExec`] + [`SortExec`] cascades
-/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades, which enables us to
-/// perform sorting in parallel.
+/// If `repartition_sorts` is enabled,
+/// then transform [`CoalescePartitionsExec`] + [`SortExec`] cascades
+/// into [`SortExec`] + [`SortPreservingMergeExec`] cascades.

Review Comment:
   I found "cascades" a bit confusing at first. Maybe something like this would be clearer:
   
   ```suggestion
   /// then transform [`CoalescePartitionsExec`] + [`SortExec`]
   /// into [`SortExec`] + [`SortPreservingMergeExec`] as illustrated below
   ```



##########
datafusion/physical-optimizer/src/enforce_sorting/mod.rs:
##########
@@ -84,42 +84,56 @@ impl EnforceSorting {
     }
 }
 
-/// This object is used within the [`EnforceSorting`] rule to track the closest
+/// This context object is used within the [`EnforceSorting`] rule to track the closest
 /// [`SortExec`] descendant(s) for every child of a plan. The data attribute
 /// stores whether the plan is a `SortExec` or is connected to a `SortExec`
 /// via its children.
 pub type PlanWithCorrespondingSort = PlanContext<bool>;
 
+/// For a given node, update the [`PlanContext.data`] attribute.
+///
+/// If the node is a `SortExec`, or any of the node's children are a `SortExec`,
+/// then set the attribute to true.
+///
+/// This requires that a bottom-up traversal was previously performed,
+/// so that the children have already been updated.
 fn update_sort_ctx_children(
-    mut node: PlanWithCorrespondingSort,
+    mut node_and_ctx: PlanWithCorrespondingSort,
     data: bool,
 ) -> Result<PlanWithCorrespondingSort> {
-    for child_node in node.children.iter_mut() {
-        let plan = &child_node.plan;
-        child_node.data = if is_sort(plan) {
-            // Initiate connection:
+    // Update `child.data` for all children.
+    for child_node in node_and_ctx.children.iter_mut() {
+        let child_plan = &child_node.plan;
+        child_node.data = if is_sort(child_plan) {
+            // child is sort
             true
-        } else if is_limit(plan) {
+        } else if is_limit(child_plan) {
             // There is no sort linkage for this path, it starts at a limit.
             false
         } else {
-            let is_spm = is_sort_preserving_merge(plan);
-            let required_orderings = plan.required_input_ordering();
-            let flags = plan.maintains_input_order();
+            // If a descendant is a sort, and the child maintains the sort.
+            let is_spm = is_sort_preserving_merge(child_plan);
+            let required_orderings = child_plan.required_input_ordering();
+            let flags = child_plan.maintains_input_order();
             // Add parent node to the tree if there is at least one child with
             // a sort connection:
             izip!(flags, required_orderings).any(|(maintains, required_ordering)| {
                 let propagates_ordering =
                     (maintains && required_ordering.is_none()) || is_spm;
+                // `connected_to_sort` only returns the correct answer with bottom-up traversal
                 let connected_to_sort =
                     child_node.children.iter().any(|child| child.data);
                 propagates_ordering && connected_to_sort
             })
         }
     }
 
-    node.data = data;
-    node.update_plan_from_children()
+    // set data attribute on current node
+    node_and_ctx.data = data;
+
+    // TODO(xudong963): the plans are not mutated, only the `data` attribute is set.
+    // Therefore this should be called before this function.
+    node_and_ctx.update_plan_from_children()

Review Comment:
   FYI, maybe we could do something like what I suggest here:
   - https://github.com/apache/datafusion/pull/14650#discussion_r1956787730
   
   To avoid having to remember to call `update_plan_from_children` as much
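For readers following the bottom-up requirement noted in the comments above, here is a self-contained model of the same propagation rule. It uses a hypothetical, simplified `Node` type whose per-input boolean flags stand in for `ExecutionPlan`'s `maintains_input_order()` and `required_input_ordering()`; none of these names are DataFusion APIs. The traversal annotates children before their parent, so `connected_to_sort` always reads already-computed values.

```rust
/// Hypothetical, simplified operator kinds (not DataFusion types).
enum Kind {
    Sort,
    Limit,
    SortPreservingMerge,
    Other,
}

/// Hypothetical plan node: per-input flags stand in for
/// `maintains_input_order()` and `required_input_ordering().is_some()`.
struct Node {
    kind: Kind,
    maintains_input_order: Vec<bool>,
    requires_input_ordering: Vec<bool>,
    children: Vec<Node>,
    /// "Is a sort, or connected to one through order-preserving inputs."
    data: bool,
}

fn leaf(kind: Kind) -> Node {
    Node { kind, maintains_input_order: vec![], requires_input_ordering: vec![], children: vec![], data: false }
}

fn unary(kind: Kind, maintains: bool, requires: bool, child: Node) -> Node {
    Node {
        kind,
        maintains_input_order: vec![maintains],
        requires_input_ordering: vec![requires],
        children: vec![child],
        data: false,
    }
}

/// Same shape as the per-child rule in `update_sort_ctx_children`.
fn is_connected_to_sort(node: &Node) -> bool {
    match node.kind {
        Kind::Sort => true,
        // No sort linkage through a limit.
        Kind::Limit => false,
        _ => {
            let is_spm = matches!(node.kind, Kind::SortPreservingMerge);
            let propagates_ordering = node
                .maintains_input_order
                .iter()
                .zip(&node.requires_input_ordering)
                .any(|(&maintains, &requires)| (maintains && !requires) || is_spm);
            // Reads the children's `data`, so it is only correct if the
            // children were annotated first (bottom-up).
            let connected_to_sort = node.children.iter().any(|c| c.data);
            propagates_ordering && connected_to_sort
        }
    }
}

/// Post-order traversal: annotate all children, then the node itself.
fn annotate_bottom_up(node: &mut Node) {
    for child in node.children.iter_mut() {
        annotate_bottom_up(child);
    }
    node.data = is_connected_to_sort(node);
}
```

Calling `is_connected_to_sort` on a freshly built order-preserving node over a `Sort`, before `annotate_bottom_up` has run, returns `false` because the `Sort` child's `data` is still at its default. This is the ordering hazard the `connected_to_sort` comment warns about.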



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

