haohuaijin commented on issue #16919: URL: https://github.com/apache/datafusion/issues/16919#issuecomment-3124127867
By default, DataFusion uses the number of CPU cores as the number of partitions for executing queries concurrently.
For example, with 10 CPU cores and 15 Parquet files, DataFusion divides the 15 files into 10 groups and executes the groups concurrently.
So for the above issue, because there are 15 files but only 10 CPU cores, DataFusion groups the files, and as a result the data within a group is no longer sorted by `col_1` and `col_2`.
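As a rough illustration, the grouping above can be sketched like this. This is a hypothetical simplification, not DataFusion's actual partitioning code (which can also split a single file by byte range), and `group_files` is a made-up helper name:

```python
# Hypothetical sketch: distribute N files round-robin into at most
# target_partitions groups. A simplification of DataFusion's actual
# grouping, which may also split one file into byte ranges.
def group_files(files: list[str], target_partitions: int) -> list[list[str]]:
    groups: list[list[str]] = [[] for _ in range(min(target_partitions, len(files)))]
    for i, f in enumerate(files):
        groups[i % len(groups)].append(f)
    return groups

files = [f"reproducible_data_{i}.parquet" for i in range(15)]
groups = group_files(files, 10)
# With 15 files and 10 groups, some groups hold two files, so rows read
# from those groups are no longer globally ordered on (col_1, col_2).
```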
Setting `target_partitions` equal to the number of files before creating the table will generate the expected plan. For example:
```sql
SET datafusion.execution.target_partitions = 15;
CREATE EXTERNAL TABLE example (
col_1 VARCHAR(50) NOT NULL,
col_2 BIGINT NOT NULL,
col_3 VARCHAR(50),
col_4 VARCHAR(50),
col_5 VARCHAR(50),
col_6 VARCHAR(100) NOT NULL,
col_7 VARCHAR(50),
col_8 DOUBLE
)
WITH ORDER (col_1 ASC, col_2 ASC)
STORED AS PARQUET
LOCATION '/tmp/redacted/*.parquet';
EXPLAIN FORMAT INDENT COPY (
SELECT
col_1,
col_2,
col_3,
col_4,
col_5,
col_6,
first_value(col_7) AS col_7,
first_value(col_8) AS col_8
FROM
example
GROUP BY
col_1, col_2, col_3, col_4, col_5, col_6
ORDER BY
col_1 ASC, col_2 ASC
)
TO '/tmp/result.parquet'
STORED AS PARQUET
OPTIONS (compression 'zstd(1)');
```
```
| plan_type     | plan                                                        |
+---------------+-------------------------------------------------------------+
| logical_plan  |
|   CopyTo: format=parquet output_url=/tmp/result.parquet options: (format.compression zstd(1))
|     Sort: example.col_1 ASC NULLS LAST, example.col_2 ASC NULLS LAST
|       Projection: example.col_1, example.col_2, example.col_3, example.col_4, example.col_5, example.col_6, first_value(example.col_7) AS col_7, first_value(example.col_8) AS col_8
|         Aggregate: groupBy=[[example.col_1, example.col_2, example.col_3, example.col_4, example.col_5, example.col_6]], aggr=[[first_value(example.col_7), first_value(example.col_8)]]
|           TableScan: example projection=[col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8]
| physical_plan |
|   DataSinkExec: sink=ParquetSink(file_groups=[])
|     SortPreservingMergeExec: [col_1@0 ASC NULLS LAST, col_2@1 ASC NULLS LAST]
|       ProjectionExec: expr=[col_1@0 as col_1, col_2@1 as col_2, col_3@2 as col_3, col_4@3 as col_4, col_5@4 as col_5, col_6@5 as col_6, first_value(example.col_7)@6 as col_7, first_value(example.col_8)@7 as col_8]
|         AggregateExec: mode=FinalPartitioned, gby=[col_1@0 as col_1, col_2@1 as col_2, col_3@2 as col_3, col_4@3 as col_4, col_5@4 as col_5, col_6@5 as col_6], aggr=[first_value(example.col_7), first_value(example.col_8)], ordering_mode=PartiallySorted([0, 1])
|           SortExec: expr=[col_1@0 ASC NULLS LAST, col_2@1 ASC NULLS LAST], preserve_partitioning=[true]
|             CoalesceBatchesExec: target_batch_size=8192
|               RepartitionExec: partitioning=Hash([col_1@0, col_2@1, col_3@2, col_4@3, col_5@4, col_6@5], 15), input_partitions=15
|                 AggregateExec: mode=Partial, gby=[col_1@0 as col_1, col_2@1 as col_2, col_3@2 as col_3, col_4@3 as col_4, col_5@4 as col_5, col_6@5 as col_6], aggr=[first_value(example.col_7), first_value(example.col_8)], ordering_mode=PartiallySorted([0, 1])
|                   DataSourceExec: file_groups={15 groups: [[Users/huaijinhao/Downloads/datav4-15/reproducible_data_0.parquet], [Users/huaijinhao/Downloads/datav4-15/reproducible_data_1.parquet], [Users/huaijinhao/Downloads/datav4-15/reproducible_data_10.parquet], [Users/huaijinhao/Downloads/datav4-15/reproducible_data_11.parquet], [Users/huaijinhao/Downloads/datav4-15/reproducible_data_12.parquet], ...]}, projection=[col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8], output_ordering=[col_1@0 ASC NULLS LAST, col_2@1 ASC NULLS LAST], file_type=parquet
```
Another way is to enable `split_file_groups_by_statistics`, so that the files within each group do not overlap on the sort key. However, the Parquet files generated by Python do not have min/max statistics, so I was not able to verify this.
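For reference, a minimal sketch of enabling that option (assuming the files do carry min/max statistics; the option name is taken from the DataFusion execution config):

```sql
SET datafusion.execution.split_file_groups_by_statistics = true;
```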
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
