berkaysynnada commented on code in PR #15563:
URL: https://github.com/apache/datafusion/pull/15563#discussion_r2030660456
########## datafusion/physical-plan/src/sorts/sort.rs: ##########
@@ -966,6 +966,8 @@ pub struct SortExec {
     preserve_partitioning: bool,
     /// Fetch highest/lowest n results
     fetch: Option<usize>,
+    /// Common sort prefix between the input and the sort expressions (only used with fetch)
+    common_sort_prefix: LexOrdering,

Review Comment:
Here, can we just keep the count of common prefix expressions of

```rust
/// Sort expressions
expr: LexOrdering
```

instead? I think it would be simpler and would avoid duplication.

########## datafusion/physical-plan/src/sorts/sort.rs: ##########
@@ -1066,22 +1071,33 @@ impl SortExec {
     }
 
     /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
+    /// It also returns the common sort prefix between the input and the sort expressions.
     fn compute_properties(
         input: &Arc<dyn ExecutionPlan>,
         sort_exprs: LexOrdering,
         preserve_partitioning: bool,
-    ) -> PlanProperties {
+    ) -> (PlanProperties, LexOrdering) {
         // Determine execution mode:
         let requirement = LexRequirement::from(sort_exprs);
-        let sort_satisfied = input
+
+        let (sort_prefix, sort_satisfied) = input
             .equivalence_properties()
-            .ordering_satisfy_requirement(&requirement);
+            .extract_matching_prefix(&requirement);
+
+        let sort_partially_satisfied = sort_satisfied || !sort_prefix.is_empty();

Review Comment:
Having a non-empty `sort_prefix` should be enough IMO, since there should be no possibility of having an empty `sort_exprs: LexOrdering` from the input arguments.

########## datafusion/physical-plan/src/topk/mod.rs: ##########
@@ -183,6 +208,86 @@ impl TopK {
         // update memory reservation
         self.reservation.try_resize(self.size())?;
+
+        // flag the topK as finished if we know that all
+        // subsequent batches are guaranteed to be worse than the
+        // current topK
+        self.attempt_early_completion(&batch)?;

Review Comment:
If you can somehow perform this check before registering the batch, we won't need to clone the batch.
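A minimal, self-contained sketch of two of the suggestions above: storing only the common-prefix length instead of a second `LexOrdering`, and capturing what the early-exit check needs before the batch is moved into the heap, so the batch itself never has to be cloned. Everything here is a hypothetical toy model (`ToyTopK`, `matching_prefix_len`, rows represented as `Vec<i64>` sort keys), not DataFusion's `TopK`, `RecordBatch` or `RowConverter` API. It assumes smaller keys are better and that the input arrives sorted ascending on the prefix columns; `matching_prefix_len` only compares column names positionally and ignores the equivalence and constant reasoning that `extract_matching_prefix` performs.

```rust
/// Hypothetical helper: length of the longest prefix of `required` that
/// `input_ordering` already provides. Simplified to positional equality of
/// column names; no equivalence classes or constants are considered.
fn matching_prefix_len(input_ordering: &[&str], required: &[&str]) -> usize {
    input_ordering
        .iter()
        .zip(required)
        .take_while(|(have, want)| have == want)
        .count()
}

/// Hypothetical toy TopK: each row is its full sort key (`Vec<i64>`),
/// compared lexicographically ascending, so smaller keys are better.
/// The input is assumed to arrive sorted ascending on the first
/// `common_prefix_len` key columns.
struct ToyTopK {
    k: usize,
    /// Only the prefix length is stored; the prefix columns are simply
    /// `key[..common_prefix_len]`, so nothing is duplicated.
    common_prefix_len: usize,
    /// Best rows seen so far, kept sorted ascending (last = worst kept row).
    best: Vec<Vec<i64>>,
    finished: bool,
}

impl ToyTopK {
    fn new(k: usize, common_prefix_len: usize) -> Self {
        ToyTopK { k, common_prefix_len, best: Vec::new(), finished: false }
    }

    fn insert_batch(&mut self, batch: Vec<Vec<i64>>) {
        if self.finished {
            return;
        }
        // Capture the only data the early-exit check needs (the last row's
        // prefix) *before* the batch is consumed, so the batch is never cloned.
        let n = self.common_prefix_len;
        let last_prefix: Option<Vec<i64>> = batch.last().map(|row| row[..n].to_vec());

        // "Register" the batch by value: keep each row if it ranks among the best k.
        for row in batch {
            let pos = self.best.partition_point(|kept| *kept <= row);
            if pos < self.k {
                self.best.insert(pos, row);
                self.best.truncate(self.k);
            }
        }

        if let Some(last) = last_prefix {
            self.attempt_early_completion(&last);
        }
    }

    /// If we already hold k rows and the worst kept row's prefix is strictly
    /// less than `last_prefix`, every later row (prefix >= `last_prefix`,
    /// because the input is sorted on the prefix) is strictly worse than all
    /// kept rows, so the result can no longer change.
    fn attempt_early_completion(&mut self, last_prefix: &[i64]) {
        if self.common_prefix_len == 0 || self.best.len() < self.k {
            return;
        }
        if let Some(worst) = self.best.last() {
            if &worst[..self.common_prefix_len] < last_prefix {
                self.finished = true;
            }
        }
    }
}
```

In this sketch, capturing only the last row's prefix costs one small allocation per batch instead of a copy of the batch, while the comparison still runs against the heap's worst row after the batch has been merged. Whether the real operator would hold that captured prefix as a converted row from the prefix row converter is an assumption, not something the review states.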
########## datafusion/core/tests/physical_optimizer/enforce_sorting.rs: ##########
@@ -1652,7 +1652,7 @@ async fn test_remove_unnecessary_sort7() -> Result<()> {
     ) as Arc<dyn ExecutionPlan>;
 
     let expected_input = [
-        "SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]",
+        "SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC], preserve_partitioning=[false], sort_prefix=[non_nullable_col@1 ASC]",

Review Comment:
If we just keep the common prefix count, it will simplify the displays too.

########## datafusion/physical-plan/src/sorts/sort.rs: ##########
@@ -1066,22 +1071,33 @@ impl SortExec {
     }
 
     /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
+    /// It also returns the common sort prefix between the input and the sort expressions.
     fn compute_properties(
         input: &Arc<dyn ExecutionPlan>,
         sort_exprs: LexOrdering,
         preserve_partitioning: bool,
-    ) -> PlanProperties {
+    ) -> (PlanProperties, LexOrdering) {
         // Determine execution mode:
         let requirement = LexRequirement::from(sort_exprs);
-        let sort_satisfied = input
+
+        let (sort_prefix, sort_satisfied) = input
             .equivalence_properties()
-            .ordering_satisfy_requirement(&requirement);
+            .extract_matching_prefix(&requirement);
+
+        let sort_partially_satisfied = sort_satisfied || !sort_prefix.is_empty();
 
         // The emission type depends on whether the input is already sorted:
-        // - If already sorted, we can emit results in the same way as the input
+        // - If already fully sorted, we can emit results in the same way as the input
+        // - If partially sorted, we might be able to emit results incrementally, but it is not guaranteed (Both)
         // - If not sorted, we must wait until all data is processed to emit results (Final)
         let emission_type = if sort_satisfied {
             input.pipeline_behavior()
+        } else if sort_partially_satisfied {
+            if input.pipeline_behavior() == EmissionType::Incremental {
+                EmissionType::Both

Review Comment:
You can just use `input.pipeline_behavior()` here. `Both` means the operator accumulates results internally and emits the accumulated results after the input is exhausted. Here, however, emission depends mainly on changes in the common prefix values, not on input termination.

########## datafusion/physical-plan/src/topk/mod.rs: ##########
@@ -90,15 +90,38 @@ pub struct TopK {
     scratch_rows: Rows,
     /// stores the top k values and their sort key values, in order
     heap: TopKHeap,
+    /// row converter, for common keys between the sort keys and the input ordering
+    common_prefix_converter: Option<RowConverter>,
+    /// Common sort prefix between the input and the sort expressions to allow early exit optimization
+    common_sort_prefix: Arc<[PhysicalSortExpr]>,

Review Comment:
If we decide to keep only the count, this field simplifies as well.

########## datafusion/physical-expr/src/equivalence/properties/mod.rs: ##########
@@ -575,10 +576,35 @@ impl EquivalenceProperties {
             // From the analysis above, we know that `[a ASC]` is satisfied. Then,
             // we add column `a` as constant to the algorithm state. This enables us
             // to deduce that `(b + c) ASC` is satisfied, given `a` is constant.
-            eq_properties = eq_properties
-                .with_constants(std::iter::once(ConstExpr::from(normalized_req.expr)));
+            eq_properties = eq_properties.with_constants(std::iter::once(
+                ConstExpr::from(Arc::clone(&normalized_req.expr)),
+            ));
         }
-        true
+
+        // All requirements are satisfied.
+        normalized_reqs.len()
+    }
+
+    /// Determines the longest prefix of `reqs` that is satisfied by the existing ordering.
+    /// Returns that prefix as a new `LexRequirement`, and a boolean indicating if all the requirements are satisfied.
+    pub fn extract_matching_prefix(
+        &self,
+        reqs: &LexRequirement,
+    ) -> (LexRequirement, bool) {

Review Comment:
Again, we can just return the count of the matched prefix length here.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org