[ 
https://issues.apache.org/jira/browse/SPARK-50992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ángel Álvarez Pascua updated SPARK-50992:
-----------------------------------------
    Attachment: Main.scala

> OOMs and performance issues with AQE in large plans
> ---------------------------------------------------
>
>                 Key: SPARK-50992
>                 URL: https://issues.apache.org/jira/browse/SPARK-50992
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 4.0.0, 3.5.3, 3.5.4
>            Reporter: Ángel Álvarez Pascua
>            Priority: Major
>         Attachments: Main.scala
>
>
> When AQE is enabled, Spark triggers update events to the internal listener 
> bus whenever a plan changes. These events include a plain-text description of 
> the plan, which is computationally expensive to generate for large plans.
> *Key Issues:*
>  # *High Cost of Plan String Calculation:*
>  ** Generating the string description for large physical plans is a costly 
> operation.
>  ** This impacts performance, particularly in complex workflows with frequent 
> plan updates (e.g., persisting DataFrames).
>  # *Out-of-Memory (OOM) Errors:*
>  ** Events are stored in the listener bus as {{SQLExecutionUIData}} objects 
> and retained until a threshold is reached.
>  ** This retention behavior can lead to memory exhaustion when processing 
> large plans, causing OOM errors.
>  # *Current Workarounds Are Ineffective:*
>  ** *Reducing Retained Executions* ({{spark.sql.ui.retainedExecutions}}): 
> Even when set to {{1}} or {{0}}, events are still created, requiring plan 
> string calculations.
>  ** *Limiting Plan String Length* ({{spark.sql.maxPlanStringLength}}): 
> Reducing the maximum string length (e.g., to {{1,000,000}}) may mitigate 
> OOMs but does not eliminate the overhead of string generation.
>  ** *Available Explain Modes:* All existing explain modes are verbose and 
> computationally expensive, failing to resolve these issues.
> *Proposed Solution:*
> Introduce a new explain mode, *{{off}}*, which suppresses the generation 
> of plan string descriptions.
>  * When this mode is enabled, Spark skips the calculation of plan 
> descriptions altogether.
>  * This resolves OOM errors and restores performance parity with non-AQE 
> execution.
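> A minimal sketch of how the new mode could be enabled, assuming it is exposed 
> through the existing {{spark.sql.ui.explainMode}} conf (the same key exercised 
> in the attached {{Main.scala}}):
> {code:java}
> import org.apache.spark.sql.SparkSession
> 
> // Hypothetical usage: suppress plan-string generation entirely.
> // Assumes the proposed mode is selected via spark.sql.ui.explainMode,
> // as in the attached repro.
> val spark = SparkSession.builder()
>   .master("local[*]")
>   .config("spark.sql.ui.explainMode", "off")
>   .getOrCreate()
> {code}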
> *Impact of Proposed Solution:*
>  * Eliminates OOMs in large plans with AQE enabled.
>  * Reduces the performance overhead associated with plan string generation.
>  * Ensures Spark scales better in environments with large, complex plans.
> *Reproducibility:*
> The following test replicates the issue:
>  
> {code:java}
> import org.apache.spark.sql.{DataFrame, SparkSession}
> import org.apache.spark.sql.functions._
> 
> object Main {
>   def main(args: Array[String]): Unit = {
>     // Create SparkSession
>     val spark = SparkSession.builder()
>       .master("local[*]")
>       // Disabling AQE fixes it
>       //.config("spark.sql.adaptive.enabled", "false")
>       // Still costly and doesn't fix the OOM
>       //.config("spark.sql.ui.retainedExecutions", "1")
>       //.config("spark.sql.ui.retainedExecutions", "0")
>       // Still costly, triggers lots of warnings and ... fixes it?
>       // After 15 min it still hadn't reached the 23rd iteration
>       //.config("spark.sql.maxPlanStringLength", "1000000")
>       // Currently available explain modes are costly and throw OOM
>       //.config("spark.sql.ui.explainMode", "simple")
>       //.config("spark.sql.ui.explainMode", "extended")
>       //.config("spark.sql.ui.explainMode", "codegen")
>       //.config("spark.sql.ui.explainMode", "cost")
>       //.config("spark.sql.ui.explainMode", "formatted")
>       // Works perfectly fine and takes the same time as with AQE disabled
>       //.config("spark.sql.ui.explainMode", "off")
>       .getOrCreate()
> 
>     import spark.implicits._
> 
>     // Create an empty DataFrame with an initial schema
>     var df = spark.emptyDataFrame
>     println("Initial empty DataFrame created.")
> 
>     // Start measuring execution time
>     val startTime = System.currentTimeMillis()
> 
>     // Loop to modify the DataFrame
>     for (i <- 1 to 25) {
>       // Add a new column with null values
>       df = df.withColumn(s"col$i", lit(null: String)).cache()
>       // Filter the DataFrame
>       df = df.filter($"col$i".isNotNull)
>       // Explain the query plan
>       //df.explain()
>       // Show the DataFrame
>       df.show()
>     }
> 
>     // Print execution time
>     val totalTime = System.currentTimeMillis() - startTime
>     println(s"Total execution time: $totalTime ms")
> 
>     // Stop the SparkSession
>     spark.stop()
>     println("SparkSession stopped.")
>   }
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
