rluvaton commented on code in PR #22945:
URL: https://github.com/apache/datafusion/pull/22945#discussion_r3414786404
##########
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##########
@@ -481,3 +646,211 @@ impl RecordBatchStream for StreamAttachedReservation {
self.stream.schema()
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use crate::expressions::PhysicalSortExpr;
+ use arrow::array::{AsArray, Int64Array};
+ use arrow::compute::concat_batches;
+ use arrow::datatypes::{DataType, Field, Int64Type, Schema};
+ use datafusion_execution::memory_pool::{
+ GreedyMemoryPool, MemoryConsumer, MemoryPool,
+ };
+ use datafusion_execution::runtime_env::RuntimeEnv;
+ use datafusion_physical_expr::expressions::Column;
+ use datafusion_physical_expr_common::metrics::{
+ ExecutionPlanMetricsSet, SpillMetrics,
+ };
+
+ fn test_schema() -> SchemaRef {
+ Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]))
+ }
+
+ fn build_spill_manager(env: &Arc<RuntimeEnv>, schema: &SchemaRef) ->
SpillManager {
+ SpillManager::new(
+ Arc::clone(env),
+ SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
+ Arc::clone(schema),
+ )
+ }
+
+ /// Spill `values` (which must already be sorted) as a single sorted run and
+ /// return it as a `SortedSpillFile` carrying its recorded largest-batch memory.
+ fn make_sorted_spill_file(
+ spill_manager: &SpillManager,
+ schema: &SchemaRef,
+ values: Vec<i64>,
+ ) -> SortedSpillFile {
+ let batch = RecordBatch::try_new(
+ Arc::clone(schema),
+ vec![Arc::new(Int64Array::from(values))],
+ )
+ .unwrap();
+ let batches: Vec<Result<RecordBatch>> = vec![Ok(batch)];
+ let (file, max_record_batch_memory) = spill_manager
+ .spill_record_batch_iter_and_return_max_batch_memory(
+ batches.into_iter(),
+ "test input run",
+ )
+ .unwrap()
+ .expect("spill should produce a file");
+ SortedSpillFile {
+ file,
+ max_record_batch_memory,
+ }
+ }
+
+ fn build_merge_builder(
+ spill_manager: SpillManager,
+ schema: SchemaRef,
+ sorted_spill_files: Vec<SortedSpillFile>,
+ pool: &Arc<dyn MemoryPool>,
+ batch_size: usize,
+ ) -> MultiLevelMergeBuilder {
+ let reservation = MemoryConsumer::new("test merge").register(pool);
+ let expr: LexOrdering =
+ [PhysicalSortExpr::new_default(Arc::new(Column::new("x",
0)))].into();
+ MultiLevelMergeBuilder::new(
+ spill_manager,
+ schema,
+ sorted_spill_files,
+ vec![],
+ expr,
+ BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
+ batch_size,
+ reservation,
+ None,
+ false,
+ )
+ }
+
+ /// Proves the fix: two sorted runs whose largest batches are too big to both
+ /// be seated in the merge budget at once are re-spilled (halved) until they
+ /// fit, and the merge then completes with fully sorted, complete output.
+ /// Before the fix this returned `ResourcesExhausted` instead of merging.
Review Comment:
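No need to mention the "fix" in this doc comment; describe the behavior the test verifies rather than the history of the change.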
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]