gabotechs commented on code in PR #19853: URL: https://github.com/apache/datafusion/pull/19853#discussion_r2698749722
########## datafusion/sqllogictest/test_files/grouping_set_repartition.slt: ########## @@ -0,0 +1,246 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +########## +# Tests for ROLLUP/CUBE/GROUPING SETS with multiple partitions +# +# This tests the fix for https://github.com/apache/datafusion/issues/19849 +# where ROLLUP queries produced incorrect results with multiple partitions +# because subset partitioning satisfaction was incorrectly applied. +# +# The bug manifests when: +# 1. UNION ALL of subqueries each with hash-partitioned aggregates +# 2. Outer ROLLUP groups by more columns than inner hash partitioning +# 3. InterleaveExec preserves the inner hash partitioning +# 4. Optimizer incorrectly uses subset satisfaction, skipping necessary repartition +# +# The fix ensures that when hash partitioning includes __grouping_id, +# subset satisfaction is disabled and proper RepartitionExec is inserted. 
+########## + +########## +# SETUP: Create partitioned parquet files to simulate distributed data +########## + +statement ok +set datafusion.execution.target_partitions = 4; + +statement ok +set datafusion.optimizer.repartition_aggregations = true; + +# Create partition 1 +statement ok +COPY (SELECT column1 as channel, column2 as brand, column3 as amount FROM (VALUES + ('store', 'nike', 100), + ('store', 'nike', 200), + ('store', 'adidas', 150) +)) +TO 'test_files/scratch/grouping_set_repartition/part=1/data.parquet' +STORED AS PARQUET; + +# Create partition 2 +statement ok +COPY (SELECT column1 as channel, column2 as brand, column3 as amount FROM (VALUES + ('store', 'adidas', 250), + ('web', 'nike', 300), + ('web', 'nike', 400) +)) +TO 'test_files/scratch/grouping_set_repartition/part=2/data.parquet' +STORED AS PARQUET; + +# Create partition 3 +statement ok +COPY (SELECT column1 as channel, column2 as brand, column3 as amount FROM (VALUES + ('web', 'adidas', 350), + ('web', 'adidas', 450), + ('catalog', 'nike', 500) +)) +TO 'test_files/scratch/grouping_set_repartition/part=3/data.parquet' +STORED AS PARQUET; + +# Create partition 4 +statement ok +COPY (SELECT column1 as channel, column2 as brand, column3 as amount FROM (VALUES + ('catalog', 'nike', 600), + ('catalog', 'adidas', 550), + ('catalog', 'adidas', 650) +)) +TO 'test_files/scratch/grouping_set_repartition/part=4/data.parquet' +STORED AS PARQUET; + +# Create external table pointing to the partitioned data +statement ok +CREATE EXTERNAL TABLE sales (channel VARCHAR, brand VARCHAR, amount INT) +STORED AS PARQUET +PARTITIONED BY (part INT) +LOCATION 'test_files/scratch/grouping_set_repartition/'; + +########## +# TEST 1: UNION ALL + ROLLUP pattern (similar to TPC-DS q14) +# This query pattern triggers the subset satisfaction bug because: +# - Each UNION ALL branch has hash partitioning on (brand) +# - The outer ROLLUP requires hash partitioning on (channel, brand, __grouping_id) +# - Without the fix, subset 
satisfaction incorrectly skips repartition +# +# Verify the physical plan includes RepartitionExec with __grouping_id +########## + +query TT +EXPLAIN SELECT channel, brand, SUM(total) as grand_total +FROM ( + SELECT 'store' as channel, brand, SUM(amount) as total + FROM sales WHERE channel = 'store' + GROUP BY brand + UNION ALL + SELECT 'web' as channel, brand, SUM(amount) as total + FROM sales WHERE channel = 'web' + GROUP BY brand + UNION ALL + SELECT 'catalog' as channel, brand, SUM(amount) as total + FROM sales WHERE channel = 'catalog' + GROUP BY brand +) sub +GROUP BY ROLLUP(channel, brand) +ORDER BY channel NULLS FIRST, brand NULLS FIRST; +---- +logical_plan +01)Sort: sub.channel ASC NULLS FIRST, sub.brand ASC NULLS FIRST +02)--Projection: sub.channel, sub.brand, sum(sub.total) AS grand_total +03)----Aggregate: groupBy=[[ROLLUP (sub.channel, sub.brand)]], aggr=[[sum(sub.total)]] +04)------SubqueryAlias: sub +05)--------Union +06)----------Projection: Utf8("store") AS channel, sales.brand, sum(sales.amount) AS total +07)------------Aggregate: groupBy=[[sales.brand]], aggr=[[sum(CAST(sales.amount AS Int64))]] +08)--------------Projection: sales.brand, sales.amount +09)----------------Filter: sales.channel = Utf8View("store") +10)------------------TableScan: sales projection=[channel, brand, amount], partial_filters=[sales.channel = Utf8View("store")] +11)----------Projection: Utf8("web") AS channel, sales.brand, sum(sales.amount) AS total +12)------------Aggregate: groupBy=[[sales.brand]], aggr=[[sum(CAST(sales.amount AS Int64))]] +13)--------------Projection: sales.brand, sales.amount +14)----------------Filter: sales.channel = Utf8View("web") +15)------------------TableScan: sales projection=[channel, brand, amount], partial_filters=[sales.channel = Utf8View("web")] +16)----------Projection: Utf8("catalog") AS channel, sales.brand, sum(sales.amount) AS total +17)------------Aggregate: groupBy=[[sales.brand]], aggr=[[sum(CAST(sales.amount AS Int64))]] 
+18)--------------Projection: sales.brand, sales.amount +19)----------------Filter: sales.channel = Utf8View("catalog") +20)------------------TableScan: sales projection=[channel, brand, amount], partial_filters=[sales.channel = Utf8View("catalog")] +physical_plan +01)SortPreservingMergeExec: [channel@0 ASC, brand@1 ASC] +02)--SortExec: expr=[channel@0 ASC, brand@1 ASC], preserve_partitioning=[true] +03)----ProjectionExec: expr=[channel@0 as channel, brand@1 as brand, sum(sub.total)@3 as grand_total] +04)------AggregateExec: mode=FinalPartitioned, gby=[channel@0 as channel, brand@1 as brand, __grouping_id@2 as __grouping_id], aggr=[sum(sub.total)] +05)--------RepartitionExec: partitioning=Hash([channel@0, brand@1, __grouping_id@2], 4), input_partitions=4 +06)----------AggregateExec: mode=Partial, gby=[(NULL as channel, NULL as brand), (channel@0 as channel, NULL as brand), (channel@0 as channel, brand@1 as brand)], aggr=[sum(sub.total)] Review Comment: 👍 I imagine before this PR the RepartitionExec would not be there, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
