berkaysynnada commented on code in PR #14821: URL: https://github.com/apache/datafusion/pull/14821#discussion_r1983211047
########## datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs: ########## @@ -98,82 +98,112 @@ fn pushdown_sorts_helper( .ordering_satisfy_requirement(&parent_reqs); if is_sort(plan) { - let sort_fetch = plan.fetch(); - let required_ordering = plan + let current_sort_fetch = plan.fetch(); + let parent_req_fetch = sort_push_down.data.fetch; + + let current_plan_reqs = plan .output_ordering() .cloned() .map(LexRequirement::from) .unwrap_or_default(); - if !satisfy_parent { - // Make sure this `SortExec` satisfies parent requirements: - let sort_reqs = requirements.data.ordering_requirement.unwrap_or_default(); - // It's possible current plan (`SortExec`) has a fetch value. - // And if both of them have fetch values, we should use the minimum one. - if let Some(fetch) = sort_fetch { - if let Some(requirement_fetch) = requirements.data.fetch { - requirements.data.fetch = Some(fetch.min(requirement_fetch)); - } + let parent_is_stricter = plan + .equivalence_properties() + .requirements_compatible(&parent_reqs, &current_plan_reqs); + let child_is_stricter = plan Review Comment: Should `child_is_stricter` be renamed to `current_is_stricter`? When we say "child", it sounds like the child of the current plan. ########## datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs: ########## @@ -98,82 +98,112 @@ fn pushdown_sorts_helper( .ordering_satisfy_requirement(&parent_reqs); if is_sort(plan) { - let sort_fetch = plan.fetch(); - let required_ordering = plan + let current_sort_fetch = plan.fetch(); + let parent_req_fetch = sort_push_down.data.fetch; + + let current_plan_reqs = plan .output_ordering() .cloned() .map(LexRequirement::from) .unwrap_or_default(); - if !satisfy_parent { - // Make sure this `SortExec` satisfies parent requirements: - let sort_reqs = requirements.data.ordering_requirement.unwrap_or_default(); - // It's possible current plan (`SortExec`) has a fetch value. - // And if both of them have fetch values, we should use the minimum one. 
- if let Some(fetch) = sort_fetch { - if let Some(requirement_fetch) = requirements.data.fetch { - requirements.data.fetch = Some(fetch.min(requirement_fetch)); - } + let parent_is_stricter = plan + .equivalence_properties() + .requirements_compatible(&parent_reqs, &current_plan_reqs); + let child_is_stricter = plan + .equivalence_properties() + .requirements_compatible(&current_plan_reqs, &parent_reqs); + + if !satisfy_parent && !parent_is_stricter { + // This new sort has different requirements than the ordering being pushed down. + // 1. add a `SortExec` here for the pushed down ordering (parent reqs). + // 2. continue sort pushdown, but with the new ordering of the new sort. + + // remove current sort (which will be the new ordering to pushdown) + let new_reqs = current_plan_reqs; + sort_push_down = sort_push_down.children.swap_remove(0); + sort_push_down = sort_push_down.update_plan_from_children()?; // changed plan + + // add back sort exec matching parent + sort_push_down = + add_sort_above(sort_push_down, parent_reqs, parent_req_fetch); + + // If we have totally orthogonal sort, (2 different sorts in a row), that means the child sort + // gets immdiately re-sorted. + // e.g. Sort col1 ASC + // Sort col1 DESC + // + // Remove this redundant sort by not pushing down. + let is_orthogonal_sort = Review Comment: I'm confused about this logic. This code is inside an `if` block whose condition, `!satisfy_parent && !parent_is_stricter`, evaluates to `true`. So, this must be equivalent to ``` let is_orthogonal_sort = !child_is_stricter; ``` However, if the child is stricter, doesn't it also satisfy the parent? In that case, wouldn't the overall statement reduce to `let is_orthogonal_sort = true;`? 
########## datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs: ########## @@ -98,82 +98,112 @@ fn pushdown_sorts_helper( .ordering_satisfy_requirement(&parent_reqs); if is_sort(plan) { - let sort_fetch = plan.fetch(); - let required_ordering = plan + let current_sort_fetch = plan.fetch(); + let parent_req_fetch = sort_push_down.data.fetch; + + let current_plan_reqs = plan .output_ordering() .cloned() .map(LexRequirement::from) .unwrap_or_default(); - if !satisfy_parent { - // Make sure this `SortExec` satisfies parent requirements: - let sort_reqs = requirements.data.ordering_requirement.unwrap_or_default(); - // It's possible current plan (`SortExec`) has a fetch value. - // And if both of them have fetch values, we should use the minimum one. - if let Some(fetch) = sort_fetch { - if let Some(requirement_fetch) = requirements.data.fetch { - requirements.data.fetch = Some(fetch.min(requirement_fetch)); - } + let parent_is_stricter = plan + .equivalence_properties() + .requirements_compatible(&parent_reqs, &current_plan_reqs); + let child_is_stricter = plan + .equivalence_properties() + .requirements_compatible(&current_plan_reqs, &parent_reqs); + + if !satisfy_parent && !parent_is_stricter { + // This new sort has different requirements than the ordering being pushed down. + // 1. add a `SortExec` here for the pushed down ordering (parent reqs). + // 2. continue sort pushdown, but with the new ordering of the new sort. + + // remove current sort (which will be the new ordering to pushdown) + let new_reqs = current_plan_reqs; + sort_push_down = sort_push_down.children.swap_remove(0); + sort_push_down = sort_push_down.update_plan_from_children()?; // changed plan + + // add back sort exec matching parent + sort_push_down = + add_sort_above(sort_push_down, parent_reqs, parent_req_fetch); + + // If we have totally orthogonal sort, (2 different sorts in a row), that means the child sort + // gets immdiately re-sorted. + // e.g. 
Sort col1 ASC + // Sort col1 DESC + // + // Remove this redundant sort by not pushing down. + let is_orthogonal_sort = + !satisfy_parent && !parent_is_stricter && !child_is_stricter; + + // make pushdown requirements be the new ones. + if !is_orthogonal_sort || current_sort_fetch.is_some() { + sort_push_down.children[0].data = ParentRequirements { + ordering_requirement: Some(new_reqs), + fetch: current_sort_fetch, + }; } - let fetch = requirements.data.fetch.or(sort_fetch); - requirements = requirements.children.swap_remove(0); - requirements = add_sort_above(requirements, sort_reqs, fetch); - }; + } else { + // Don't add a SortExec + // Do update what sort requirements to keep pushing down - // We can safely get the 0th index as we are dealing with a `SortExec`. - let mut child = requirements.children.swap_remove(0); - if let Some(adjusted) = - pushdown_requirement_to_children(&child.plan, &required_ordering)? - { - let fetch = sort_fetch.or_else(|| child.plan.fetch()); - for (grand_child, order) in child.children.iter_mut().zip(adjusted) { - grand_child.data = ParentRequirements { - ordering_requirement: order, - fetch, - }; + // remove current sort, and get the sort's child + sort_push_down = sort_push_down.children.swap_remove(0); + sort_push_down = sort_push_down.update_plan_from_children()?; // changed plan + + // set the stricter fetch + sort_push_down.data.fetch = min_fetch(current_sort_fetch, parent_req_fetch); + + // set the stricter ordering + if child_is_stricter { + sort_push_down.data.ordering_requirement = Some(current_plan_reqs); + } else { + sort_push_down.data.ordering_requirement = Some(parent_reqs); } - // Can push down requirements - child.data = ParentRequirements { - ordering_requirement: Some(required_ordering), - fetch, - }; - return Ok(Transformed { - data: child, - transformed: true, - tnr: TreeNodeRecursion::Stop, - }); - } else { - // Can not push down requirements - requirements.children = vec![child]; - 
assign_initial_requirements(&mut requirements); + // recursive call to helper, so it doesn't transform_down and miss the new node (previous child of sort) + return pushdown_sorts_helper(sort_push_down); } + } else if satisfy_parent && parent_reqs.is_empty() { Review Comment: if parent_reqs are empty, isn't satisfy_parent always true? ########## datafusion/core/tests/physical_optimizer/enforce_sorting.rs: ########## @@ -2242,7 +2242,7 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 ASC NULLS LAST], file_type=parquet", ], expected_plan: vec![ - "SortExec: expr=[non_nullable_col@1 ASC NULLS LAST, count@2 ASC NULLS LAST], preserve_partitioning=[false]", + "SortExec: expr=[non_nullable_col@1 ASC NULLS LAST], preserve_partitioning=[false]", Review Comment: These changes are also going to cover the fix in sort pushdown:✅ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org