2010YOUY01 commented on code in PR #15409:
URL: https://github.com/apache/datafusion/pull/15409#discussion_r2018202146
##########
datafusion/datasource/src/memory.rs:
##########
@@ -440,6 +443,35 @@ impl DataSource for MemorySourceConfig {
         }
     }
+    fn repartitioned(

Review Comment:
   I don't quite understand how `repartitioned()` should be implemented when `output_ordering` is `Some(...)` (I think this is not documented yet; I'll update the comments afterward).

   Apparently, within each partition all entries should follow the specified ordering, but are there any cross-partition requirements? For example, must all entries in partition 2 be ordered after those in partition 1?


##########
datafusion/datasource/src/memory.rs:
##########
@@ -902,4 +1130,319 @@ mod tests {
         Ok(())
     }
+
+    fn batch(row_size: usize) -> RecordBatch {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("foo"); row_size]));
+        let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![1; row_size]));
+        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
+    }
+
+    fn schema() -> SchemaRef {
+        batch(1).schema()
+    }
+
+    fn memorysrcconfig_no_partitions(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![];
+        MemorySourceConfig::try_new(&partitions, schema(), None)?
+            .try_with_sort_information(sort_information)
+    }
+
+    fn memorysrcconfig_1_partition_1_batch(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![vec![batch(100)]];
+        MemorySourceConfig::try_new(&partitions, schema(), None)?
+            .try_with_sort_information(sort_information)
+    }
+
+    fn memorysrcconfig_3_partitions_1_batch_each(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![vec![batch(100)], vec![batch(100)], vec![batch(100)]];
+        MemorySourceConfig::try_new(&partitions, schema(), None)?
+            .try_with_sort_information(sort_information)
+    }
+
+    fn memorysrcconfig_3_partitions_with_2_batches_each(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![
+            vec![batch(100), batch(100)],
+            vec![batch(100), batch(100)],
+            vec![batch(100), batch(100)],
+        ];
+        MemorySourceConfig::try_new(&partitions, schema(), None)?
+            .try_with_sort_information(sort_information)
+    }
+
+    fn memorysrcconfig_1_partition_with_different_sized_batches(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![vec![batch(100_000), batch(10_000), batch(100), batch(1)]];
+        MemorySourceConfig::try_new(&partitions, schema(), None)?
+            .try_with_sort_information(sort_information)
+    }
+
+    fn memorysrcconfig_2_partition_with_different_sized_batches(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![
+            vec![batch(100_000), batch(10_000), batch(1_000)],

Review Comment:
   I recommend including a test with batches of size 1 and size 0 to cover extreme edge cases.
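To illustrate what the size-0 and size-1 batch cases exercise, here is a minimal sketch. The PR's actual repartitioning strategy is not visible in this excerpt, so `greedy_assign` is a hypothetical stand-in (greedy least-loaded assignment of whole batches, identified only by row count) and `total_rows` is a hypothetical helper; neither is a DataFusion API.

```rust
/// HYPOTHETICAL stand-in for a repartitioning step (not the PR's code):
/// assigns whole batches, given by row count, to `target` output partitions,
/// always placing the next batch into the partition with the fewest rows so far.
/// Returns, per output partition, the row counts of the batches it received.
fn greedy_assign(batch_rows: &[usize], target: usize) -> Vec<Vec<usize>> {
    assert!(target > 0, "target must be at least 1");
    let mut out: Vec<Vec<usize>> = vec![Vec::new(); target];
    let mut load = vec![0usize; target];
    for &rows in batch_rows {
        // Least-loaded partition; `min_by_key` returns the first of equal minima.
        let idx = (0..target).min_by_key(|&i| load[i]).expect("target > 0");
        out[idx].push(rows);
        load[idx] += rows;
    }
    out
}

/// Sum of all rows across all output partitions.
fn total_rows(parts: &[Vec<usize>]) -> usize {
    parts.iter().flatten().sum()
}

fn main() {
    // Edge cases from the review: 1-row and 0-row batches,
    // alone and mixed with larger batches.
    let cases: Vec<Vec<usize>> = vec![
        vec![0],
        vec![1],
        vec![0, 0, 0],
        vec![1, 0, 1],
        vec![100_000, 10_000, 1, 0],
    ];
    for batches in &cases {
        let parts = greedy_assign(batches, 3);
        // No rows and no batches may be lost or duplicated.
        assert_eq!(total_rows(&parts), batches.iter().sum::<usize>());
        assert_eq!(parts.iter().map(Vec::len).sum::<usize>(), batches.len());
        println!("{batches:?} -> {parts:?}");
    }
}
```

Under this particular strategy a zero-row batch never changes a partition's load, so a run of them keeps landing on the same partition; that is the kind of behavior size-0 tests tend to surface.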
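The first review comment on `repartitioned()` distinguishes two possible invariants for a sorted source. A minimal sketch of both, modelling each partition as a `Vec<i32>` of sort-key values rather than a `RecordBatch`, follows. The helper names are hypothetical, and the sketch does not claim which invariant `repartitioned()` must uphold.

```rust
/// True if every partition is individually sorted (ascending).
/// This is the per-partition property the comment takes as given.
fn sorted_within_partitions(partitions: &[Vec<i32>]) -> bool {
    partitions.iter().all(|p| p.windows(2).all(|w| w[0] <= w[1]))
}

/// True if the partitions are individually sorted AND, read in partition
/// order, form one globally sorted sequence: every value in a later
/// partition is >= every value in an earlier one.
/// This is the stronger, cross-partition property the comment asks about.
fn sorted_across_partitions(partitions: &[Vec<i32>]) -> bool {
    if !sorted_within_partitions(partitions) {
        return false;
    }
    // Empty partitions cannot violate the boundary check, so skip them.
    let non_empty: Vec<&Vec<i32>> = partitions.iter().filter(|p| !p.is_empty()).collect();
    // Since each partition is sorted, comparing the last value of one partition
    // with the first value of the next is sufficient. Both are `Some` here.
    non_empty.windows(2).all(|w| w[0].last() <= w[1].first())
}

fn main() {
    // Each partition is sorted, but their key ranges overlap.
    let interleaved = vec![vec![1, 3, 5], vec![2, 4, 6]];
    // Each partition holds a contiguous, non-overlapping key range.
    let range_split = vec![vec![1, 2, 3], vec![4, 5, 6]];

    for (name, parts) in [("interleaved", &interleaved), ("range_split", &range_split)] {
        println!(
            "{name}: within={} across={}",
            sorted_within_partitions(parts),
            sorted_across_partitions(parts)
        );
    }
}
```

This prints `within=true across=false` for `interleaved` and `within=true across=true` for `range_split`. Splitting a single sorted partition into consecutive chunks, kept in order, satisfies both properties; a split that interleaves rows across outputs generally satisfies only the first.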
##########
datafusion/datasource/src/memory.rs:
##########
@@ -440,6 +443,35 @@ impl DataSource for MemorySourceConfig {
         }
     }
+    fn repartitioned(
+        &self,
+        target_partitions: usize,
+        _repartition_file_min_size: usize,
+        output_ordering: Option<LexOrdering>,
+    ) -> Result<Option<Arc<dyn DataSource>>> {
+        if self.partitions.is_empty() || self.partitions.len() >= target_partitions
+        // if already have more partitions than desired, do not merge
+        {
+            return Ok(None);
+        }
+
+        let maybe_repartitioned = if let Some(output_ordering) = output_ordering {

Review Comment:
   It would be great to briefly explain under what circumstances this repartition will fail (i.e. return `None`).


##########
datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs:
##########
@@ -520,7 +520,9 @@ async fn group_by_string_test(
     let expected = compute_counts(&input, column_name);
     let schema = input[0].schema();
-    let session_config = SessionConfig::new().with_batch_size(50);
+    let session_config = SessionConfig::new()
+        .with_batch_size(50)
+        .with_repartition_file_scans(false);

Review Comment:
   A better idea is probably to use a separate config option to control whether to repartition `MemTable`, instead of using one config option to control all of CSV/Parquet/MemTable.
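The suggestion to give `MemTable` its own repartitioning switch can be sketched as follows. Everything here is hypothetical: `RepartitionOptions`, `SourceKind`, `should_repartition`, and the field name `repartition_memory_scans` are invented for illustration; only `repartition_file_scans` corresponds to the existing setting the fuzz test turns off via `with_repartition_file_scans(false)`.

```rust
/// HYPOTHETICAL options struct (not DataFusion's config type).
/// `repartition_file_scans` mirrors the existing setting; `repartition_memory_scans`
/// is an invented name for the separate flag proposed in the review.
#[derive(Debug, Clone, Copy)]
struct RepartitionOptions {
    repartition_file_scans: bool,
    repartition_memory_scans: bool,
}

/// Illustrative source kinds; not DataFusion types.
#[derive(Debug, Clone, Copy)]
enum SourceKind {
    Csv,
    Parquet,
    MemTable,
}

/// Each source kind consults only its own flag.
fn should_repartition(opts: RepartitionOptions, kind: SourceKind) -> bool {
    match kind {
        SourceKind::Csv | SourceKind::Parquet => opts.repartition_file_scans,
        SourceKind::MemTable => opts.repartition_memory_scans,
    }
}

fn main() {
    // A test could disable only in-memory repartitioning and leave file scans alone.
    let opts = RepartitionOptions {
        repartition_file_scans: true,
        repartition_memory_scans: false,
    };
    for kind in [SourceKind::Csv, SourceKind::Parquet, SourceKind::MemTable] {
        println!("{kind:?}: {}", should_repartition(opts, kind));
    }
}
```

The design point is decoupling: with one flag per source kind, a test that needs deterministic in-memory partitioning no longer changes how CSV or Parquet scans are planned.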
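One way to answer the question about when `repartitioned()` returns `None` is to name each early-return condition explicitly. The minimal sketch below mirrors the condition quoted in the diff (`self.partitions.is_empty() || self.partitions.len() >= target_partitions`). `SkipReason` and `skip_reason` are hypothetical names, and any additional `None` paths under `Some(output_ordering)` are not visible in this excerpt, so they are not modelled.

```rust
/// HYPOTHETICAL: reasons a repartition attempt returns `Ok(None)`.
/// Both variants come from the early return quoted in the diff.
#[derive(Debug, PartialEq)]
enum SkipReason {
    /// `self.partitions.is_empty()`: there is nothing to redistribute.
    NoPartitions,
    /// `self.partitions.len() >= target_partitions`: already at or above the
    /// target, and the source does not merge partitions.
    AlreadyEnoughPartitions,
}

/// Mirrors the quoted condition but returns the reason instead of a bare `None`,
/// so it can be documented or logged.
fn skip_reason(num_partitions: usize, target_partitions: usize) -> Option<SkipReason> {
    if num_partitions == 0 {
        Some(SkipReason::NoPartitions)
    } else if num_partitions >= target_partitions {
        Some(SkipReason::AlreadyEnoughPartitions)
    } else {
        None // no early return: proceed with repartitioning
    }
}

fn main() {
    for (current, target) in [(0, 4), (4, 4), (8, 4), (2, 4)] {
        match skip_reason(current, target) {
            Some(reason) => println!("{current} -> {target}: skipped ({reason:?})"),
            None => println!("{current} -> {target}: repartition"),
        }
    }
}
```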