[ https://issues.apache.org/jira/browse/SPARK-50992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ángel Álvarez Pascua updated SPARK-50992:
-----------------------------------------
    Attachment: Main.scala

> OOMs and performance issues with AQE in large plans
> ---------------------------------------------------
>
>                 Key: SPARK-50992
>                 URL: https://issues.apache.org/jira/browse/SPARK-50992
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 4.0.0, 3.5.3, 3.5.4
>            Reporter: Ángel Álvarez Pascua
>            Priority: Major
>         Attachments: Main.scala
>
> When AQE is enabled, Spark triggers update events on the internal listener
> bus whenever a plan changes. These events include a plain-text description of
> the plan, which is computationally expensive to generate for large plans.
>
> *Key Issues:*
> # *High Cost of Plan String Calculation:*
> ** Generating the string description for a large physical plan is a costly
> operation.
> ** This hurts performance, particularly in complex workflows with frequent
> plan updates (e.g. persisting DataFrames).
> # *Out-of-Memory (OOM) Errors:*
> ** Events are stored in the listener bus as {{SQLExecutionUIData}} objects
> and retained until a threshold is reached.
> ** This retention behavior can exhaust memory when processing
> large plans, causing OOM errors.
> # *Current Workarounds Are Ineffective:*
> ** *Reducing Retained Executions* ({{spark.sql.ui.retainedExecutions}}):
> Even when set to {{1}} or {{0}}, events are still created, so plan
> string calculation still happens.
> ** *Limiting Plan String Length* ({{spark.sql.maxPlanStringLength}}):
> Reducing the maximum string length (e.g., to {{1,000,000}}) may mitigate
> OOMs but does not eliminate the overhead of string generation.
> ** *Available Explain Modes:* All existing explain modes are verbose and
> computationally expensive, so none of them resolves these issues.
>
> *Proposed Solution:*
> Introduce a new explain mode, *{{off}}*, which suppresses the generation
> of plan string descriptions.
> * When this mode is enabled, Spark skips the calculation of plan
> descriptions altogether.
> * This resolves the OOM errors and restores performance parity with non-AQE
> execution.
>
> *Impact of Proposed Solution:*
> * Eliminates OOMs in large plans with AQE enabled.
> * Reduces the performance overhead associated with plan string generation.
> * Ensures Spark scales better in environments with large, complex plans.
>
> *Reproducibility:*
> The following test replicates the issue:
>
> {code:java}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions._
>
> object Main {
>   def main(args: Array[String]): Unit = {
>     // Create SparkSession
>     val spark = SparkSession.builder()
>       .master("local[*]")
>       // Disabling AQE fixes it
>       //.config("spark.sql.adaptive.enabled", "false")
>       // Still costly and doesn't fix the OOM
>       //.config("spark.sql.ui.retainedExecutions", "1")
>       //.config("spark.sql.ui.retainedExecutions", "0")
>       // Still costly, triggers lots of warnings and ... fixes it?
>       // After 15 min it still hadn't reached the 23rd iteration
>       //.config("spark.sql.maxPlanStringLength", "1000000")
>       // Currently available explain modes are costly and throw OOM
>       //.config("spark.sql.ui.explainMode", "simple")
>       //.config("spark.sql.ui.explainMode", "extended")
>       //.config("spark.sql.ui.explainMode", "codegen")
>       //.config("spark.sql.ui.explainMode", "cost")
>       //.config("spark.sql.ui.explainMode", "formatted")
>       // Works perfectly fine and takes the same time as with AQE disabled
>       //.config("spark.sql.ui.explainMode", "off")
>       .getOrCreate()
>     import spark.implicits._
>
>     // Create an empty DataFrame (empty schema, zero rows)
>     var df = spark.emptyDataFrame
>     println("Initial empty DataFrame created.")
>
>     // Start measuring execution time
>     val startTime = System.currentTimeMillis()
>
>     // Loop to grow the plan: each iteration adds a column, caches, and filters
>     for (i <- 1 to 25) {
>       // Add a new column with null values
>       df = df.withColumn(s"col$i", lit(null: String)).cache()
>       // Filter the DataFrame
>       df = df.filter($"col$i".isNotNull)
>       // Explain the query plan
>       //df.explain()
>       // Show the DataFrame
>       df.show()
>     }
>
>     // Print execution time
>     val totalTime = System.currentTimeMillis() - startTime
>     println(s"Total execution time: $totalTime ms")
>
>     // Stop the SparkSession
>     spark.stop()
>     println("SparkSession stopped.")
>   }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
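For reference, under the proposed fix the repro above would need only a single configuration change. The sketch below assumes the new {{off}} value for {{spark.sql.ui.explainMode}} from this ticket has been merged; {{off}} is the proposed addition and is not available in the affected versions (the config key itself already exists, with values such as {{formatted}}):

{code:java}
import org.apache.spark.sql.SparkSession

object OffModeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      // Proposed setting: skip plan-string generation for UI update events
      // entirely. "off" is the new explain mode this ticket introduces.
      .config("spark.sql.ui.explainMode", "off")
      .getOrCreate()

    // ... run the large-plan workload from the attached Main.scala here ...

    spark.stop()
  }
}
{code}

The same setting could also be supplied at submit time, e.g. {{--conf spark.sql.ui.explainMode=off}}, since it is an ordinary SQL configuration.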