Tpt commented on code in PR #18254:
URL: https://github.com/apache/datafusion/pull/18254#discussion_r2466079618


##########
datafusion/physical-plan/src/recursive_query.rs:
##########
@@ -268,8 +273,10 @@ struct RecursiveQueryStream {
     buffer: Vec<RecordBatch>,
     /// Tracks the memory used by the buffer
     reservation: MemoryReservation,
+    /// If the distinct flag is set, then we use this hash table to remove 
duplicates from result and work tables
+    distinct_deduplicator: Option<DistinctDeduplicator>,
     // /// Metrics.

Review Comment:
   Indeed. Thanks for spotting. Fixed.



##########
datafusion/physical-plan/src/recursive_query.rs:
##########
@@ -293,21 +304,28 @@ impl RecursiveQueryStream {
             schema,
             buffer: vec![],
             reservation,
-            _baseline_metrics: baseline_metrics,
-        }
+            distinct_deduplicator,
+            baseline_metrics,
+        })
     }
 
     /// Push a clone of the given batch to the in memory buffer, and then 
return
     /// a poll with it.
     fn push_batch(
         mut self: std::pin::Pin<&mut Self>,
-        batch: RecordBatch,
+        mut batch: RecordBatch,
     ) -> Poll<Option<Result<RecordBatch>>> {
+        let baseline_metrics = self.baseline_metrics.clone();
+        if let Some(deduplicator) = &mut self.distinct_deduplicator {

Review Comment:
   Thank you for prompting me on this! I removed the TODOs but have not moved 
the metric code to avoid duplicating it twice (once for the static stream and 
once for the recursive stream).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to