Hello,

I have recently observed some performance issues when cross joining two
small tables whose output is fairly large. The problem is that when both
input tables have only a single partition, the join runs as a single task
on a single core (BroadcastNestedLoopJoin), which causes memory and
run-time problems. By simply repartitioning both dataframes before the
join, we have been able to reduce the run time of some queries from 55
minutes to 5 minutes. Do you think it would be a good idea to repartition
both dataframes based on a heuristic (both inputs are small, but the
output is not) during adaptive query execution (or in Catalyst)? Here is
a snippet to demonstrate:

from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .config("spark.sql.shuffle.partitions", 16)
    .getOrCreate()
)
df1 = spark.range(10000)
df2 = spark.range(20000)
df1.repartition(1).write.parquet("df1", mode="overwrite")
df2.repartition(1).write.parquet("df2", mode="overwrite")

df1 = spark.read.parquet("df1").withColumnRenamed("id", "id1")
df2 = spark.read.parquet("df2")
# slow: single partition on each side, so the join runs as one task
df1.crossJoin(df2).write.parquet("result", mode="overwrite")  # 57s
# better: repartition both sides first to parallelize the join
df1.repartition(16).crossJoin(df2.repartition(16)).write.parquet("result", mode="overwrite")  # 16s
