Ángel Álvarez Pascua created SPARK-50992:
--------------------------------------------

             Summary: OOMs and performance issues with AQE in large plans
                 Key: SPARK-50992
                 URL: https://issues.apache.org/jira/browse/SPARK-50992
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.5.4, 3.5.3, 4.0.0
            Reporter: Ángel Álvarez Pascua


When AQE is enabled, Spark posts update events to the internal listener bus 
whenever a plan changes. Each of these events includes a plain-text description 
of the plan, which is computationally expensive to generate for large plans.
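
For context, each AQE plan change is posted as a {{SparkListenerSQLAdaptiveExecutionUpdate}} event whose {{physicalPlanDescription}} field carries the full plan string. The following minimal listener (a sketch using the public listener API; the class name is illustrative) logs how large those descriptions get:
{code:scala}
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate

// Illustrative listener: logs the size of the plan description string
// carried by every AQE update event posted to the listener bus.
class PlanDescriptionSizeListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLAdaptiveExecutionUpdate =>
      println(s"AQE update for execution ${e.executionId}: " +
        s"${e.physicalPlanDescription.length} chars of plan description")
    case _ => // other events are irrelevant here
  }
}

// Register before running the repro below:
// spark.sparkContext.addSparkListener(new PlanDescriptionSizeListener)
{code}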

*Key Issues:*
 # *High Cost of Plan String Calculation:*

 ** Generating the string description for large physical plans is a costly 
operation.
 ** This impacts performance, particularly in complex workflows with frequent 
plan updates (e.g., repeatedly caching/persisting DataFrames).
 # *Out-of-Memory (OOM) Errors:*

 ** Events are retained by the UI listener as {{SQLExecutionUIData}} objects 
until a retention threshold is reached.
 ** For large plans, this retained string data can exhaust memory, causing OOM 
errors.
 # *Current Workarounds Are Ineffective:*

 ** *Reducing Retained Executions* ({{spark.sql.ui.retainedExecutions}}): 
Even when set to {{1}} or {{0}}, the events are still created, so the plan 
string is still computed.
 ** *Limiting Plan String Length* ({{spark.sql.maxPlanStringLength}}): 
Reducing the maximum string length (e.g., to {{1000000}}) may mitigate 
OOMs but does not eliminate the overhead of string generation.
 ** *Available Explain Modes:* All existing explain modes ({{simple}}, 
{{extended}}, {{codegen}}, {{cost}}, {{formatted}}) are verbose and 
computationally expensive, so none of them resolves these issues.

*Proposed Solution:*
Introduce a new explain mode, *{{off}}*, for {{spark.sql.ui.explainMode}} that 
suppresses the generation of plan string descriptions (see the sketch after 
this list).
 * When this mode is enabled, Spark skips the calculation of plan descriptions 
altogether.
 * This resolves the OOM errors and restores performance parity with non-AQE 
execution.
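
For illustration, the proposed mode would be enabled through the existing {{spark.sql.ui.explainMode}} config. This is only a sketch: the {{off}} value is the new behavior proposed by this ticket and does not exist yet.
{code:scala}
import org.apache.spark.sql.SparkSession

// Sketch: enabling the proposed "off" explain mode. The config key
// spark.sql.ui.explainMode already exists; the "off" value is what
// this ticket proposes.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.ui.explainMode", "off") // proposed: skip plan string generation
  .getOrCreate()
{code}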

*Impact of Proposed Solution:*
 * Eliminates OOMs in large plans with AQE enabled.
 * Reduces the performance overhead associated with plan string generation.
 * Ensures Spark scales better in environments with large, complex plans.

*Reproducibility:*
The following test replicates the issue:

 
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object Main {

  def main(args: Array[String]): Unit = {
    // Create SparkSession
    val spark = SparkSession.builder()
      .master("local[*]")
      // Disabling AQE fixes it
      //.config("spark.sql.adaptive.enabled", "false")
      // Still costly and doesn't fix the OOM
      //.config("spark.sql.ui.retainedExecutions", "1")
      //.config("spark.sql.ui.retainedExecutions", "0")
      // Still costly, triggers lots of warnings and ... fixes it?
      // After 15 min it still hadn't reached the 23rd iteration
      //.config("spark.sql.maxPlanStringLength", "1000000")
      // Currently available explain modes are costly and throw OOM
      //.config("spark.sql.ui.explainMode", "simple")
      //.config("spark.sql.ui.explainMode", "extended")
      //.config("spark.sql.ui.explainMode", "codegen")
      //.config("spark.sql.ui.explainMode", "cost")
      //.config("spark.sql.ui.explainMode", "formatted")
      // Works perfectly fine and takes the same time as with AQE disabled
      //.config("spark.sql.ui.explainMode", "off")
      .getOrCreate()

    import spark.implicits._

    // Create an empty DataFrame with an initial schema
    var df = spark.emptyDataFrame
    println("Initial empty DataFrame created.")

    // Start measuring execution time
    val startTime = System.currentTimeMillis()

    // Each iteration adds a column, caches, and filters, so the physical
    // plan (and its string description) grows with every loop
    for (i <- 1 to 25) {
      // Add a new column with null values and cache the result
      df = df.withColumn(s"col$i", lit(null: String)).cache()

      // Filter the DataFrame
      df = df.filter($"col$i".isNotNull)

      // Explain the query plan
      //df.explain()

      // Trigger execution (and AQE plan updates)
      df.show()
    }

    // Print execution time
    val totalTime = System.currentTimeMillis() - startTime
    println(s"Total execution time: $totalTime ms")

    // Stop the SparkSession
    spark.stop()
    println("SparkSession stopped.")
  }
}{code}
 


