2010YOUY01 commented on code in PR #15409:
URL: https://github.com/apache/datafusion/pull/15409#discussion_r2018202146


##########
datafusion/datasource/src/memory.rs:
##########
@@ -440,6 +443,35 @@ impl DataSource for MemorySourceConfig {
         }
     }
 
+    fn repartitioned(

Review Comment:
   I don't quite understand how `repartitioned()` should be implemented when 
`output_ordering` is `Some(...)` (I think this is not documented yet; I'll 
update the comments afterward).
   Apparently, within each partition all entries should follow the specified 
ordering, but are there any cross-partition requirements? For example, are 
all entries in partition 2 required to be ordered after those in partition 1?



##########
datafusion/datasource/src/memory.rs:
##########
@@ -902,4 +1130,319 @@ mod tests {
 
         Ok(())
     }
+
+    fn batch(row_size: usize) -> RecordBatch {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("foo"); 
row_size]));
+        let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![1; row_size]));
+        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
+    }
+
+    fn schema() -> SchemaRef {
+        batch(1).schema()
+    }
+
+    fn memorysrcconfig_no_partitions(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![];
+        MemorySourceConfig::try_new(&partitions, schema(), None)?
+            .try_with_sort_information(sort_information)
+    }
+
+    fn memorysrcconfig_1_partition_1_batch(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![vec![batch(100)]];
+        MemorySourceConfig::try_new(&partitions, schema(), None)?
+            .try_with_sort_information(sort_information)
+    }
+
+    fn memorysrcconfig_3_partitions_1_batch_each(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![vec![batch(100)], vec![batch(100)], 
vec![batch(100)]];
+        MemorySourceConfig::try_new(&partitions, schema(), None)?
+            .try_with_sort_information(sort_information)
+    }
+
+    fn memorysrcconfig_3_partitions_with_2_batches_each(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![
+            vec![batch(100), batch(100)],
+            vec![batch(100), batch(100)],
+            vec![batch(100), batch(100)],
+        ];
+        MemorySourceConfig::try_new(&partitions, schema(), None)?
+            .try_with_sort_information(sort_information)
+    }
+
+    fn memorysrcconfig_1_partition_with_different_sized_batches(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![vec![batch(100_000), batch(10_000), batch(100), 
batch(1)]];
+        MemorySourceConfig::try_new(&partitions, schema(), None)?
+            .try_with_sort_information(sort_information)
+    }
+
+    fn memorysrcconfig_2_partition_with_different_sized_batches(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![
+            vec![batch(100_000), batch(10_000), batch(1_000)],

Review Comment:
   I recommend including a test with size-1 and size-0 batches to cover 
extreme edge cases.



##########
datafusion/datasource/src/memory.rs:
##########
@@ -440,6 +443,35 @@ impl DataSource for MemorySourceConfig {
         }
     }
 
+    fn repartitioned(
+        &self,
+        target_partitions: usize,
+        _repartition_file_min_size: usize,
+        output_ordering: Option<LexOrdering>,
+    ) -> Result<Option<Arc<dyn DataSource>>> {
+        if self.partitions.is_empty() || self.partitions.len() >= 
target_partitions
+        // if already have more partitions than desired, do not merge
+        {
+            return Ok(None);
+        }
+
+        let maybe_repartitioned = if let Some(output_ordering) = 
output_ordering {

Review Comment:
   It would be great to briefly explain under what circumstances this 
repartitioning will fail (i.e., return `None`).



##########
datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs:
##########
@@ -520,7 +520,9 @@ async fn group_by_string_test(
     let expected = compute_counts(&input, column_name);
 
     let schema = input[0].schema();
-    let session_config = SessionConfig::new().with_batch_size(50);
+    let session_config = SessionConfig::new()
+        .with_batch_size(50)
+        .with_repartition_file_scans(false);

Review Comment:
   It would probably be better to use a separate config option to control 
whether to repartition `MemTable`, instead of using one config option to 
control all of csv/parquet/memtable.



##########
datafusion/datasource/src/memory.rs:
##########
@@ -902,4 +1130,319 @@ mod tests {
 
         Ok(())
     }
+
+    fn batch(row_size: usize) -> RecordBatch {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("foo"); 
row_size]));
+        let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![1; row_size]));
+        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
+    }
+
+    fn schema() -> SchemaRef {
+        batch(1).schema()
+    }
+
+    fn memorysrcconfig_no_partitions(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![];
+        MemorySourceConfig::try_new(&partitions, schema(), None)?
+            .try_with_sort_information(sort_information)
+    }
+
+    fn memorysrcconfig_1_partition_1_batch(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![vec![batch(100)]];
+        MemorySourceConfig::try_new(&partitions, schema(), None)?
+            .try_with_sort_information(sort_information)
+    }
+
+    fn memorysrcconfig_3_partitions_1_batch_each(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![vec![batch(100)], vec![batch(100)], 
vec![batch(100)]];
+        MemorySourceConfig::try_new(&partitions, schema(), None)?
+            .try_with_sort_information(sort_information)
+    }
+
+    fn memorysrcconfig_3_partitions_with_2_batches_each(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![
+            vec![batch(100), batch(100)],
+            vec![batch(100), batch(100)],
+            vec![batch(100), batch(100)],
+        ];
+        MemorySourceConfig::try_new(&partitions, schema(), None)?
+            .try_with_sort_information(sort_information)
+    }
+
+    fn memorysrcconfig_1_partition_with_different_sized_batches(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![vec![batch(100_000), batch(10_000), batch(100), 
batch(1)]];
+        MemorySourceConfig::try_new(&partitions, schema(), None)?
+            .try_with_sort_information(sort_information)
+    }
+
+    fn memorysrcconfig_2_partition_with_different_sized_batches(
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<MemorySourceConfig> {
+        let partitions = vec![
+            vec![batch(100_000), batch(10_000), batch(1_000)],

Review Comment:
   I recommend including a test with size-1 and size-0 batches to cover 
extreme edge cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to