alamb commented on code in PR #18254:
URL: https://github.com/apache/datafusion/pull/18254#discussion_r2560117640
##########
datafusion/physical-plan/src/recursive_query.rs:
##########
@@ -434,5 +452,55 @@ impl RecordBatchStream for RecursiveQueryStream {
}
}
+/// Deduplicator based on a hash table.
+struct DistinctDeduplicator {
+    /// Grouped rows used for distinct
+    group_values: Box<dyn GroupValues>,
+    reservation: MemoryReservation,
+    intern_output_buffer: Vec<usize>,
+}
+
+impl DistinctDeduplicator {
+    fn new(schema: SchemaRef, task_context: &TaskContext) -> Result<Self> {
+        let group_values = new_group_values(schema, &GroupOrdering::None)?;
+        let reservation = MemoryConsumer::new("RecursiveQueryHashTable")
+            .register(task_context.memory_pool());
+        Ok(Self {
+            group_values,
+            reservation,
+            intern_output_buffer: Vec::new(),
+        })
+    }
+
+    fn deduplicate(&mut self, batch: &RecordBatch) -> Result<RecordBatch> {
+        // We use the hash table to allocate new group ids.
+        // If they are new, i.e., if they have ids >= length before interning, we keep them.
+        // We also detect duplicates by enforcing that group ids are increasing.
+        let size_before = self.group_values.len();
+        self.intern_output_buffer.reserve(batch.num_rows());
+        self.group_values
+            .intern(batch.columns(), &mut self.intern_output_buffer)?;
+        let mask = are_increasing_mask(&self.intern_output_buffer, size_before);
+        self.intern_output_buffer.clear();
+        // We update the reservation to reflect the new size of the hash table.
+        self.reservation.try_resize(self.group_values.size())?;
+        Ok(filter_record_batch(batch, &mask)?)
+    }
+}
+}
+
+/// Return a mask, each element true if the value is greater than all previous ones and greater or equal than the min_value
+fn are_increasing_mask(values: &[usize], mut min_value: usize) -> BooleanArray {
Review Comment:
I also found this confusing. Some suggestions:
1. Rename the function to `new_groups_mask` to reflect what it does
2. Rename `min_value` to `max_seen_group_id` or `max_emitted` or something like that.
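
For illustration, here is a minimal sketch of what the renamed helper could look like. The name `new_groups_mask` and the parameter `max_seen_group_id` are taken from the suggestions above; the body is only reconstructed from the doc comment and is not the PR's actual implementation:

```rust
use arrow::array::BooleanArray;

/// Hypothetical sketch: true for each group id that is at least
/// `max_seen_group_id` and greater than every id already accepted in this
/// batch, so a row repeated within the same batch is kept only once.
fn new_groups_mask(group_ids: &[usize], mut max_seen_group_id: usize) -> BooleanArray {
    let mask: Vec<bool> = group_ids
        .iter()
        .map(|&id| {
            if id >= max_seen_group_id {
                // Remember the accepted id so a later duplicate in the same
                // batch (which gets the same id back from `intern`) is dropped.
                max_seen_group_id = id + 1;
                true
            } else {
                false
            }
        })
        .collect();
    BooleanArray::from(mask)
}
```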
##########
datafusion/physical-plan/src/recursive_query.rs:
##########
@@ -428,5 +446,58 @@ impl RecordBatchStream for RecursiveQueryStream {
}
}
+/// Deduplicator based on a hash table.
+struct DistinctDeduplicator {
+    /// Grouped rows used for distinct
+    group_values: Box<dyn GroupValues>,
+    reservation: MemoryReservation,
+    intern_output_buffer: Vec<usize>,
+}
+
+impl DistinctDeduplicator {
+    fn new(schema: SchemaRef, task_context: &TaskContext) -> Result<Self> {
+        let group_values = new_group_values(schema, &GroupOrdering::None)?;
+        let reservation = MemoryConsumer::new("RecursiveQueryHashTable")
+            .register(task_context.memory_pool());
+        Ok(Self {
+            group_values,
+            reservation,
+            intern_output_buffer: Vec::new(),
+        })
+    }
+
+    /// Remove duplicated rows from the given batch, keeping a state between batches.
+    ///
+    /// We use a hash table to allocate new group ids for the new rows.
+    /// [`GroupValues`] allocate increasing group ids.
+    /// Hence, if groups (i.e., rows) are now, then they have ids >= length before interning, we keep them.
Review Comment:
```suggestion
    /// Hence, if groups (i.e., rows) are new, then they have ids >= length before interning, we keep them.
```
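
To make the intended semantics concrete, here is a small self-contained sketch of the cross-batch distinct behaviour. It deliberately swaps the PR's `GroupValues` hash table for a plain `HashSet` over a single `Int32` column, so it only illustrates the semantics (the first occurrence of a row is kept, later occurrences in the same or in later batches are dropped), not the actual memory-accounted implementation:

```rust
use std::collections::HashSet;
use std::sync::Arc;

use arrow::array::{Array, BooleanArray, Int32Array};
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Keep only rows whose value has not been seen in any earlier batch.
/// `seen` plays the role of the PR's hash table state.
fn dedup_batch(seen: &mut HashSet<i32>, batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
    let col = batch
        .column(0)
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("single Int32 column");
    // `HashSet::insert` returns true only the first time a value is added.
    let mask: BooleanArray = col.iter().map(|v| v.map(|x| seen.insert(x))).collect();
    filter_record_batch(batch, &mask)
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch1 = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 2, 3]))],
    )?;
    let batch2 = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![3, 4]))])?;

    let mut seen = HashSet::new();
    // First batch keeps 1, 2, 3; the repeated 2 is dropped.
    assert_eq!(dedup_batch(&mut seen, &batch1)?.num_rows(), 3);
    // Second batch keeps only 4; 3 was already emitted in the first batch.
    assert_eq!(dedup_batch(&mut seen, &batch2)?.num_rows(), 1);
    Ok(())
}
```

The PR achieves the same effect for arbitrary schemas by interning whole rows into `GroupValues`, building the mask from the returned group ids, and filtering with `filter_record_batch`, with the hash table's size tracked through a `MemoryReservation`.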