2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2191376010
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1067,14 +1074,13 @@ impl GroupedHashAggregateStream {
                 sort_batch(&batch, &expr, None)
             })),
         )));
-        for spill in self.spill_state.spills.drain(..) {
-            let stream = self.spill_state.spill_manager.read_spill_as_stream(spill)?;
-            streams.push(stream);
-        }
+
         self.spill_state.is_stream_merging = true;
         self.input = StreamingMergeBuilder::new()
             .with_streams(streams)
             .with_schema(schema)
+            .with_spill_manager(self.spill_state.spill_manager.clone())
+            .with_sorted_spill_files(std::mem::take(&mut self.spill_state.spills))

Review Comment:
   I suggest spilling all in-memory batches (in `streams`) to disk before this final merging step, so that the multi-pass merge operator only has to handle spill files and never has to deal with in-memory batches and spill files at the same time. This is just a simplification for now; we can add an optimization later to avoid the re-spill step.

   The issue is that, without special handling, the in-memory batches can take most of the available memory budget and leave only a very small portion for the multi-pass spilling to continue. This can cause slowdowns or even prevent some cases from finishing.

   We're already doing this in the sort executor, see:
   https://github.com/apache/datafusion/blob/14487ddc275fc1f148f339293664fe7f83d91d09/datafusion/physical-plan/src/sorts/sort.rs#L336-L341
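   For illustration only, a minimal sketch of what the suggested pre-merge re-spill could look like inside `GroupedHashAggregateStream`. This is not the PR's code: the helper name `spill_sorted_batch_before_final_merge` is made up, and `spill_record_batch_and_finish` is assumed to be the `SpillManager` method used to write batches out; the real API, fields, and call sites may differ.

   ```rust
   // Hypothetical sketch (not part of the PR): re-spill the sorted in-memory
   // batch before the final merge, so StreamingMergeBuilder only consumes
   // sorted spill files and the full memory budget is left for merging.
   fn spill_sorted_batch_before_final_merge(&mut self) -> Result<()> {
       // Emit all remaining in-memory groups and sort them on the spill
       // expressions, matching the ordering of the existing spill files.
       let batch = self.emit(EmitTo::All, true)?;
       let sorted = sort_batch(&batch, &self.spill_state.spill_expr, None)?;

       // `spill_record_batch_and_finish` is assumed here: write the batch to a
       // temp spill file and get back a handle if anything was written.
       if let Some(spill_file) = self
           .spill_state
           .spill_manager
           .spill_record_batch_and_finish(&[sorted], "GroupedHashAggregateStream pre-merge spill")?
       {
           self.spill_state.spills.push(spill_file);
       }

       // Release the memory held by the emitted groups so the multi-pass merge
       // can use it, mirroring what the sort executor does before merging.
       self.clear_all();
       self.update_memory_reservation()
   }
   ```

   With something like this in place, `update_merged_stream` would no longer need to push the in-memory batch into `streams` at all; the merge input would consist purely of `self.spill_state.spills`.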