alamb commented on code in PR #14821: URL: https://github.com/apache/datafusion/pull/14821#discussion_r1985673590
########## datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs: ########## @@ -98,82 +98,112 @@ fn pushdown_sorts_helper( .ordering_satisfy_requirement(&parent_reqs); if is_sort(plan) { - let sort_fetch = plan.fetch(); - let required_ordering = plan + let current_sort_fetch = plan.fetch(); + let parent_req_fetch = sort_push_down.data.fetch; + + let current_plan_reqs = plan .output_ordering() .cloned() .map(LexRequirement::from) .unwrap_or_default(); - if !satisfy_parent { - // Make sure this `SortExec` satisfies parent requirements: - let sort_reqs = requirements.data.ordering_requirement.unwrap_or_default(); - // It's possible current plan (`SortExec`) has a fetch value. - // And if both of them have fetch values, we should use the minimum one. - if let Some(fetch) = sort_fetch { - if let Some(requirement_fetch) = requirements.data.fetch { - requirements.data.fetch = Some(fetch.min(requirement_fetch)); - } + let parent_is_stricter = plan + .equivalence_properties() + .requirements_compatible(&parent_reqs, &current_plan_reqs); + let child_is_stricter = plan + .equivalence_properties() + .requirements_compatible(&current_plan_reqs, &parent_reqs); + + if !satisfy_parent && !parent_is_stricter { + // This new sort has different requirements than the ordering being pushed down. + // 1. add a `SortExec` here for the pushed down ordering (parent reqs). + // 2. continue sort pushdown, but with the new ordering of the new sort. + + // remove current sort (which will be the new ordering to pushdown) + let new_reqs = current_plan_reqs; + sort_push_down = sort_push_down.children.swap_remove(0); + sort_push_down = sort_push_down.update_plan_from_children()?; // changed plan + + // add back sort exec matching parent + sort_push_down = + add_sort_above(sort_push_down, parent_reqs, parent_req_fetch); + + // If we have totally orthogonal sort, (2 different sorts in a row), that means the child sort + // gets immediately re-sorted. + // e.g. 
Sort col1 ASC + // Sort col1 DESC + // + // Remove this redundant sort by not pushing down. + let is_orthogonal_sort = + !satisfy_parent && !parent_is_stricter && !child_is_stricter; + + // make pushdown requirements be the new ones. + if !is_orthogonal_sort || current_sort_fetch.is_some() { + sort_push_down.children[0].data = ParentRequirements { + ordering_requirement: Some(new_reqs), + fetch: current_sort_fetch, + }; } - let fetch = requirements.data.fetch.or(sort_fetch); - requirements = requirements.children.swap_remove(0); - requirements = add_sort_above(requirements, sort_reqs, fetch); - }; + } else { + // Don't add a SortExec + // Do update what sort requirements to keep pushing down - // We can safely get the 0th index as we are dealing with a `SortExec`. - let mut child = requirements.children.swap_remove(0); - if let Some(adjusted) = - pushdown_requirement_to_children(&child.plan, &required_ordering)? - { - let fetch = sort_fetch.or_else(|| child.plan.fetch()); - for (grand_child, order) in child.children.iter_mut().zip(adjusted) { - grand_child.data = ParentRequirements { - ordering_requirement: order, - fetch, - }; + // remove current sort, and get the sort's child + sort_push_down = sort_push_down.children.swap_remove(0); + sort_push_down = sort_push_down.update_plan_from_children()?; // changed plan + + // set the stricter fetch + sort_push_down.data.fetch = min_fetch(current_sort_fetch, parent_req_fetch); + + // set the stricter ordering + if child_is_stricter { + sort_push_down.data.ordering_requirement = Some(current_plan_reqs); + } else { + sort_push_down.data.ordering_requirement = Some(parent_reqs); } - // Can push down requirements - child.data = ParentRequirements { - ordering_requirement: Some(required_ordering), - fetch, - }; - return Ok(Transformed { - data: child, - transformed: true, - tnr: TreeNodeRecursion::Stop, - }); - } else { - // Can not push down requirements - requirements.children = vec![child]; - 
assign_initial_requirements(&mut requirements); + // recursive call to helper, so it doesn't transform_down and miss the new node (previous child of sort) + return pushdown_sorts_helper(sort_push_down); } + } else if satisfy_parent && parent_reqs.is_empty() { Review Comment: If the code isn't doing anything I think a comment would be more appropriate -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org