[ https://issues.apache.org/jira/browse/SPARK-51872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chungmin Lee updated SPARK-51872:
---------------------------------
    Description: 
When AQE converts a sort-merge join (SMJ) to a broadcast hash join (BHJ), the 
probe-side shuffle can have data skew. When {{CoalesceShufflePartitions}} is 
applied to this shuffle, it can produce a very small number of partitions. When 
{{OptimizeShuffleWithLocalRead}} is then applied, it reuses the number of 
partitions decided by {{CoalesceShufflePartitions}}: although it can even out 
partition sizes by coalescing mappers, the partition count itself doesn't 
change, which leads to large partitions and executor under-utilization.

Repro:
{code:java}
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1MB") // ensure the initial plan uses SMJ
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "100MB") // allow AQE to convert to BHJ
{code}
{code:sql}
drop table if exists t1;
create table t1 as
    select id, uuid() u, id / 100 x, id % 100 y, if(id < 100000, id, 100000) z
    from range(20000000);
select sum(hash(*)) from (
    select * from t1 r1 left outer join (
        select distinct z
        from t1
        where id <> 0
    ) r2
    on r1.z = r2.z
);
{code}
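
For context, the join key {{z}} is skewed by construction: ids below 100000 keep 
their own value, while the remaining 19,900,000 of the 20,000,000 rows all share 
{{z = 100000}}. A quick check against the {{t1}} table created above (just for 
illustration; the repro doesn't depend on it):
{code:java}
// Show the dominant join-key value: z = 100000 covers 19,900,000 of the
// 20,000,000 rows, so the probe-side shuffle is heavily skewed on z.
spark.sql("select z, count(*) as cnt from t1 group by z order by cnt desc").show(3)
{code}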
Good plan (spark.sql.adaptive.coalescePartitions.enabled = false):

!image-2025-04-22-20-12-14-032.png|width=720,height=5337!

Bad plan (spark.sql.adaptive.coalescePartitions.enabled = true):

!image-2025-04-22-20-13-43-569.png|width=720,height=3823!
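
The only workaround I see for now is the one implied by the good plan above: 
disable partition coalescing around the affected query. A minimal sketch 
(assuming session-level toggling is acceptable; how to scope it is up to the 
user):
{code:java}
// Workaround sketch, not a fix for the rule interaction itself: disabling
// coalescing restores the good plan above, at the cost of losing partition
// coalescing for anything else run while the conf is off.
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
// ... run the affected query ...
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
{code}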



> AQE: CoalescePartitions causes BHJ performance regression via under-utilization
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-51872
>                 URL: https://issues.apache.org/jira/browse/SPARK-51872
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.5.5
>            Reporter: Chungmin Lee
>            Priority: Major
>         Attachments: image-2025-04-22-20-12-14-032.png, image-2025-04-22-20-13-43-569.png
>


