[ 
https://issues.apache.org/jira/browse/SPARK-51798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17944592#comment-17944592
 ] 

Ahmad Humayun commented on SPARK-51798:
---------------------------------------

Similar to [SPARK-38485|https://issues.apache.org/jira/browse/SPARK-38485]

> Non-deterministic property of UDF not respected when used in orderBy
> --------------------------------------------------------------------
>
>                 Key: SPARK-51798
>                 URL: https://issues.apache.org/jira/browse/SPARK-51798
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 3.5.0
>            Environment:
>  * Spark version: 3.5.0
>  * Scala version: 2.13
>  * Local mode / standalone
>  * Java 11 Corretto
>            Reporter: Ahmad Humayun
>            Priority: Major
>
> *Summary:*
> Non-deterministic UDFs are evaluated multiple times per input row when Spark 
> optimizer rules are disabled, violating the expected semantics of 
> {{.asNondeterministic()}}.
> *Description:*
> According to the Spark API, marking a UDF with {{.asNondeterministic()}} should 
> tell the optimizer that the result is unstable, so the expression must not be 
> re-evaluated redundantly. However, when optimizer rules are disabled via 
> {{SQLConf.OPTIMIZER_EXCLUDED_RULES}}, this contract is broken: the UDF 
> gets evaluated multiple times for the same input row.
> This breaks the expected semantics and causes problems for use cases involving 
> external state, such as querying a Redis-like client.
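> As a quick sanity check (a sketch added here for illustration, not part of the original report, and assuming the classic non-Connect Scala API where {{Column.expr}} is accessible), the non-determinism flag can be inspected on the Catalyst expression behind the UDF column:
> {code:java}
> import org.apache.spark.sql.functions.{col, udf}
>
> // Trivial UDF marked non-deterministic, mirroring the reproduction below.
> val probeUdf = udf((s: String) => s.length).asNondeterministic()
>
> // Expected to print "false": the flag is carried by the ScalaUDF expression itself.
> println(probeUdf(col("id")).expr.deterministic)
> {code}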
> *Minimal Reproduction Example:*
>  
> {code:java}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.{col, udf}
> import org.apache.spark.sql.internal.SQLConf
>
> // Emulate an external state manager whose value changes on every call
> object DummyRedisClient {
>   private var state: Int = 1
>   private var increasing: Boolean = true
>   def get(key: String): String = {
>     val result = state.toString
>     if (increasing) {
>       if (state < 3) state += 1 else { state -= 1; increasing = false }
>     } else {
>       if (state > 0) state -= 1 else { state += 1; increasing = true }
>     }
>     result
>   }
> }
>
> object Main {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession
>       .builder()
>       .master("local[*]")
>       .getOrCreate()
>
>     // UDF that queries external state, explicitly marked non-deterministic
>     val myUdf = udf((input: String) => {
>       DummyRedisClient.get("global_state").toInt
>     }).asNondeterministic()
>
>     // Disable all excludable optimizer rules
>     val sparkOpt = spark.sessionState.optimizer
>     val excludableRules = sparkOpt.defaultBatches
>       .flatMap(_.rules.map(_.ruleName))
>       .toSet -- sparkOpt.nonExcludableRules.toSet
>     val excludedRules = excludableRules.mkString(",")
>     SQLConf
>       .get
>       .setConfString(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, excludedRules)
>
>     // Emulate a data source
>     spark.range(1, 4)
>       .repartition(1)
>       .write
>       .mode("overwrite")
>       .parquet("/tmp/jira-issue")
>
>     // Minimal query: order by the non-deterministic UDF
>     val a = spark.read.parquet("/tmp/jira-issue")
>     val b = a.orderBy(myUdf(col("id")))
>     b.show()
>   }
> }
> {code}
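> One way to see where the UDF ends up in the query plan (a suggestion added here, not part of the original report) is to print the extended plans for the query above:
> {code:java}
> // Prints the parsed, analyzed, optimized, and physical plans, which makes it
> // possible to count how many operators reference the UDF.
> b.explain(true)
> {code}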
> *Expected Behavior:*
> The UDF should be evaluated exactly once per input row, regardless of the 
> optimizer rule configuration, since {{.asNondeterministic()}} has been 
> explicitly set.
> *Actual Behavior:*
> When optimizer rules are disabled, the UDF is invoked multiple times per 
> input row, violating the non-determinism contract and producing inconsistent 
> or incorrect results for stateful UDFs.
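> To make the duplicate calls visible, one option (an instrumentation sketch added here, not part of the original report; it assumes {{DummyRedisClient}}, the {{spark}} session, and the parquet path from the reproduction above are in scope) is to count invocations with a Spark accumulator and compare the total against the number of input rows:
> {code:java}
> import org.apache.spark.sql.functions.{col, udf}
>
> // Accumulator counting UDF invocations; updates are merged back to the driver
> // when each task finishes.
> val udfCalls = spark.sparkContext.longAccumulator("udfCalls")
>
> val countingUdf = udf((input: String) => {
>   udfCalls.add(1)
>   DummyRedisClient.get("global_state").toInt
> }).asNondeterministic()
>
> spark.read.parquet("/tmp/jira-issue")
>   .orderBy(countingUdf(col("id")))
>   .show()
>
> // The source has 3 rows, so any value above 3 means the UDF was re-evaluated
> // for the same input rows (accumulator counts are only approximate under task
> // retries, but none occur in this local run).
> println(s"UDF invocations: ${udfCalls.value}")
> {code}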
> *Impact:*
> Breaks correctness for UDFs involving external state or side effects (e.g., 
> querying Redis, logging, metrics). Also makes debugging unpredictable due to 
> duplicate calls.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
