Kontinuation opened a new issue, #1589:
URL: https://github.com/apache/datafusion-comet/issues/1589

   ### Describe the bug
   
   AQE can dynamically convert a SortMergeJoin or ShuffledHashJoin into a 
BroadcastHashJoin after discovering that one of the Exchange operators wrote 
only a small amount of shuffle data. However, this optimization does not always 
happen when using Comet.
   
   TPC-H Q7 has an equi-join between `supplier` and `lineitem`. After running 
the `Exchange` operator, Spark can discover that `supplier` is small enough to 
be broadcast, and dynamically change the sort-merge join to a broadcast hash 
join (see `BroadcastHashJoin Inner BuildLeft (15)`):
   
   ```
   == Physical Plan ==
   AdaptiveSparkPlan (99)
   +- == Final Plan ==
      * Sort (62)
      +- AQEShuffleRead (61)
         +- ShuffleQueryStage (60), Statistics(sizeInBytes=288.0 B, rowCount=4)
            +- Exchange (59)
               +- * HashAggregate (58)
                  +- AQEShuffleRead (57)
                     +- ShuffleQueryStage (56), Statistics(sizeInBytes=2.8 KiB, rowCount=36)
                        +- Exchange (55)
                           +- * HashAggregate (54)
                              +- * Project (53)
                                 +- * BroadcastHashJoin Inner BuildRight (52)
                                    :- * Project (49)
                                    :  +- * BroadcastHashJoin Inner BuildRight (48)
                                    :     :- * Project (42)
                                    :     :  +- * SortMergeJoin Inner (41)
                                    :     :     :- * Sort (33)
                                    :     :     :  +- AQEShuffleRead (32)
                                    :     :     :     +- ShuffleQueryStage (31), Statistics(sizeInBytes=667.5 MiB, rowCount=1.46E+7)
                                    :     :     :        +- Exchange (30)
                                    :     :     :           +- * Project (29)
                                    :     :     :              +- * SortMergeJoin Inner (28)
                                    :     :     :                 :- * Sort (20)
                                    :     :     :                 :  +- AQEShuffleRead (19)
                                    :     :     :                 :     +- ShuffleQueryStage (18), Statistics(sizeInBytes=667.5 MiB, rowCount=1.46E+7)
                                    :     :     :                 :        +- Exchange (17)
                                    :     :     :                 :           +- * Project (16)
                                    :     :     :                 :              +- * BroadcastHashJoin Inner BuildLeft (15)  <-- Transformed from SortMergeJoin by AQE
                                    :     :     :                 :                 :- BroadcastQueryStage (8), Statistics(sizeInBytes=8.0 MiB, rowCount=8.00E+4)
                                    :     :     :                 :                 :  +- BroadcastExchange (7)
                                    :     :     :                 :                 :     +- AQEShuffleRead (6)
                                    :     :     :                 :                 :        +- ShuffleQueryStage (5), Statistics(sizeInBytes=1874.1 KiB, rowCount=8.00E+4)
                                    :     :     :                 :                 :           +- Exchange (4)
                                    :     :     :                 :                 :              +- * Filter (3)
                                    :     :     :                 :                 :                 +- * ColumnarToRow (2)
                                    :     :     :                 :                 :                    +- Scan parquet  (1)
                                    :     :     :                 :                 +- AQEShuffleRead (14)
                                    :     :     :                 :                    +- ShuffleQueryStage (13), Statistics(sizeInBytes=8.1 GiB, rowCount=1.82E+8)
                                    :     :     :                 :                       +- Exchange (12)
                                    :     :     :                 :                          +- * Filter (11)
                                    :     :     :                 :                             +- * ColumnarToRow (10)
                                    :     :     :                 :                                +- Scan parquet  (9)
                                    :     :     :                 +- * Sort (27)
                                    :     :     :                    +- AQEShuffleRead (26)
                                    :     :     :                       +- ShuffleQueryStage (25), Statistics(sizeInBytes=3.4 GiB, rowCount=1.50E+8)
                                    :     :     :                          +- Exchange (24)
                                    :     :     :                             +- * Filter (23)
                                    :     :     :                                +- * ColumnarToRow (22)
                                    :     :     :                                   +- Scan parquet  (21)
                                    :     :     +- * Sort (40)
                                    :     :        +- AQEShuffleRead (39)
                                    :     :           +- ShuffleQueryStage (38), Statistics(sizeInBytes=27.5 MiB, rowCount=1.20E+6)
                                    :     :              +- Exchange (37)
                                    :     :                 +- * Filter (36)
                                    :     :                    +- * ColumnarToRow (35)
                                    :     :                       +- Scan parquet  (34)
                                    :     +- BroadcastQueryStage (47), Statistics(sizeInBytes=1024.0 KiB, rowCount=2)
                                    :        +- BroadcastExchange (46)
                                    :           +- * Filter (45)
                                    :              +- * ColumnarToRow (44)
                                    :                 +- Scan parquet  (43)
                                    +- BroadcastQueryStage (51), Statistics(sizeInBytes=1024.0 KiB, rowCount=2)
                                       +- ReusedExchange (50)
   ```
   
   The following query plan is generated when running TPC-H Q7 with Comet 
enabled. The CometSortMergeJoin was not transformed to CometBroadcastHashJoin:
   
   ```
   == Physical Plan ==
   AdaptiveSparkPlan (95)
   +- == Final Plan ==
      * CometColumnarToRow (58)
      +- CometSort (57)
         +- AQEShuffleRead (56)
            +- ShuffleQueryStage (55), Statistics(sizeInBytes=288.0 B, rowCount=4)
               +- CometColumnarExchange (54)
                  +- CometHashAggregate (53)
                     +- AQEShuffleRead (52)
                        +- ShuffleQueryStage (51), Statistics(sizeInBytes=8.5 KiB, rowCount=72)
                           +- CometExchange (50)
                              +- CometHashAggregate (49)
                                 +- CometProject (48)
                                    +- CometBroadcastHashJoin (47)
                                       :- CometProject (44)
                                       :  +- CometBroadcastHashJoin (43)
                                       :     :- CometProject (38)
                                       :     :  +- CometSortMergeJoin (37)
                                       :     :     :- CometSort (30)
                                       :     :     :  +- AQEShuffleRead (29)
                                       :     :     :     +- ShuffleQueryStage (28), Statistics(sizeInBytes=724.0 MiB, rowCount=2.92E+7)
                                       :     :     :        +- CometExchange (27)
                                       :     :     :           +- CometProject (26)
                                       :     :     :              +- CometSortMergeJoin (25)
                                       :     :     :                 :- CometSort (18)
                                       :     :     :                 :  +- AQEShuffleRead (17)
                                       :     :     :                 :     +- ShuffleQueryStage (16), Statistics(sizeInBytes=724.0 MiB, rowCount=2.92E+7)
                                       :     :     :                 :        +- CometExchange (15)
                                       :     :     :                 :           +- CometProject (14)
                                       :     :     :                 :              +- CometSortMergeJoin (13)  <-- Not transformed to CometBroadcastHashJoin
                                       :     :     :                 :                 :- CometSort (6)
                                       :     :     :                 :                 :  +- AQEShuffleRead (5)
                                       :     :     :                 :                 :     +- ShuffleQueryStage (4), Statistics(sizeInBytes=1281.0 KiB, rowCount=1.60E+5)
                                       :     :     :                 :                 :        +- CometExchange (3)
                                       :     :     :                 :                 :           +- CometFilter (2)
                                       :     :     :                 :                 :              +- CometScan parquet  (1)
                                       :     :     :                 :                 +- CometSort (12)
                                       :     :     :                 :                    +- AQEShuffleRead (11)
                                       :     :     :                 :                       +- ShuffleQueryStage (10), Statistics(sizeInBytes=8.9 GiB, rowCount=3.65E+8)
                                       :     :     :                 :                          +- CometExchange (9)
                                       :     :     :                 :                             +- CometFilter (8)
                                       :     :     :                 :                                +- CometScan parquet  (7)
                                       :     :     :                 +- CometSort (24)
                                       :     :     :                    +- AQEShuffleRead (23)
                                       :     :     :                       +- ShuffleQueryStage (22), Statistics(sizeInBytes=2.2 GiB, rowCount=3.00E+8)
                                       :     :     :                          +- CometExchange (21)
                                       :     :     :                             +- CometFilter (20)
                                       :     :     :                                +- CometScan parquet  (19)
                                       :     :     +- CometSort (36)
                                       :     :        +- AQEShuffleRead (35)
                                       :     :           +- ShuffleQueryStage (34), Statistics(sizeInBytes=18.7 MiB, rowCount=2.40E+6)
                                       :     :              +- CometExchange (33)
                                       :     :                 +- CometFilter (32)
                                       :     :                    +- CometScan parquet  (31)
                                       :     +- BroadcastQueryStage (42), Statistics(sizeInBytes=337.0 B, rowCount=2)
                                       :        +- CometBroadcastExchange (41)
                                       :           +- CometFilter (40)
                                       :              +- CometScan parquet  (39)
                                       +- BroadcastQueryStage (46), Statistics(sizeInBytes=337.0 B, rowCount=2)
                                          +- ReusedExchange (45)
   ```
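
For reference, the shuffle statistics of the innermost join in both plans can be compared against Spark's broadcast threshold. The snippet below is a rough sketch, not Spark's actual AQE logic: it assumes the documented default of 10 MiB for `spark.sql.autoBroadcastJoinThreshold` (the reproduction command does not override it), and the helper name `fits_broadcast_threshold` is made up for illustration. The sizes are copied from the `Statistics(sizeInBytes=...)` entries in the plans.

```python
# Sketch only: a simplified size check, not Spark's real AQE join selection,
# which applies additional conditions beyond the raw stage size.
KIB = 1024
MIB = 1024 * KIB
GIB = 1024 * MIB

# Assumption: Spark's documented default for spark.sql.autoBroadcastJoinThreshold.
DEFAULT_AUTO_BROADCAST_JOIN_THRESHOLD = 10 * MIB


def fits_broadcast_threshold(size_in_bytes,
                             threshold=DEFAULT_AUTO_BROADCAST_JOIN_THRESHOLD):
    """Return True when a stage's reported size is within the threshold.

    A negative threshold means broadcast joins are disabled.
    """
    return threshold >= 0 and size_in_bytes <= threshold


# Sizes reported for the two inputs of the innermost join in each plan.
stage_sizes = {
    "Spark ShuffleQueryStage (5)": 1874.1 * KIB,
    "Spark ShuffleQueryStage (13)": 8.1 * GIB,
    "Comet ShuffleQueryStage (4)": 1281.0 * KIB,
    "Comet ShuffleQueryStage (10)": 8.9 * GIB,
}

for name, size in stage_sizes.items():
    verdict = fits_broadcast_threshold(size)
    print(f"{name}: {size / MIB:.1f} MiB, fits threshold: {verdict}")
```

Under that assumption, the small side of the join is below the threshold in both plans, so the reported size alone does not appear to explain why the Comet plan keeps the sort-merge join.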
   
   ### Steps to reproduce
   
   Run TPC-H Q7 using the TPC-H SF=100 dataset. The benchmarking code is in 
https://github.com/apache/datafusion-benchmarks/tree/main/tpch.
   
   ```bash
   spark-submit \
     --master local[8] \
     --conf spark.driver.memory=3g \
     --conf spark.memory.offHeap.enabled=true \
     --conf spark.memory.offHeap.size=16g \
     --conf spark.jars=$COMET_JAR \
     --conf spark.driver.extraClassPath=$COMET_JAR \
     --conf spark.executor.extraClassPath=$COMET_JAR \
     --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
     --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
     --conf spark.comet.enabled=true \
     --conf spark.comet.exec.shuffle.enabled=true \
     --conf spark.comet.exec.shuffle.mode=auto \
     --conf spark.comet.exec.shuffle.fallbackToColumnar=true \
     --conf spark.comet.exec.shuffle.compression.codec=lz4 \
     --conf spark.comet.exec.replaceSortMergeJoin=false \
     tpcbench.py \
     --benchmark tpch \
     --data /path/to/tpch/sf100_parquet \
     --queries ../../tpch/queries \
     --output tpc-results \
     --iterations 3
   ```
   
   ### Expected behavior
   
   The innermost CometSortMergeJoin gets transformed to CometBroadcastHashJoin.
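
One way to check this on a captured plan is to count the join operators by name. The helper below is a hypothetical sketch (the name `count_join_operators` and the sample text are illustrative, not part of Comet or Spark); it works on plain plan text, and how that text is obtained is left open.

```python
import re
from collections import Counter

# Matches Spark join operators and their Comet counterparts by name.
JOIN_PATTERN = re.compile(
    r"\b(?:Comet)?(?:SortMergeJoin|ShuffledHashJoin|BroadcastHashJoin)\b"
)


def count_join_operators(plan_text):
    """Count join operator names appearing in formatted plan text."""
    counts = Counter()
    for line in plan_text.splitlines():
        # Ignore trailing annotations such as "<-- ..." so that operator
        # names mentioned in comments are not counted.
        operator_part = line.split("<--", 1)[0]
        counts.update(JOIN_PATTERN.findall(operator_part))
    return counts


# Abbreviated sample shaped like the Comet plan in this report.
sample_plan = """\
+- CometBroadcastHashJoin (47)
   :- CometSortMergeJoin (37)
   :  +- CometSortMergeJoin (25)
   :     +- CometSortMergeJoin (13)  <-- Not transformed to CometBroadcastHashJoin
   +- BroadcastQueryStage (46)
"""

print(count_join_operators(sample_plan))
```

With the expected behavior, one of the `CometSortMergeJoin` entries would instead be reported as `CometBroadcastHashJoin`.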
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

