berkaysynnada commented on code in PR #15563:
URL: https://github.com/apache/datafusion/pull/15563#discussion_r2030660456


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -966,6 +966,8 @@ pub struct SortExec {
     preserve_partitioning: bool,
     /// Fetch highest/lowest n results
     fetch: Option<usize>,
+    /// Common sort prefix between the input and the sort expressions (only 
used with fetch)
+    common_sort_prefix: LexOrdering,

Review Comment:
   Here, can we just keep the common prefix expr count of
   ```rust
       /// Sort expressions
       expr: LexOrdering
   ```
   ? I think it'll be more simplified, and avoiding duplication



##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -1066,22 +1071,33 @@ impl SortExec {
     }
 
     /// This function creates the cache object that stores the plan properties 
such as schema, equivalence properties, ordering, partitioning, etc.
+    /// It also returns the common sort prefix between the input and the sort 
expressions.
     fn compute_properties(
         input: &Arc<dyn ExecutionPlan>,
         sort_exprs: LexOrdering,
         preserve_partitioning: bool,
-    ) -> PlanProperties {
+    ) -> (PlanProperties, LexOrdering) {
         // Determine execution mode:
         let requirement = LexRequirement::from(sort_exprs);
-        let sort_satisfied = input
+
+        let (sort_prefix, sort_satisfied) = input
             .equivalence_properties()
-            .ordering_satisfy_requirement(&requirement);
+            .extract_matching_prefix(&requirement);
+
+        let sort_partially_satisfied = sort_satisfied || 
!sort_prefix.is_empty();

Review Comment:
   Having a non-empty sort_prefix should be enough IMO, since there should be 
no possibility of having an empty `sort_exprs: LexOrdering` from the input 
arguments



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -183,6 +208,86 @@ impl TopK {
 
         // update memory reservation
         self.reservation.try_resize(self.size())?;
+
+        // flag the topK as finished if we know that all
+        // subsequent batches are guaranteed to be worse than the
+        // current topK
+        self.attempt_early_completion(&batch)?;

Review Comment:
   If you can somehow check this attempt before registering the batch,  we 
won't need to clone the batch



##########
datafusion/core/tests/physical_optimizer/enforce_sorting.rs:
##########
@@ -1652,7 +1652,7 @@ async fn test_remove_unnecessary_sort7() -> Result<()> {
     ) as Arc<dyn ExecutionPlan>;
 
     let expected_input = [
-        "SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC], 
preserve_partitioning=[false]",
+        "SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC], 
preserve_partitioning=[false], sort_prefix=[non_nullable_col@1 ASC]",

Review Comment:
   If we just keep the common prefix count, it will simplify the displays too



##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -1066,22 +1071,33 @@ impl SortExec {
     }
 
     /// This function creates the cache object that stores the plan properties 
such as schema, equivalence properties, ordering, partitioning, etc.
+    /// It also returns the common sort prefix between the input and the sort 
expressions.
     fn compute_properties(
         input: &Arc<dyn ExecutionPlan>,
         sort_exprs: LexOrdering,
         preserve_partitioning: bool,
-    ) -> PlanProperties {
+    ) -> (PlanProperties, LexOrdering) {
         // Determine execution mode:
         let requirement = LexRequirement::from(sort_exprs);
-        let sort_satisfied = input
+
+        let (sort_prefix, sort_satisfied) = input
             .equivalence_properties()
-            .ordering_satisfy_requirement(&requirement);
+            .extract_matching_prefix(&requirement);
+
+        let sort_partially_satisfied = sort_satisfied || 
!sort_prefix.is_empty();
 
         // The emission type depends on whether the input is already sorted:
-        // - If already sorted, we can emit results in the same way as the 
input
+        // - If already fully sorted, we can emit results in the same way as 
the input
+        // - If partially sorted, we might be able to emit results 
incrementally, but it is not guaranteed (Both)
         // - If not sorted, we must wait until all data is processed to emit 
results (Final)
         let emission_type = if sort_satisfied {
             input.pipeline_behavior()
+        } else if sort_partially_satisfied {
+            if input.pipeline_behavior() == EmissionType::Incremental {
+                EmissionType::Both

Review Comment:
   You can just say `input.pipeline_behavior()`, as Both means there is an 
internal accumulation within the operator, and those accumulated results are 
emitted after input exhaust. However the emission depends on the changes of 
common prefix values mainly, not the input termination.



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -90,15 +90,38 @@ pub struct TopK {
     scratch_rows: Rows,
     /// stores the top k values and their sort key values, in order
     heap: TopKHeap,
+    /// row converter, for common keys between the sort keys and the input 
ordering
+    common_prefix_converter: Option<RowConverter>,
+    /// Common sort prefix between the input and the sort expressions to allow 
early exit optimization
+    common_sort_prefix: Arc<[PhysicalSortExpr]>,

Review Comment:
   If we decide keeping the count only, this field simplifies as well



##########
datafusion/physical-expr/src/equivalence/properties/mod.rs:
##########
@@ -575,10 +576,35 @@ impl EquivalenceProperties {
             // From the analysis above, we know that `[a ASC]` is satisfied. 
Then,
             // we add column `a` as constant to the algorithm state. This 
enables us
             // to deduce that `(b + c) ASC` is satisfied, given `a` is 
constant.
-            eq_properties = eq_properties
-                
.with_constants(std::iter::once(ConstExpr::from(normalized_req.expr)));
+            eq_properties = eq_properties.with_constants(std::iter::once(
+                ConstExpr::from(Arc::clone(&normalized_req.expr)),
+            ));
         }
-        true
+
+        // All requirements are satisfied.
+        normalized_reqs.len()
+    }
+
+    /// Determines the longest prefix of `reqs` that is satisfied by the 
existing ordering.
+    /// Returns that prefix as a new `LexRequirement`, and a boolean 
indicating if all the requirements are satisfied.
+    pub fn extract_matching_prefix(
+        &self,
+        reqs: &LexRequirement,
+    ) -> (LexRequirement, bool) {

Review Comment:
   Again, we can just return the count of matched prefix length here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to