pravin-thangaraj-13731 opened a new issue, #54419:
URL: https://github.com/apache/spark/issues/54419

   
https://github.com/apache/spark/blob/29fc5598005903e1e99a46f6065d2d2ed6b7285a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AliasHelper.scala#L58-L66
   
   **Spark version - 4.0.0**
   
   Our use case involves applying a sequence of user-defined transformation 
rules (queries) to a dataset, one after another. Each rule is dynamically 
generated and applied sequentially using Spark APIs (e.g., withColumn, 
regexp_replace, when, etc.).
   
   In one scenario, a user attempted to clean up an `Email` column by applying 
a large number of replace operations (regex-based corrections) sequentially: 
fixing malformed domains, correcting common typos, removing invalid 
characters, normalizing formats, and so on.
   
   As the number of rules increases, the logical plan grows deeply nested. 
Eventually this causes exponential logical-plan growth, leading to a 
driver-side OutOfMemoryError during plan resolution and optimization.
   
   **System architecture context:**
   
   - Our platform is multi-tenant.
   
   - Each tenant can define multiple transformation rules.
   
   - Rules are dynamically defined at runtime.
   
   - Rules are applied sequentially on the same dataset.
   
   - All transformations occur within the same Spark session and logical plan 
lineage.
   
   **What we observed from the thread dump:**
   
   Each `withColumn` call creates a new Project node in the logical plan. With 
~30 chained calls, the plan becomes a deeply nested stack of Project → 
Project → … → Project → LeafNode.
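   The nesting itself can be modeled with a toy plan type (a sketch, not 
Catalyst code; `PlanDepth`, `chain`, and `depth` are illustrative names): each 
`withColumn` wraps the current plan in one more Project, so N calls yield a 
plan of depth N.

   ```scala
object PlanDepth {
  // Toy stand-ins for Catalyst's logical plan nodes (illustrative only).
  sealed trait Plan
  case object LeafNode extends Plan
  final case class Project(child: Plan) extends Plan

  // Each withColumn call wraps the current plan in one more Project.
  def chain(n: Int): Plan = (1 to n).foldLeft(LeafNode: Plan)((p, _) => Project(p))

  // Depth of the Project chain above the leaf.
  def depth(p: Plan): Int = p match {
    case LeafNode   => 0
    case Project(c) => 1 + depth(c)
  }

  def main(args: Array[String]): Unit =
    println(depth(chain(30))) // 30 nested Project nodes over the leaf
}
   ```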
   During optimization, the PushDownPredicates rule (Optimizer.scala#L1790–L1795, 
https://github.com/apache/spark/blob/29fc5598005903e1e99a46f6065d2d2ed6b7285a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1790-L1795) 
applies the composed partial function of CombineFilters, 
PushPredicateThroughNonJoin, and PushPredicateThroughJoin via 
`plan.transformWithPruning`.
   
   When PushPredicateThroughNonJoin matches a `Filter(condition, Project(fields, 
grandChild))`, it calls `replaceAlias(condition, aliasMap)`, which first does a 
`transformUp` over the condition (substituting aliases) and then calls 
`trimAliases` on the result; `trimAliases` itself recursively walks the entire 
expression tree via `mapChildren`.
   
   **The key problem:** As the filter is pushed through each Project layer in 
the nested plan, the condition expression grows, because aliases are inlined 
into it at each level. On each optimizer iteration, the PushDownPredicates 
rule traverses the entire plan top-down, and at each Filter → Project match it 
calls `replaceAlias` + `trimAliases`, which perform a full expression-tree walk 
via `transformUp` and recursive `mapChildren`. With N nested Project nodes, 
this produces O(N²) expression-tree traversals per optimizer iteration, and 
the optimizer runs up to `spark.sql.optimizer.maxIterations` (default 100) 
iterations.
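   The quadratic traversal count can be illustrated with a toy model (pure 
Scala, not Catalyst code; `PushdownCost`, `Expr`, and `walk` are illustrative 
names): each push inlines one alias, growing the condition by one node, and 
then re-walks the whole grown condition, so total node visits grow as O(N²).

   ```scala
object PushdownCost {
  // Toy expression tree standing in for a filter condition.
  sealed trait Expr { def size: Int }
  final case class Leaf(name: String) extends Expr { val size = 1 }
  final case class Call(fn: String, child: Expr) extends Expr {
    val size: Int = 1 + child.size
  }

  // Simulate pushing a filter through `layers` nested Projects: each layer
  // inlines its alias (condition grows by one node) and then does a full
  // bottom-up walk of the condition, like transformUp + trimAliases.
  // Returns (final condition size, total nodes visited across all walks).
  def cost(layers: Int): (Int, Int) = {
    var visited = 0
    def walk(e: Expr): Unit = {
      visited += 1
      e match {
        case Call(_, c) => walk(c)
        case _          => ()
      }
    }
    var cond: Expr = Leaf("_c2")
    for (_ <- 1 to layers) {
      cond = Call("regexp_replace", cond) // alias inlined at this layer
      walk(cond)                          // full re-traversal of the condition
    }
    (cond.size, visited)
  }

  def main(args: Array[String]): Unit = {
    val (size, visited) = cost(30)
    println(s"condition size: $size, nodes visited: $visited") // 31 and 495
  }
}
   ```

   With 30 layers the condition ends at only 31 nodes, but the walks visit 
495 nodes in total (2 + 3 + … + 31), versus 31 for a single final walk; 
doubling the layers roughly quadruples the work, repeated on every optimizer 
iteration.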
   
   We faced the same issue when we registered a temp table and applied the 
same logic via Spark SQL.
   
   The condition expression also grows at each pushdown level because alias 
substitution expands attribute references into their aliased expressions, 
compounding the trimAliases cost.
   
   **Thread dump:**
   
   "main" #1 prio=5 os_prio=31 cpu=222332.32ms elapsed=11268.87s 
tid=0x000000010348cdf0 nid=0x1603 RUNNABLE
   at scala.collection.immutable.List.map(List.scala:247)
   at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:708)
   at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:521)
   at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:521)
   ... (repeating ~30 times for each Project layer) ...
   at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:521)
   at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:497)
   at 
org.apache.spark.sql.catalyst.expressions.AliasHelper.replaceAlias(AliasHelper.scala:60)
   at 
org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughNonJoin$.replaceAlias(Optimizer.scala:1806)
   at 
org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughNonJoin$$anonfun$5.applyOrElse(Optimizer.scala:1819)
   ...
   at 
org.apache.spark.sql.catalyst.optimizer.PushDownPredicates$.apply(Optimizer.scala:1795)
   at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:242)
   ...
   at 
org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:212)
   
   **Reproduction code:**
   
   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.functions._
   
   object Main4 {
   
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder()
         .appName("Spark")
         .master("local[*]")
         .config("spark.driver.memory", "2g")
         .getOrCreate()
       var df = spark.read.csv("/Users/Downloads/data.csv")
   
    val myUdf = udf((s: String, dt: String) => {
         if (s == null) false else s.contains("@")
       })
   
       val replacements = (1 to 30).map(_ => ("@yahoo\\.c", "@yahoo.com"))
   
       replacements.foreach { case (pattern, repl) =>
         df = df.withColumn(
           "_c2",
           when(myUdf(col("_c2"), lit("jsonMeta")),
             regexp_replace(col("_c2"), pattern, repl)
           ).otherwise(col("_c2"))
         )
       }
   
       df.filter(myUdf(col("_c2"), 
lit("jsonMeta"))).queryExecution.optimizedPlan
     }
   
   }
   ```
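   For reference, one pattern that sidesteps the nesting in our case (a sketch 
under the assumption that each rule can be expressed as a plain string 
operation; `RuleFold` and `applyAll` are illustrative names, not Spark APIs) 
is to fold all rules into a single pass inside one UDF, so the plan gets a 
single Project regardless of how many rules a tenant defines:

   ```scala
object RuleFold {
  // Apply every (pattern, replacement) rule in one pass over the value,
  // mirroring the per-rule "if it contains '@', regexp_replace" logic above.
  def applyAll(s: String, rules: Seq[(String, String)]): String =
    if (s == null) null
    else rules.foldLeft(s) { case (v, (pattern, repl)) =>
      if (v.contains("@")) v.replaceAll(pattern, repl) else v
    }

  def main(args: Array[String]): Unit = {
    val rules = Seq(("@yahoo\\.c", "@yahoo.com"))
    println(applyAll("user@yahoo.c", rules)) // user@yahoo.com
    // Spark wiring (needs a SparkSession, so shown only as a comment here):
    //   val cleanUdf = udf((s: String) => applyAll(s, rules))
    //   df = df.withColumn("_c2", cleanUdf(col("_c2")))
  }
}
   ```

   Alternatively, periodically truncating the lineage (e.g. 
`df.localCheckpoint()` every K rules) caps the plan depth, at the cost of 
materializing intermediate results.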
   
   **Questions:**
   1. Is this expected behavior?
   2. What is the recommended pattern for iterative column rewriting? This is 
our use case in ETL workflows.
   3. Is this a Catalyst optimizer scalability issue?
   4. We excluded the relevant Catalyst optimizer rules, which resolves the 
issue, but we don't want to disable optimization just for such jobs; that 
would degrade performance for all workloads, including simple ones.
   
   Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

