Rachelint commented on code in PR #22899:
URL: https://github.com/apache/datafusion/pull/22899#discussion_r3408943918


##########
datafusion/physical-plan/src/aggregates/hash_aggregate.rs:
##########
@@ -171,106 +318,382 @@ impl PartialHashAggregateStream {
         Ok(Self {
             schema,
             input,
-            hash_table,
             baseline_metrics,
             reservation,
             reduction_factor,
+            skip_aggregation_probe,
             group_values_soft_limit: agg.limit_options().map(|config| 
config.limit()),
+            state: Some(PartialHashAggregateState::ReadingInput { hash_table 
}),
         })
     }
 
     /// See comments in [`Self::group_values_soft_limit`] for details.
-    fn hit_soft_group_limit(&self) -> bool {
+    fn hit_soft_group_limit(&self, hash_table: &AggregateHashTable<Partial>) 
-> bool {
         self.group_values_soft_limit
-            .is_some_and(|limit| limit <= 
self.hash_table.building_group_count())
+            .is_some_and(|limit| limit <= hash_table.building_group_count())
     }
 
-    fn start_output(&mut self) -> Result<()> {
-        let input_schema = self.input.schema();
-        self.input = Box::pin(EmptyRecordBatchStream::new(input_schema));
-        self.hash_table.start_output()
+    /// Updates skip aggregation probe state.
+    fn update_skip_aggregation_probe(&mut self, input_rows: usize, num_groups: 
usize) {
+        if let Some(probe) = self.skip_aggregation_probe.as_mut() {
+            probe.update_state(input_rows, num_groups);
+        }
     }
-}
 
-impl Stream for PartialHashAggregateStream {
-    type Item = Result<RecordBatch>;
+    /// Returns true if the aggregation probe indicates that aggregation
+    /// should be skipped.
+    fn should_skip_aggregation(&self) -> bool {
+        self.skip_aggregation_probe
+            .as_ref()
+            .is_some_and(|probe| probe.should_skip())
+    }
 
-    fn poll_next(
-        mut self: std::pin::Pin<&mut Self>,
+    fn start_output(
+        &mut self,
+        hash_table: &mut AggregateHashTable<Partial>,
+        close_input: bool,
+    ) -> Result<()> {
+        if close_input {
+            let input_schema = self.input.schema();
+            self.input = Box::pin(EmptyRecordBatchStream::new(input_schema));
+        }
+        hash_table.start_output()
+    }
+
+    /// Handle ReadingInput state - aggregate input batches into the hash 
table.
+    ///
+    /// Returns the next operator state with control flow decision.
+    fn handle_reading_input(

Review Comment:
   This method seems to have many branches; maybe we can add a comment to each branch
   to make it more readable.
   We can do that in a follow-up PR.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to