zheniasigayev opened a new issue, #16919: URL: https://github.com/apache/datafusion/issues/16919
### Describe the bug

See discussion: Best practices for memory-efficient deduplication of pre-sorted Parquet files #16776

After investigating an optimal approach to deduplicating multiple pre-sorted Parquet files, it appears that the following queries should use the Streaming Aggregate operator rather than buffering the entire dataset in a `HashAggregateStream`. This buffering likely causes the out-of-memory error described in the discussion.

### To Reproduce

## Step 1) Generate Parquet files

I created a script to produce equivalent Parquet files. I confirmed that identical query plans and memory consumers are seen when running the two queries below against the synthetic data.

Create equivalent Parquet files using this Python script: https://gist.github.com/zheniasigayev/2e5e471c9070cfa685d938bced47aa7f

## Step 2) Run Reproducible Queries

Run: `datafusion-cli -m 8G -d 50G --top-memory-consumers 25` with default settings.

### 2.1) Original Deduplication Query:

```sql
CREATE EXTERNAL TABLE example (
    col_1 VARCHAR(50) NOT NULL,
    col_2 BIGINT NOT NULL,
    col_3 VARCHAR(50),
    col_4 VARCHAR(50),
    col_5 VARCHAR(50),
    col_6 VARCHAR(100) NOT NULL,
    col_7 VARCHAR(50),
    col_8 DOUBLE
)
WITH ORDER (col_1 ASC, col_2 ASC)
STORED AS PARQUET
LOCATION '/tmp/redacted/*.parquet';

EXPLAIN COPY (
    SELECT
        col_1,
        col_2,
        col_3,
        col_4,
        col_5,
        col_6,
        first_value(col_7) AS col_7,
        first_value(col_8) AS col_8
    FROM example
    GROUP BY col_1, col_2, col_3, col_4, col_5, col_6
    ORDER BY col_1 ASC, col_2 ASC
)
TO '/tmp/result.parquet'
STORED AS PARQUET
OPTIONS (compression 'zstd(1)');
```

### 2.2) Removing `first_value()` Aggregate and associated columns:

It's not possible to remove only the `first_value()` aggregate from the above query, because `col_7` and `col_8` would then appear in the `SELECT` list without appearing in the `GROUP BY` clause:
```sql
Error during planning: Column in SELECT must be in GROUP BY or an aggregate function: While expanding wildcard, column "example.col_7" must appear in the GROUP BY clause or must be part of an aggregate function, currently only "example.col_1, example.col_2, example.col_3, example.col_4, example.col_5, example.col_6" appears in the SELECT clause satisfies this requirement
```

Instead, I removed `col_7` and `col_8` (the columns to which the `first_value()` aggregate is applied). This is the resulting query:

```sql
CREATE EXTERNAL TABLE example (
    col_1 VARCHAR(50) NOT NULL,
    col_2 BIGINT NOT NULL,
    col_3 VARCHAR(50),
    col_4 VARCHAR(50),
    col_5 VARCHAR(50),
    col_6 VARCHAR(100) NOT NULL,
    col_7 VARCHAR(50),
    col_8 DOUBLE
)
WITH ORDER (col_1 ASC, col_2 ASC)
STORED AS PARQUET
LOCATION '/tmp/redacted/*.parquet';

COPY (
    SELECT
        col_1,
        col_2,
        col_3,
        col_4,
        col_5,
        col_6
    FROM example
    GROUP BY col_1, col_2, col_3, col_4, col_5, col_6
    ORDER BY col_1 ASC, col_2 ASC
)
TO '/tmp/result_part2.parquet'
STORED AS PARQUET
OPTIONS (compression 'zstd(1)');
```

### Expected behavior

* Given that the data is sorted by `col_1` and `col_2`, it's expected that the original query would use the streaming aggregate operator (which should use very little memory) ([discussion](https://github.com/apache/datafusion/discussions/16776#discussioncomment-13777332)).
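To illustrate why sorted input should allow low-memory deduplication, here is a minimal conceptual sketch in Python. It is not DataFusion's implementation; the function name `dedup_partially_sorted` and its parameters are hypothetical. It assumes rows arrive as tuples already ordered by their first two fields (standing in for `col_1`, `col_2`), that the first six fields form the `GROUP BY` key, and that "first value" simply means the first row encountered for a key.

```python
from itertools import groupby


def dedup_partially_sorted(rows, sorted_prefix=2, key_len=6):
    """Yield one row per distinct group key from partially sorted input.

    rows          -- iterable of tuples ordered by their first `sorted_prefix` fields
    sorted_prefix -- number of leading fields the input is sorted on (assumption)
    key_len       -- number of leading fields that form the full GROUP BY key

    For each key, the first row encountered is kept, which loosely mimics
    first_value() on the remaining fields.
    """
    # groupby() only groups *adjacent* rows, which is exactly what sorted
    # input guarantees for the prefix columns.
    for _, run in groupby(rows, key=lambda row: row[:sorted_prefix]):
        seen = {}  # holds only the distinct keys of the current run
        for row in run:
            seen.setdefault(row[:key_len], row)
        # Once the prefix changes, no later row can belong to these groups,
        # so they can be emitted and their memory released.
        yield from seen.values()


# Tiny hypothetical input, ordered by the first two fields.
rows = [
    ("a", 1, "x", None, None, "k", "v1", 1.0),
    ("a", 1, "x", None, None, "k", "v2", 2.0),  # duplicate key, dropped
    ("a", 1, "y", None, None, "k", "v3", 3.0),
    ("a", 2, "x", None, None, "k", "v4", 4.0),
    ("b", 1, "x", None, None, "k", "v5", 5.0),
]
deduplicated = list(dedup_partially_sorted(rows))
```

In this sketch, memory is bounded by the number of distinct keys within a single `(col_1, col_2)` run rather than by the whole dataset, and the output stays ordered by the sorted prefix, so no final sort is needed. Several pre-sorted inputs would first need an order-preserving merge (for example `heapq.merge` with a `key` on the prefix fields) before being passed in.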
### Additional context

## Detailed Query Plans

Generated using `EXPLAIN FORMAT INDENT ...`

### Original Deduplication Query:

```
| plan_type | plan |
| logical_plan | CopyTo: format=parquet output_url=/tmp/result.parquet options: (format.compression zstd(1)) |
| | Sort: example.col_1 ASC NULLS LAST, example.col_2 ASC NULLS LAST |
| | Projection: example.col_1, example.col_2, example.col_3, example.col_4, example.col_5, example.col_6, first_value(example.col_7) AS col_7, first_value(example.col_8) AS col_8 |
| | Aggregate: groupBy=[[example.col_1, example.col_2, example.col_3, example.col_4, example.col_5, example.col_6]], aggr=[[first_value(example.col_7), first_value(example.col_8)]] |
| | TableScan: example projection=[col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8] |
| physical_plan | DataSinkExec: sink=ParquetSink(file_groups=[]) |
| | SortPreservingMergeExec: [col_1@0 ASC NULLS LAST, col_2@1 ASC NULLS LAST] |
| | SortExec: expr=[col_1@0 ASC NULLS LAST, col_2@1 ASC NULLS LAST], preserve_partitioning=[true] |
| | ProjectionExec: expr=[col_1@0 as col_1, col_2@1 as col_2, col_3@2 as col_3, col_4@3 as col_4, col_5@4 as col_5, col_6@5 as col_6, first_value(example.col_7)@6 as col_7, first_value(example.col_8)@7 as col_8] |
| | AggregateExec: mode=FinalPartitioned, gby=[col_1@0 as col_1, col_2@1 as col_2, col_3@2 as col_3, col_4@3 as col_4, col_5@4 as col_5, col_6@5 as col_6], aggr=[first_value(example.col_7), first_value(example.col_8)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([col_1@0, col_2@1, col_3@2, col_4@3, col_5@4, col_6@5], 10), input_partitions=10 |
| | AggregateExec: mode=Partial, gby=[col_1@0 as col_1, col_2@1 as col_2, col_3@2 as col_3, col_4@3 as col_4, col_5@4 as col_5, col_6@5 as col_6], aggr=[first_value(example.col_7), first_value(example.col_8)] |
| | DataSourceExec: file_groups={10 groups: [[tmp/redacted/reproducible_data_0.parquet:0..12537303, tmp/redacted/reproducible_data_1.parquet:0..6245726], [tmp/redacted/reproducible_data_1.parquet:6245726..12518047, tmp/redacted/reproducible_data_10.parquet:0..12510708], [tmp/redacted/reproducible_data_10.parquet:12510708..12530931, tmp/redacted/reproducible_data_11.parquet:0..12518206, tmp/redacted/reproducible_data_12.parquet:0..6244600], [tmp/redacted/reproducible_data_12.parquet:6244600..12522074, tmp/redacted/reproducible_data_13.parquet:0..12505555], [tmp/redacted/reproducible_data_13.parquet:12505555..12523871, tmp/redacted/reproducible_data_14.parquet:0..12515635, tmp/redacted/reproducible_data_2.parquet:0..6249078], ...]}, projection=[col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8], file_type=parquet |
| | |
```

### Removing `first_value()` Aggregate and associated columns:

```
| plan_type | plan |
| logical_plan | CopyTo: format=parquet output_url=/tmp/result_part2.parquet options: (format.compression zstd(1)) |
| | Sort: example.col_1 ASC NULLS LAST, example.col_2 ASC NULLS LAST |
| | Aggregate: groupBy=[[example.col_1, example.col_2, example.col_3, example.col_4, example.col_5, example.col_6]], aggr=[[]] |
| | TableScan: example projection=[col_1, col_2, col_3, col_4, col_5, col_6] |
| physical_plan | DataSinkExec: sink=ParquetSink(file_groups=[]) |
| | SortPreservingMergeExec: [col_1@0 ASC NULLS LAST, col_2@1 ASC NULLS LAST] |
| | SortExec: expr=[col_1@0 ASC NULLS LAST, col_2@1 ASC NULLS LAST], preserve_partitioning=[true] |
| | AggregateExec: mode=FinalPartitioned, gby=[col_1@0 as col_1, col_2@1 as col_2, col_3@2 as col_3, col_4@3 as col_4, col_5@4 as col_5, col_6@5 as col_6], aggr=[] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([col_1@0, col_2@1, col_3@2, col_4@3, col_5@4, col_6@5], 10), input_partitions=10 |
| | AggregateExec: mode=Partial, gby=[col_1@0 as col_1, col_2@1 as col_2, col_3@2 as col_3, col_4@3 as col_4, col_5@4 as col_5, col_6@5 as col_6], aggr=[] |
| | DataSourceExec: file_groups={10 groups: [[tmp/redacted/reproducible_data_0.parquet:0..12537303, tmp/redacted/reproducible_data_1.parquet:0..6245726], [tmp/redacted/reproducible_data_1.parquet:6245726..12518047, tmp/redacted/reproducible_data_10.parquet:0..12510708], [tmp/redacted/reproducible_data_10.parquet:12510708..12530931, tmp/redacted/reproducible_data_11.parquet:0..12518206, tmp/redacted/reproducible_data_12.parquet:0..6244600], [tmp/redacted/reproducible_data_12.parquet:6244600..12522074, tmp/redacted/reproducible_data_13.parquet:0..12505555], [tmp/redacted/reproducible_data_13.parquet:12505555..12523871, tmp/redacted/reproducible_data_14.parquet:0..12515635, tmp/redacted/reproducible_data_2.parquet:0..6249078], ...]}, projection=[col_1, col_2, col_3, col_4, col_5, col_6], file_type=parquet |
| | |
```
