Chungmin Lee created SPARK-51872:
------------------------------------

             Summary: AQE: CoalescePartitions causes BHJ performance regression via under-utilization
                 Key: SPARK-51872
                 URL: https://issues.apache.org/jira/browse/SPARK-51872
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 3.5.5
            Reporter: Chungmin Lee


When AQE converts a sort-merge join (SMJ) into a broadcast hash join (BHJ), the 
shuffle on the probe side may be skewed. If {{CoalesceShufflePartitions}} is 
applied to this shuffle, it can produce a very small number of partitions. 
{{OptimizeShuffleWithLocalRead}} then reuses the partition count chosen by 
{{CoalesceShufflePartitions}}. The local read can even out partition sizes by 
coalescing mappers, but the number of partitions does not change. The result is 
a few very large partitions and an under-utilized cluster.
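To make the interaction concrete, here is a simplified Scala model of the two steps. It is a sketch under stated assumptions, not Spark's code: {{coalesce}} greedily packs contiguous reducer partitions up to a fixed, assumed target size (the real rule has more inputs, such as minimum partition size and parallelism settings), and {{localRead}} groups whole mappers into exactly as many partitions as the coalescing step produced, as described above. The object and method names are hypothetical.

{code:scala}
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model of CoalesceShufflePartitions followed by
// OptimizeShuffleWithLocalRead on the BHJ probe side. Not Spark's implementation.
object AqeLocalReadModel {

  // Greedily merge contiguous reducer partitions; a new group starts whenever
  // adding the next partition would push the current group over targetSize.
  def coalesce(reducerSizes: Seq[Long], targetSize: Long): Seq[Long] = {
    val groups = ArrayBuffer.empty[Long]
    var current = 0L
    var open = false
    for (size <- reducerSizes) {
      if (open && current + size > targetSize) {
        groups += current
        current = 0L
      }
      current += size
      open = true
    }
    if (open) groups += current
    groups.toSeq
  }

  // Split mapper indices 0 until n into k contiguous, nearly equal ranges.
  def equallyDivide(n: Int, k: Int): Seq[Range] = {
    val base = n / k
    val extra = n % k
    (0 until k).map { i =>
      val start = i * base + math.min(i, extra)
      start until (start + base + (if (i < extra) 1 else 0))
    }
  }

  // Local read: each output partition reads every reducer block written by a
  // contiguous group of mappers. The partition count is fixed to `parallelism`
  // (the count chosen by coalesce), regardless of the total data size.
  def localRead(mapperOutputSizes: Seq[Long], parallelism: Int): Seq[Long] = {
    require(parallelism > 0 && parallelism <= mapperOutputSizes.size,
      "this model only covers numMappers >= parallelism")
    equallyDivide(mapperOutputSizes.size, parallelism)
      .map(range => range.map(i => mapperOutputSizes(i)).sum)
  }
}
{code}

For example, suppose the first of 200 reducer partitions holds 1001 MB, the other 199 hold 1 MB each, and the assumed target is 64 MB. {{coalesce}} then returns 5 partitions (1001, 64, 64, 64 and 7 MB). If 100 mappers each wrote 12 MB, {{localRead}} with parallelism 5 yields five 240 MB partitions: balanced, but only 5 tasks for 1200 MB, whereas the 64 MB target alone would suggest 19 (1200 / 64, rounded up).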

Repro:
{code:scala}
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1MB") // Ensure the initial plan uses SMJ
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "100MB") // Allow AQE to switch to BHJ
{code}
{code:sql}
drop table if exists t1;
create table t1 as select id, uuid() u, id / 100 x, id % 100 y, if(id < 100000, id, 100000) z from range(20000000);
select sum(hash(*)) from (
    select * from t1 r1 left outer join (
        select distinct z
        from t1
        where id <> 0
    ) r2
    on r1.z = r2.z
);
{code}
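The repro relies on a single hot join key. Because hash partitioning sends every row with the same key to the same reducer partition, one probe-side shuffle partition receives almost all of {{r1}}. {{z}} equals {{id}} for ids below 100,000 and 100,000 otherwise, so of the 20,000,000 rows in {{t1}}, 19,900,000 (99.5%) share {{z = 100000}}, while the build side {{r2}} has 100,000 distinct keys (1 through 100,000, since {{id = 0}} is filtered out). The sketch below checks that arithmetic; the helper names are hypothetical, and the closed form is cross-checked by brute force on a smaller range.

{code:scala}
// Hypothetical helpers reproducing the key distribution of t1.z in the repro:
// z = if(id < cap, id, cap) for id in range(n).
object T1KeySkew {
  def z(id: Long, cap: Long): Long = if (id < cap) id else cap

  // Rows sharing the single hot key z = cap (closed form): ids cap .. n-1.
  def hotKeyRows(n: Long, cap: Long): Long = math.max(0L, n - cap)

  def hotKeyFraction(n: Long, cap: Long): Double = hotKeyRows(n, cap).toDouble / n

  // Brute-force count, only practical for small n; used to check the closed form.
  def hotKeyRowsBruteForce(n: Long, cap: Long): Long =
    (0L until n).count(id => z(id, cap) == cap).toLong
}
{code}

With the repro's values, {{T1KeySkew.hotKeyRows(20000000L, 100000L)}} is 19,900,000 and {{hotKeyFraction}} is 0.995.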
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
