wiedld commented on code in PR #14821:
URL: https://github.com/apache/datafusion/pull/14821#discussion_r1985698343


##########
datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs:
##########
@@ -98,82 +98,112 @@ fn pushdown_sorts_helper(
         .ordering_satisfy_requirement(&parent_reqs);
 
     if is_sort(plan) {
-        let sort_fetch = plan.fetch();
-        let required_ordering = plan
+        let current_sort_fetch = plan.fetch();
+        let parent_req_fetch = sort_push_down.data.fetch;
+
+        let current_plan_reqs = plan
             .output_ordering()
             .cloned()
             .map(LexRequirement::from)
             .unwrap_or_default();
-        if !satisfy_parent {
-            // Make sure this `SortExec` satisfies parent requirements:
-            let sort_reqs = 
requirements.data.ordering_requirement.unwrap_or_default();
-            // It's possible current plan (`SortExec`) has a fetch value.
-            // And if both of them have fetch values, we should use the 
minimum one.
-            if let Some(fetch) = sort_fetch {
-                if let Some(requirement_fetch) = requirements.data.fetch {
-                    requirements.data.fetch = 
Some(fetch.min(requirement_fetch));
-                }
+        let parent_is_stricter = plan
+            .equivalence_properties()
+            .requirements_compatible(&parent_reqs, &current_plan_reqs);
+        let child_is_stricter = plan
+            .equivalence_properties()
+            .requirements_compatible(&current_plan_reqs, &parent_reqs);
+
+        if !satisfy_parent && !parent_is_stricter {
+            // This new sort has different requirements than the ordering 
being pushed down.
+            // 1. add a `SortExec` here for the pushed down ordering (parent 
reqs).
+            // 2. continue sort pushdown, but with the new ordering of the new 
sort.
+
+            // remove current sort (which will be the new ordering to pushdown)
+            let new_reqs = current_plan_reqs;
+            sort_push_down = sort_push_down.children.swap_remove(0);
+            sort_push_down = sort_push_down.update_plan_from_children()?; // 
changed plan
+
+            // add back sort exec matching parent
+            sort_push_down =
+                add_sort_above(sort_push_down, parent_reqs, parent_req_fetch);
+
+            // If we have totally orthogonal sort, (2 different sorts in a 
row), that means the child sort
+            // gets immdiately re-sorted.
+            // e.g.  Sort col1 ASC
+            //         Sort col1 DESC
+            //
+            // Remove this redundant sort by not pushing down.
+            let is_orthogonal_sort =
+                !satisfy_parent && !parent_is_stricter && !child_is_stricter;
+
+            // make pushdown requirements be the new ones.
+            if !is_orthogonal_sort || current_sort_fetch.is_some() {
+                sort_push_down.children[0].data = ParentRequirements {
+                    ordering_requirement: Some(new_reqs),
+                    fetch: current_sort_fetch,
+                };
             }
-            let fetch = requirements.data.fetch.or(sort_fetch);
-            requirements = requirements.children.swap_remove(0);
-            requirements = add_sort_above(requirements, sort_reqs, fetch);
-        };
+        } else {
+            // Don't add a SortExec
+            // Do update what sort requirements to keep pushing down
 
-        // We can safely get the 0th index as we are dealing with a `SortExec`.
-        let mut child = requirements.children.swap_remove(0);
-        if let Some(adjusted) =
-            pushdown_requirement_to_children(&child.plan, &required_ordering)?
-        {
-            let fetch = sort_fetch.or_else(|| child.plan.fetch());
-            for (grand_child, order) in 
child.children.iter_mut().zip(adjusted) {
-                grand_child.data = ParentRequirements {
-                    ordering_requirement: order,
-                    fetch,
-                };
+            // remove current sort, and get the sort's child
+            sort_push_down = sort_push_down.children.swap_remove(0);
+            sort_push_down = sort_push_down.update_plan_from_children()?; // 
changed plan
+
+            // set the stricter fetch
+            sort_push_down.data.fetch = min_fetch(current_sort_fetch, 
parent_req_fetch);
+
+            // set the stricter ordering
+            if child_is_stricter {
+                sort_push_down.data.ordering_requirement = 
Some(current_plan_reqs);
+            } else {
+                sort_push_down.data.ordering_requirement = Some(parent_reqs);
             }
-            // Can push down requirements
-            child.data = ParentRequirements {
-                ordering_requirement: Some(required_ordering),
-                fetch,
-            };
 
-            return Ok(Transformed {
-                data: child,
-                transformed: true,
-                tnr: TreeNodeRecursion::Stop,
-            });
-        } else {
-            // Can not push down requirements
-            requirements.children = vec![child];
-            assign_initial_requirements(&mut requirements);
+            // recursive call to helper, so it doesn't transform_down and miss 
the new node (previous child of sort)
+            return pushdown_sorts_helper(sort_push_down);
         }
+    } else if satisfy_parent && parent_reqs.is_empty() {

Review Comment:
   Done. pushed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to