Kontinuation opened a new issue, #1589: URL: https://github.com/apache/datafusion-comet/issues/1589
### Describe the bug

AQE can dynamically convert a SortMergeJoin or ShuffledHashJoin into a BroadcastHashJoin once it discovers that one of the `Exchange` operators wrote only a small amount of shuffle data. However, this optimization does not always happen when using Comet.

TPC-H Q7 has an equi-join between `supplier` and `lineitem`. After running the `Exchange` operator, Spark discovers that `supplier` is small enough to be broadcast and dynamically changes the sort-merge join to a broadcast hash join (see `BroadcastHashJoin Inner BuildLeft (15)`):

```
== Physical Plan ==
AdaptiveSparkPlan (99)
+- == Final Plan ==
   * Sort (62)
   +- AQEShuffleRead (61)
      +- ShuffleQueryStage (60), Statistics(sizeInBytes=288.0 B, rowCount=4)
         +- Exchange (59)
            +- * HashAggregate (58)
               +- AQEShuffleRead (57)
                  +- ShuffleQueryStage (56), Statistics(sizeInBytes=2.8 KiB, rowCount=36)
                     +- Exchange (55)
                        +- * HashAggregate (54)
                           +- * Project (53)
                              +- * BroadcastHashJoin Inner BuildRight (52)
                                 :- * Project (49)
                                 :  +- * BroadcastHashJoin Inner BuildRight (48)
                                 :     :- * Project (42)
                                 :     :  +- * SortMergeJoin Inner (41)
                                 :     :     :- * Sort (33)
                                 :     :     :  +- AQEShuffleRead (32)
                                 :     :     :     +- ShuffleQueryStage (31), Statistics(sizeInBytes=667.5 MiB, rowCount=1.46E+7)
                                 :     :     :        +- Exchange (30)
                                 :     :     :           +- * Project (29)
                                 :     :     :              +- * SortMergeJoin Inner (28)
                                 :     :     :                 :- * Sort (20)
                                 :     :     :                 :  +- AQEShuffleRead (19)
                                 :     :     :                 :     +- ShuffleQueryStage (18), Statistics(sizeInBytes=667.5 MiB, rowCount=1.46E+7)
                                 :     :     :                 :        +- Exchange (17)
                                 :     :     :                 :           +- * Project (16)
                                 :     :     :                 :              +- * BroadcastHashJoin Inner BuildLeft (15) <-- Transformed from SortMergeJoin by AQE
                                 :     :     :                 :                 :- BroadcastQueryStage (8), Statistics(sizeInBytes=8.0 MiB, rowCount=8.00E+4)
                                 :     :     :                 :                 :  +- BroadcastExchange (7)
                                 :     :     :                 :                 :     +- AQEShuffleRead (6)
                                 :     :     :                 :                 :        +- ShuffleQueryStage (5), Statistics(sizeInBytes=1874.1 KiB, rowCount=8.00E+4)
                                 :     :     :                 :                 :           +- Exchange (4)
                                 :     :     :                 :                 :              +- * Filter (3)
                                 :     :     :                 :                 :                 +- * ColumnarToRow (2)
                                 :     :     :                 :                 :                    +- Scan parquet (1)
                                 :     :     :                 :                 +- AQEShuffleRead (14)
                                 :     :     :                 :                    +- ShuffleQueryStage (13), Statistics(sizeInBytes=8.1 GiB, rowCount=1.82E+8)
                                 :     :     :                 :                       +- Exchange (12)
                                 :     :     :                 :                          +- * Filter (11)
                                 :     :     :                 :                             +- * ColumnarToRow (10)
                                 :     :     :                 :                                +- Scan parquet (9)
                                 :     :     :                 +- * Sort (27)
                                 :     :     :                    +- AQEShuffleRead (26)
                                 :     :     :                       +- ShuffleQueryStage (25), Statistics(sizeInBytes=3.4 GiB, rowCount=1.50E+8)
                                 :     :     :                          +- Exchange (24)
                                 :     :     :                             +- * Filter (23)
                                 :     :     :                                +- * ColumnarToRow (22)
                                 :     :     :                                   +- Scan parquet (21)
                                 :     :     +- * Sort (40)
                                 :     :        +- AQEShuffleRead (39)
                                 :     :           +- ShuffleQueryStage (38), Statistics(sizeInBytes=27.5 MiB, rowCount=1.20E+6)
                                 :     :              +- Exchange (37)
                                 :     :                 +- * Filter (36)
                                 :     :                    +- * ColumnarToRow (35)
                                 :     :                       +- Scan parquet (34)
                                 :     +- BroadcastQueryStage (47), Statistics(sizeInBytes=1024.0 KiB, rowCount=2)
                                 :        +- BroadcastExchange (46)
                                 :           +- * Filter (45)
                                 :              +- * ColumnarToRow (44)
                                 :                 +- Scan parquet (43)
                                 +- BroadcastQueryStage (51), Statistics(sizeInBytes=1024.0 KiB, rowCount=2)
                                    +- ReusedExchange (50)
```

The following query plan is generated when running TPC-H Q7 with Comet enabled. The CometSortMergeJoin was not transformed to CometBroadcastHashJoin:

```
== Physical Plan ==
AdaptiveSparkPlan (95)
+- == Final Plan ==
   * CometColumnarToRow (58)
   +- CometSort (57)
      +- AQEShuffleRead (56)
         +- ShuffleQueryStage (55), Statistics(sizeInBytes=288.0 B, rowCount=4)
            +- CometColumnarExchange (54)
               +- CometHashAggregate (53)
                  +- AQEShuffleRead (52)
                     +- ShuffleQueryStage (51), Statistics(sizeInBytes=8.5 KiB, rowCount=72)
                        +- CometExchange (50)
                           +- CometHashAggregate (49)
                              +- CometProject (48)
                                 +- CometBroadcastHashJoin (47)
                                    :- CometProject (44)
                                    :  +- CometBroadcastHashJoin (43)
                                    :     :- CometProject (38)
                                    :     :  +- CometSortMergeJoin (37)
                                    :     :     :- CometSort (30)
                                    :     :     :  +- AQEShuffleRead (29)
                                    :     :     :     +- ShuffleQueryStage (28), Statistics(sizeInBytes=724.0 MiB, rowCount=2.92E+7)
                                    :     :     :        +- CometExchange (27)
                                    :     :     :           +- CometProject (26)
                                    :     :     :              +- CometSortMergeJoin (25)
                                    :     :     :                 :- CometSort (18)
                                    :     :     :                 :  +- AQEShuffleRead (17)
                                    :     :     :                 :     +- ShuffleQueryStage (16), Statistics(sizeInBytes=724.0 MiB, rowCount=2.92E+7)
                                    :     :     :                 :        +- CometExchange (15)
                                    :     :     :                 :           +- CometProject (14)
                                    :     :     :                 :              +- CometSortMergeJoin (13) <-- Not transformed to CometBroadcastHashJoin
                                    :     :     :                 :                 :- CometSort (6)
                                    :     :     :                 :                 :  +- AQEShuffleRead (5)
                                    :     :     :                 :                 :     +- ShuffleQueryStage (4), Statistics(sizeInBytes=1281.0 KiB, rowCount=1.60E+5)
                                    :     :     :                 :                 :        +- CometExchange (3)
                                    :     :     :                 :                 :           +- CometFilter (2)
                                    :     :     :                 :                 :              +- CometScan parquet (1)
                                    :     :     :                 :                 +- CometSort (12)
                                    :     :     :                 :                    +- AQEShuffleRead (11)
                                    :     :     :                 :                       +- ShuffleQueryStage (10), Statistics(sizeInBytes=8.9 GiB, rowCount=3.65E+8)
                                    :     :     :                 :                          +- CometExchange (9)
                                    :     :     :                 :                             +- CometFilter (8)
                                    :     :     :                 :                                +- CometScan parquet (7)
                                    :     :     :                 +- CometSort (24)
                                    :     :     :                    +- AQEShuffleRead (23)
                                    :     :     :                       +- ShuffleQueryStage (22), Statistics(sizeInBytes=2.2 GiB, rowCount=3.00E+8)
                                    :     :     :                          +- CometExchange (21)
                                    :     :     :                             +- CometFilter (20)
                                    :     :     :                                +- CometScan parquet (19)
                                    :     :     +- CometSort (36)
                                    :     :        +- AQEShuffleRead (35)
                                    :     :           +- ShuffleQueryStage (34), Statistics(sizeInBytes=18.7 MiB, rowCount=2.40E+6)
                                    :     :              +- CometExchange (33)
                                    :     :                 +- CometFilter (32)
                                    :     :                    +- CometScan parquet (31)
                                    :     +- BroadcastQueryStage (42), Statistics(sizeInBytes=337.0 B, rowCount=2)
                                    :        +- CometBroadcastExchange (41)
                                    :           +- CometFilter (40)
                                    :              +- CometScan parquet (39)
                                    +- BroadcastQueryStage (46), Statistics(sizeInBytes=337.0 B, rowCount=2)
                                       +- ReusedExchange (45)
```

### Steps to reproduce

Run TPC-H Q7 using the TPC-H SF=100 dataset. The benchmarking code is in https://github.com/apache/datafusion-benchmarks/tree/main/tpch.
```bash
spark-submit \
    --master local[8] \
    --conf spark.driver.memory=3g \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=16g \
    --conf spark.jars=$COMET_JAR \
    --conf spark.driver.extraClassPath=$COMET_JAR \
    --conf spark.executor.extraClassPath=$COMET_JAR \
    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --conf spark.comet.enabled=true \
    --conf spark.comet.exec.shuffle.enabled=true \
    --conf spark.comet.exec.shuffle.mode=auto \
    --conf spark.comet.exec.shuffle.fallbackToColumnar=true \
    --conf spark.comet.exec.shuffle.compression.codec=lz4 \
    --conf spark.comet.exec.replaceSortMergeJoin=false \
    tpcbench.py \
    --benchmark tpch \
    --data /path/to/tpch/sf100_parquet \
    --queries ../../tpch/queries \
    --output tpc-results \
    --iterations 3
```

### Expected behavior

The innermost CometSortMergeJoin gets transformed to CometBroadcastHashJoin.

### Additional context

_No response_
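For reference, the size comparison behind the expected conversion can be sketched in a few lines of Python. This is only a simplified model, not Spark's or Comet's actual code. It assumes the decision reduces to comparing a shuffle stage's `sizeInBytes` against the broadcast threshold, and that the threshold is Spark's default of 10 MiB for `spark.sql.autoBroadcastJoinThreshold` (the reproduction command does not override it). The helper names `parse_size` and `fits_broadcast` are hypothetical; the sizes are copied from the two plans in this report.

```python
# Simplified model of a size-based broadcast decision.
# Assumption: this is NOT Spark's or Comet's implementation, only an illustration.

# Spark's default spark.sql.autoBroadcastJoinThreshold is 10 MiB; the
# reproduction command in this report does not change it.
DEFAULT_BROADCAST_THRESHOLD = 10 * 1024 * 1024

# Binary units as printed in Statistics(sizeInBytes=...) in the plans.
UNITS = {"B": 1, "KiB": 1024, "MiB": 1024**2, "GiB": 1024**3}


def parse_size(text: str) -> int:
    """Convert a plan size such as '1874.1 KiB' into a byte count."""
    value, unit = text.split()
    return int(float(value) * UNITS[unit])


def fits_broadcast(size_text: str, threshold: int = DEFAULT_BROADCAST_THRESHOLD) -> bool:
    """Return True if the reported size is at or below the threshold."""
    return parse_size(size_text) <= threshold


# Shuffle statistics of the small (supplier) side of the innermost join,
# copied from the Spark plan and the Comet plan respectively.
stages = {
    "Spark ShuffleQueryStage (5)": "1874.1 KiB",
    "Comet ShuffleQueryStage (4)": "1281.0 KiB",
}

for name, size in stages.items():
    print(f"{name}: {size} -> fits broadcast threshold: {fits_broadcast(size)}")
```

Under these assumptions both stages fall below the threshold, so the reported shuffle size alone would not explain why only the Spark plan is converted.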