[ 
https://issues.apache.org/jira/browse/SPARK-51798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmad Humayun updated SPARK-51798:
----------------------------------
    Description: 
*Summary:*
Non-deterministic UDFs are evaluated multiple times per row when Spark 
optimizer rules are disabled, violating the expected semantics of 
{{.asNondeterministic()}}.

*Description:*
According to the Spark API, marking a UDF as {{.asNondeterministic()}} 
instructs the optimizer to treat the result as unstable and to avoid redundant 
re-evaluations. However, when optimizer rules are disabled via 
{{SQLConf.OPTIMIZER_EXCLUDED_RULES}}, this guarantee is broken: the UDF is 
evaluated multiple times for the same input row.

This breaks the expected semantics and causes issues for use cases involving 
external state, such as querying a Redis-like client.
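For context, the flag set by {{.asNondeterministic()}} lives on the column's 
underlying Catalyst expression and can be checked directly. A minimal sketch 
(it relies on the internal {{Column.expr}} accessor, so treat it as a 
debugging aid rather than stable API):

{code:java}
import org.apache.spark.sql.functions.{col, udf}

// Sanity check: confirm the non-deterministic flag is actually set on the
// expression tree before looking at the optimizer.
val det = udf((s: String) => s)
val nonDet = det.asNondeterministic()

println(det(col("id")).expr.deterministic)    // true
println(nonDet(col("id")).expr.deterministic) // false
{code}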

*Minimal Reproduction Example:*

 
{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.internal.SQLConf

// Emulate an external state manager whose value changes on every read
object DummyRedisClient {
  private var state: Int = 1
  private var increasing: Boolean = true
  def get(key: String): String = {
    val result = state.toString
    if (increasing) {
      if (state < 3) state += 1 else { state -= 1; increasing = false }
    } else {
      if (state > 0) state -= 1 else { state += 1; increasing = true }
    }
    result
  }
}

object Main {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .getOrCreate()

    // UDF that queries external state
    val myUdf = udf((input: String) => {
      DummyRedisClient.get("global_state").toInt
    }).asNondeterministic()

    // Disable all excludable optimizer rules
    val sparkOpt = spark.sessionState.optimizer
    val excludableRules = sparkOpt.defaultBatches
      .flatMap(_.rules.map(_.ruleName))
      .toSet -- sparkOpt.nonExcludableRules.toSet
    val excludedRules = excludableRules.mkString(",")
    SQLConf
      .get
      .setConfString(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, excludedRules)

    // Emulate a data source
    spark.range(1, 4)
      .repartition(1)
      .write
      .mode("overwrite")
      .parquet("/tmp/jira-issue")

    // Minimal query: the UDF is used only as a sort key
    val a = spark.read.parquet("/tmp/jira-issue")
    val b = a.orderBy(myUdf(col("id")))
    b.show()
  }
}
{code}
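To make the duplicate evaluations observable without depending on the changing 
Redis-like value, the UDF body can be instrumented with a call counter. This 
is a hypothetical addition to the repro (the shared {{AtomicInteger}} is only 
reliable here because {{local[*]}} runs driver and executors in a single JVM); 
it assumes the {{spark}} session, imports, and {{DummyRedisClient}} defined 
above:

{code:java}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical instrumentation: count how many times the UDF body runs.
object CallCounter { val calls = new AtomicInteger(0) }

val countingUdf = udf((input: String) => {
  CallCounter.calls.incrementAndGet()
  DummyRedisClient.get("global_state").toInt
}).asNondeterministic()

val df = spark.read.parquet("/tmp/jira-issue")
df.orderBy(countingUdf(col("id"))).show()
println(s"rows = ${df.count()}, UDF calls = ${CallCounter.calls.get()}")
// Expected: calls == rows. With the excludable rules disabled as above,
// calls exceeds rows, which is the behavior reported here.
{code}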
*Expected Behavior:*
Each UDF call should be evaluated exactly once per input row, regardless of 
optimizer rule configuration, since {{.asNondeterministic()}} has been 
explicitly set. In fact, when the optimizer is left as is (i.e., the 
rule-exclusion code above is commented out), the behavior is as expected: the 
UDF marked as non-deterministic is called only once per row, while the same 
UDF without the non-deterministic flag is called multiple times per row. This 
shows that the optimizer respects declared non-determinism when its default 
rules are active.
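The comparison described above can also be made without counting side effects 
by printing the query plans before and after the rule exclusion (assuming the 
{{a}} and {{myUdf}} definitions from the repro):

{code:java}
// Compare parsed, analyzed, optimized, and physical plans with and without
// the rule exclusion to see where the extra evaluations come from.
val b = a.orderBy(myUdf(col("id")))
b.explain(true)
{code}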

*Actual Behavior:*
When optimizer rules are disabled, the UDF is invoked multiple times per input 
row, violating the non-determinism contract and producing inconsistent or 
incorrect results for stateful UDFs.

*Impact:*
Breaks correctness for UDFs involving external state or side effects (e.g., 
querying Redis, logging, metrics). Also makes debugging unpredictable due to 
duplicate calls.

 

> Non-deterministic property of UDF not respected when used in orderBy
> --------------------------------------------------------------------
>
>                 Key: SPARK-51798
>                 URL: https://issues.apache.org/jira/browse/SPARK-51798
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 3.5.0
>         Environment: *Environment:*
>  * Spark version: 3.5.0
>  * Scala version: 2.13
>  * Local mode / standalone
>  * Java 11 Corretto
>            Reporter: Ahmad Humayun
>            Priority: Major


