berkaysynnada commented on code in PR #14821:
URL: https://github.com/apache/datafusion/pull/14821#discussion_r1971059284


##########
datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs:
##########
@@ -98,66 +98,91 @@ fn pushdown_sorts_helper(
         .ordering_satisfy_requirement(&parent_reqs);
 
     if is_sort(plan) {
-        let sort_fetch = plan.fetch();
-        let required_ordering = plan
+        let current_sort_fetch = plan.fetch();
+        let parent_req_fetch = requirements.data.fetch;
+
+        let child_reqs = plan

Review Comment:
   Is it correct to name the current plan's output ordering "child requirements"? 
Maybe we should call it `current_plan_reqs` instead?



##########
datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs:
##########
@@ -98,82 +98,112 @@ fn pushdown_sorts_helper(
         .ordering_satisfy_requirement(&parent_reqs);
 
     if is_sort(plan) {
-        let sort_fetch = plan.fetch();
-        let required_ordering = plan
+        let current_sort_fetch = plan.fetch();
+        let parent_req_fetch = sort_push_down.data.fetch;
+
+        let child_reqs = plan
             .output_ordering()
             .cloned()
             .map(LexRequirement::from)
             .unwrap_or_default();
-        if !satisfy_parent {
-            // Make sure this `SortExec` satisfies parent requirements:
-            let sort_reqs = 
requirements.data.ordering_requirement.unwrap_or_default();
-            // It's possible current plan (`SortExec`) has a fetch value.
-            // And if both of them have fetch values, we should use the 
minimum one.
-            if let Some(fetch) = sort_fetch {
-                if let Some(requirement_fetch) = requirements.data.fetch {
-                    requirements.data.fetch = 
Some(fetch.min(requirement_fetch));
-                }
+        let parent_is_stricter = plan
+            .equivalence_properties()
+            .requirements_compatible(&parent_reqs, &child_reqs);
+        let child_is_stricter = plan
+            .equivalence_properties()
+            .requirements_compatible(&child_reqs, &parent_reqs);
+
+        if !satisfy_parent && !parent_is_stricter {
+            // This new sort has different requirements than the ordering 
being pushed down.
+            // 1. add a `SortExec` here for the pushed down ordering (parent 
reqs).
+            // 2. continue sort pushdown, but with the new ordering of the new 
sort.
+
+            // remove current sort (which will be the new ordering to pushdown)
+            let new_reqs = child_reqs;
+            sort_push_down = sort_push_down.children.swap_remove(0);
+
+            // add back sort exec matching parent
+            sort_push_down =
+                add_sort_above(sort_push_down, parent_reqs, parent_req_fetch);
+
+            // If we have totally orthogonal sort, (2 different sorts in a 
row), that means the child sort
+            // gets immdiately re-sorted.
+            // e.g.  Sort col1 ASC
+            //         Sort col1 DESC
+            //
+            // Remove this redundant sort by not pushing down.
+            let is_orthogonal_sort =
+                !satisfy_parent && !parent_is_stricter && !child_is_stricter;
+
+            // make pushdown requirements be the new ones.
+            if !is_orthogonal_sort || current_sort_fetch.is_some() {
+                sort_push_down.children[0].data = ParentRequirements {
+                    ordering_requirement: Some(new_reqs),
+                    fetch: current_sort_fetch,
+                };
             }
-            let fetch = requirements.data.fetch.or(sort_fetch);
-            requirements = requirements.children.swap_remove(0);
-            requirements = add_sort_above(requirements, sort_reqs, fetch);
-        };
+        } else {
+            // Don't add a SortExec
+            // Do update what sort requirements to keep pushing down
 
-        // We can safely get the 0th index as we are dealing with a `SortExec`.
-        let mut child = requirements.children.swap_remove(0);
-        if let Some(adjusted) =
-            pushdown_requirement_to_children(&child.plan, &required_ordering)?
-        {
-            let fetch = sort_fetch.or_else(|| child.plan.fetch());
-            for (grand_child, order) in 
child.children.iter_mut().zip(adjusted) {
-                grand_child.data = ParentRequirements {
-                    ordering_requirement: order,
-                    fetch,
-                };
+            // remove current sort, and get the sort's child
+            sort_push_down = sort_push_down.children.swap_remove(0);
+            sort_push_down = sort_push_down.update_plan_from_children()?;
+
+            // set the stricter fetch
+            sort_push_down.data.fetch = min_fetch(current_sort_fetch, 
parent_req_fetch);
+
+            // set the stricter ordering
+            if child_is_stricter {
+                sort_push_down.data.ordering_requirement = Some(child_reqs);
+            } else {
+                sort_push_down.data.ordering_requirement = Some(parent_reqs);
             }
-            // Can push down requirements
-            child.data = ParentRequirements {
-                ordering_requirement: Some(required_ordering),
-                fetch,
-            };
 
-            return Ok(Transformed {
-                data: child,
-                transformed: true,
-                tnr: TreeNodeRecursion::Stop,
-            });
-        } else {
-            // Can not push down requirements
-            requirements.children = vec![child];
-            assign_initial_requirements(&mut requirements);
+            // recursive call to helper, so it doesn't transform_down and miss 
the new node (previous child of sort)
+            return pushdown_sorts_helper(sort_push_down);
         }
+    } else if satisfy_parent && parent_reqs.is_empty() {
+        // Nothing to do.
+        return Ok(Transformed::no(sort_push_down));
     } else if satisfy_parent {
-        // For non-sort operators, immediately return if parent requirements 
are met:
+        // For non-sort operators which satisfy ordering:
         let reqs = plan.required_input_ordering();
-        for (child, order) in requirements.children.iter_mut().zip(reqs) {
+        let parent_req_fetch = sort_push_down.data.fetch;
+
+        for (child, order) in sort_push_down.children.iter_mut().zip(reqs) {
             child.data.ordering_requirement = order;
+            child.data.fetch = min_fetch(parent_req_fetch, child.data.fetch);
         }
     } else if let Some(adjusted) = pushdown_requirement_to_children(plan, 
&parent_reqs)? {
-        // Can not satisfy the parent requirements, check whether we can push
-        // requirements down:
-        for (child, order) in requirements.children.iter_mut().zip(adjusted) {
+        // For operators that can take a sort pushdown.
+
+        // Continue pushdown, with updated requirements:
+        let parent_fetch = sort_push_down.data.fetch;
+        let current_fetch = plan.fetch();
+        for (child, order) in sort_push_down.children.iter_mut().zip(adjusted) 
{
             child.data.ordering_requirement = order;
+            child.data.fetch = min_fetch(current_fetch, parent_fetch);
         }
-        requirements.data.ordering_requirement = None;
+        sort_push_down.data.ordering_requirement = None;
     } else {
         // Can not push down requirements, add new `SortExec`:
-        let sort_reqs = requirements
+        let sort_reqs = sort_push_down
             .data
             .ordering_requirement
             .clone()
             .unwrap_or_default();
-        let fetch = requirements.data.fetch;
-        requirements = add_sort_above(requirements, sort_reqs, fetch);
-        assign_initial_requirements(&mut requirements);
+        let fetch = sort_push_down.data.fetch;
+        sort_push_down = add_sort_above(sort_push_down, sort_reqs, fetch);
+        assign_initial_requirements(&mut sort_push_down);
     }
-    Ok(Transformed::yes(requirements))
+
+    sort_push_down = sort_push_down.update_plan_from_children()?;

Review Comment:
   Rather than doing this update here, wouldn't it be better to apply it only in 
the if-else blocks where it is actually necessary?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to