zheniasigayev opened a new issue, #16919:
URL: https://github.com/apache/datafusion/issues/16919

   ### Describe the bug
   
   See discussion: Best practices for memory-efficient deduplication of 
pre-sorted Parquet files #16776 
   
   After investigating an optimal approach to perform deduplication on multiple 
pre-sorted Parquet files, it's believed that the following queries should be 
using the Streaming Aggregate operator, rather than buffering the entire 
dataset in a HashAggregateStream. This likely causes the out of memory error 
described in the discussion.
   
   ### To Reproduce
   
   ## Step 1) Generate Parquet files
   I created a script to produce equivalent parquet files. I confirmed that 
identical query plans, and memory consumers are seen when running the below 2 
queries against synthetic data.
   
   Create equivalent Parquet files using this Python script: 
https://gist.github.com/zheniasigayev/2e5e471c9070cfa685d938bced47aa7f
   
   ## Step 2) Run Reproducible Queries
   
   Run: `datafusion-cli -m 8G -d 50G --top-memory-consumers 25` with default 
settings.
   
   ### 2.1) Original Deduplication Query: 
   
   ```sql
   CREATE EXTERNAL TABLE example (
       col_1 VARCHAR(50) NOT NULL,
       col_2 BIGINT NOT NULL,
       col_3 VARCHAR(50),
       col_4 VARCHAR(50),
       col_5 VARCHAR(50),
       col_6 VARCHAR(100) NOT NULL,
       col_7 VARCHAR(50),
       col_8 DOUBLE
   ) 
   WITH ORDER (col_1 ASC, col_2 ASC) 
   STORED AS PARQUET 
   LOCATION '/tmp/redacted/*.parquet';
   
   EXPLAIN COPY (
       SELECT 
           col_1,
           col_2,
           col_3,
           col_4,
           col_5,
           col_6,
           first_value(col_7) AS col_7,
           first_value(col_8) AS col_8
       FROM 
           example 
       GROUP BY 
           col_1, col_2, col_3, col_4, col_5, col_6 
       ORDER BY 
           col_1 ASC, col_2 ASC
   ) 
   TO '/tmp/result.parquet' 
   STORED AS PARQUET 
   OPTIONS (compression 'zstd(1)');
   ```
   
   ### 2.2) Removing `first_value()` Aggregate and associated columns:
   
   It's not possible to only remove the `first_value()` aggregate from the 
above query since `col_7` and `col_8` won't appear in the `GROUP BY` clause.
   
   ```sql
   Error during planning: Column in SELECT must be in GROUP BY or an aggregate 
function: While expanding wildcard, column "example.col_7" must appear in the 
GROUP BY clause or must be part of an aggregate function, currently only 
"example.col_1, example.col_2, example.col_3, example.col_4, example.col_5, 
example.col_6" appears in the SELECT clause satisfies this requirement
   ```
   
   Instead, I removed `col_7` and `col_8` (the columns which `first_value()` 
aggregate is applied to). This is the resulting query:
   
   ```sql
   CREATE EXTERNAL TABLE example (
       col_1 VARCHAR(50) NOT NULL,
       col_2 BIGINT NOT NULL,
       col_3 VARCHAR(50),
       col_4 VARCHAR(50),
       col_5 VARCHAR(50),
       col_6 VARCHAR(100) NOT NULL,
       col_7 VARCHAR(50),
       col_8 DOUBLE
   ) 
   WITH ORDER (col_1 ASC, col_2 ASC) 
   STORED AS PARQUET 
   LOCATION '/tmp/redacted/*.parquet';
   
   COPY (
       SELECT 
           col_1,
           col_2,
           col_3,
           col_4,
           col_5,
           col_6
       FROM 
           example 
       GROUP BY 
           col_1, col_2, col_3, col_4, col_5, col_6
       ORDER BY 
           col_1 ASC, col_2 ASC
   ) 
   TO '/tmp/result_part2.parquet' 
   STORED AS PARQUET 
   OPTIONS (compression 'zstd(1)');
   ```
    
   
   ### Expected behavior
   
   * Given that the data is sorted by `col_1` and `col_2`, its expected that 
the original query would use the streaming aggregate operator (which should not 
have much memory at all) 
([discussion](https://github.com/apache/datafusion/discussions/16776#discussioncomment-13777332)).
   
   
   ### Additional context
   
   ## Detailed Query Plans
   Generated using `EXPLAIN FORMAT INDENT ...`
   
   ### Original Deduplication Query:
   
   ```
   
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                       
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                             |
   
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | CopyTo: format=parquet output_url=/tmp/result.parquet 
options: (format.compression zstd(1))                                           
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                  |
   |               |   Sort: example.col_1 ASC NULLS LAST, example.col_2 ASC 
NULLS LAST                                                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                |
   |               |     Projection: example.col_1, example.col_2, 
example.col_3, example.col_4, example.col_5, example.col_6, 
first_value(example.col_7) AS col_7, first_value(example.col_8) AS col_8        
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              |
   |               |       Aggregate: groupBy=[[example.col_1, example.col_2, 
example.col_3, example.col_4, example.col_5, example.col_6]], 
aggr=[[first_value(example.col_7), first_value(example.col_8)]]                 
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
 |
   |               |         TableScan: example projection=[col_1, col_2, 
col_3, col_4, col_5, col_6, col_7, col_8]                                       
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                   |
   | physical_plan | DataSinkExec: sink=ParquetSink(file_groups=[])             
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                             |
   |               |   SortPreservingMergeExec: [col_1@0 ASC NULLS LAST, 
col_2@1 ASC NULLS LAST]                                                         
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                    |
   |               |     SortExec: expr=[col_1@0 ASC NULLS LAST, col_2@1 ASC 
NULLS LAST], preserve_partitioning=[true]                                       
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                |
   |               |       ProjectionExec: expr=[col_1@0 as col_1, col_2@1 as 
col_2, col_3@2 as col_3, col_4@3 as col_4, col_5@4 as col_5, col_6@5 as col_6, 
first_value(example.col_7)@6 as col_7, first_value(example.col_8)@7 as col_8]   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                |
   |               |         AggregateExec: mode=FinalPartitioned, gby=[col_1@0 
as col_1, col_2@1 as col_2, col_3@2 as col_3, col_4@3 as col_4, col_5@4 as 
col_5, col_6@5 as col_6], aggr=[first_value(example.col_7), 
first_value(example.col_8)]                                                     
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
      |
   |               |           CoalesceBatchesExec: target_batch_size=8192      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                             |
   |               |             RepartitionExec: partitioning=Hash([col_1@0, 
col_2@1, col_3@2, col_4@3, col_5@4, col_6@5], 10), input_partitions=10          
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                               |
   |               |               AggregateExec: mode=Partial, gby=[col_1@0 as 
col_1, col_2@1 as col_2, col_3@2 as col_3, col_4@3 as col_4, col_5@4 as col_5, 
col_6@5 as col_6], aggr=[first_value(example.col_7), 
first_value(example.col_8)]                                                     
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
         |
   |               |                 DataSourceExec: file_groups={10 groups: 
[[tmp/redacted/reproducible_data_0.parquet:0..12537303, 
tmp/redacted/reproducible_data_1.parquet:0..6245726], 
[tmp/redacted/reproducible_data_1.parquet:6245726..12518047, 
tmp/redacted/reproducible_data_10.parquet:0..12510708], 
[tmp/redacted/reproducible_data_10.parquet:12510708..12530931, 
tmp/redacted/reproducible_data_11.parquet:0..12518206, 
tmp/redacted/reproducible_data_12.parquet:0..6244600], 
[tmp/redacted/reproducible_data_12.parquet:6244600..12522074, 
tmp/redacted/reproducible_data_13.parquet:0..12505555], 
[tmp/redacted/reproducible_data_13.parquet:12505555..12523871, 
tmp/redacted/reproducible_data_14.parquet:0..12515635, 
tmp/redacted/reproducible_data_2.parquet:0..6249078], ...]}, projection=[col_1, 
col_2, col_3, col_4, col_5, col_6, col_7, col_8], file_type=parquet |
   |               |                                                            
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                             |
   
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   ```
   
   ### Removing `first_value()` Aggregate and associated columns:
   
   ```
   
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                       
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | CopyTo: format=parquet 
output_url=/tmp/result_part2.parquet options: (format.compression zstd(1))      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
 |
   |               |   Sort: example.col_1 ASC NULLS LAST, example.col_2 ASC 
NULLS LAST                                                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                |
   |               |     Aggregate: groupBy=[[example.col_1, example.col_2, 
example.col_3, example.col_4, example.col_5, example.col_6]], aggr=[[]]         
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                 |
   |               |       TableScan: example projection=[col_1, col_2, col_3, 
col_4, col_5, col_6]                                                            
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                              |
   | physical_plan | DataSinkExec: sink=ParquetSink(file_groups=[])             
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   |               |   SortPreservingMergeExec: [col_1@0 ASC NULLS LAST, 
col_2@1 ASC NULLS LAST]                                                         
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                    |
   |               |     SortExec: expr=[col_1@0 ASC NULLS LAST, col_2@1 ASC 
NULLS LAST], preserve_partitioning=[true]                                       
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                |
   |               |       AggregateExec: mode=FinalPartitioned, gby=[col_1@0 
as col_1, col_2@1 as col_2, col_3@2 as col_3, col_4@3 as col_4, col_5@4 as 
col_5, col_6@5 as col_6], aggr=[]                                               
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                    |
   |               |         CoalesceBatchesExec: target_batch_size=8192        
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   |               |           RepartitionExec: partitioning=Hash([col_1@0, 
col_2@1, col_3@2, col_4@3, col_5@4, col_6@5], 10), input_partitions=10          
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                 |
   |               |             AggregateExec: mode=Partial, gby=[col_1@0 as 
col_1, col_2@1 as col_2, col_3@2 as col_3, col_4@3 as col_4, col_5@4 as col_5, 
col_6@5 as col_6], aggr=[]                                                      
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                |
   |               |               DataSourceExec: file_groups={10 groups: 
[[tmp/redacted/reproducible_data_0.parquet:0..12537303, 
tmp/redacted/reproducible_data_1.parquet:0..6245726], 
[tmp/redacted/reproducible_data_1.parquet:6245726..12518047, 
tmp/redacted/reproducible_data_10.parquet:0..12510708], 
[tmp/redacted/reproducible_data_10.parquet:12510708..12530931, 
tmp/redacted/reproducible_data_11.parquet:0..12518206, 
tmp/redacted/reproducible_data_12.parquet:0..6244600], 
[tmp/redacted/reproducible_data_12.parquet:6244600..12522074, 
tmp/redacted/reproducible_data_13.parquet:0..12505555], 
[tmp/redacted/reproducible_data_13.parquet:12505555..12523871, 
tmp/redacted/reproducible_data_14.parquet:0..12515635, 
tmp/redacted/reproducible_data_2.parquet:0..6249078], ...]}, projection=[col_1, 
col_2, col_3, col_4, col_5, col_6], file_type=parquet |
   |               |                                                            
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             |
   
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to