Hello, I have recently observed some performance issues when cross joining two small tables (the output is somewhat large). The problem is that when both tables have only a single partition, the join (a BroadcastNestedLoopJoin) runs as a single task on a single core, which causes memory and run-time issues. By simply repartitioning both dataframes before the join, we have been able to reduce the run times of some queries from 55 minutes to 5 minutes. Do you think it would be a good idea for adaptive query execution (or Catalyst) to repartition both dataframes based on a heuristic (both inputs are small, but the output is not)? Here is a snippet to demonstrate:
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .config("spark.sql.shuffle.partitions", 16)
    .getOrCreate()
)

df1 = spark.range(10000)
df2 = spark.range(20000)

# Write each input as a single partition, then read it back
df1.repartition(1).write.parquet("df1", mode="overwrite")
df2.repartition(1).write.parquet("df2", mode="overwrite")

df1 = spark.read.parquet("df1").withColumnRenamed("id", "id1")
df2 = spark.read.parquet("df2")

# slow: single-partition inputs, the whole cross join runs in one task
df1.crossJoin(df2).write.parquet("result", mode="overwrite")  # 57s

# better: repartition both sides before the join
df1.repartition(16).crossJoin(df2.repartition(16)).write.parquet("result", mode="overwrite")  # 16s
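For a sense of scale: the cross join above produces 10000 × 20000 = 200 million rows. A rough back-of-the-envelope sketch (assuming, hypothetically, that a broadcast nested loop join produces one output task per partition of the streamed side) shows how much work lands on a single core in each case:

```python
# Back-of-the-envelope estimate of rows handled per task in the snippet.
# Assumption (hypothetical model): one task per partition of the streamed
# side, each task pairing its slice with the entire broadcast side.
left_rows, right_rows = 10_000, 20_000
total_rows = left_rows * right_rows  # 200,000,000 output rows

def rows_per_task(stream_partitions: int) -> int:
    # Each task emits an equal share of the cross-join output.
    return total_rows // stream_partitions

print(rows_per_task(1))   # 200000000 -- everything in a single task
print(rows_per_task(16))  # 12500000  -- 16x less output per core
```

This ignores skew and scheduling overhead, but it matches the intuition behind the 57s → 16s improvement: the same total output is simply spread over more cores.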