[ https://issues.apache.org/jira/browse/SPARK-51872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chungmin Lee updated SPARK-51872:
---------------------------------
Description:

When AQE converts a sort-merge join (SMJ) to a broadcast hash join (BHJ), the probe-side shuffle can have data skew. When {{CoalesceShufflePartitions}} is applied to this shuffle, it can create a very small number of partitions. When {{OptimizeShuffleWithLocalRead}} is then applied, it reuses the partition count decided by {{CoalesceShufflePartitions}}; although the local read can even out partition sizes by coalescing mappers, the number of partitions does not change, which leads to large partitions and under-utilization of the cluster.

Repro:
{code:java}
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1MB")            // Ensure initial SMJ
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "100MB") // Allow AQE BHJ
{code}
{code:sql}
drop table if exists t1;

create table t1 as
select id, uuid() u, id / 100 x, id % 100 y, if(id < 100000, id, 100000) z
from range(20000000);

select sum(hash(*)) from (
  select * from t1 r1 left outer join (
    select distinct z
    from t1
    where id <> 0
  ) r2
  on r1.z = r2.z
);
{code}

Good plan:
!image-2025-04-22-20-12-14-032.png!

was: (the same description without the "Good plan" screenshot)
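A possible interim mitigation, sketched against the repro above. Both settings exist in 3.5.x, but the values are illustrative rather than a recommendation, and this does not fix the rule itself:

{code:java}
// Illustrative workaround sketch, not the fix.
// Option 1: skip partition coalescing entirely, at the cost of many small tasks.
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
// Option 2: keep coalescing but target smaller partitions (default is 64MB),
// so more partitions survive and the local shuffle read keeps more parallelism.
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "8MB")
{code}

To see which plan a given run actually produced, note that with AQE the final physical plan is only available after execution. A minimal sketch, where {{df}} is just a local name for the repro query:

{code:java}
// Run the repro query, then inspect the final adaptive plan.
val df = spark.sql("""
  select sum(hash(*)) from (
    select * from t1 r1 left outer join (
      select distinct z from t1 where id <> 0
    ) r2 on r1.z = r2.z
  )""")
df.collect()  // AQE finalizes the physical plan during execution
df.explain()  // the final plan shows the join strategy and the shuffle read's partition count
{code}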
> AQE: CoalescePartitions causes BHJ performance regression via under-utilization
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-51872
>                 URL: https://issues.apache.org/jira/browse/SPARK-51872
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.5.5
>            Reporter: Chungmin Lee
>            Priority: Major
>         Attachments: image-2025-04-22-20-12-11-449.png, image-2025-04-22-20-12-14-032.png
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org