[ https://issues.apache.org/jira/browse/SPARK-51798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ahmad Humayun updated SPARK-51798:
----------------------------------
Description:

*Summary:*
Non-deterministic UDFs are evaluated multiple times per row when Spark optimizer rules are disabled, violating the expected semantics of {{.asNondeterministic()}}.

*Description:*
According to the Spark API, marking a UDF as {{.asNondeterministic()}} should instruct the optimizer to avoid redundant re-evaluations and to treat the result as unstable. However, when optimizer rules are disabled via {{SQLConf.OPTIMIZER_EXCLUDED_RULES}}, this guarantee is broken: the UDF is evaluated multiple times for the same input row. This violates the expected semantics and causes problems for use cases involving external state, such as querying a Redis-like client.

*Minimal Reproduction Example:*
{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.internal.SQLConf

// Emulate an external state manager whose answer changes on every call
object DummyRedisClient {
  private var state: Int = 1
  private var increasing: Boolean = true

  def get(key: String): String = {
    val result = state.toString
    if (increasing) {
      if (state < 3) state += 1 else { state -= 1; increasing = false }
    } else {
      if (state > 0) state -= 1 else { state += 1; increasing = true }
    }
    result
  }
}

object Main {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .getOrCreate()

    // UDF that queries external state
    val myUdf = udf((input: String) => {
      DummyRedisClient.get("global_state").toInt
    }).asNondeterministic()

    // Disable all excludable optimizer rules
    val sparkOpt = spark.sessionState.optimizer
    val excludableRules = sparkOpt.defaultBatches
      .flatMap(_.rules.map(_.ruleName))
      .toSet -- sparkOpt.nonExcludableRules.toSet
    val excludedRules = excludableRules.mkString(",")
    SQLConf.get.setConfString(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, excludedRules)

    // Emulate a data source
    spark.range(1, 4)
      .repartition(1)
      .write
      .mode("overwrite")
      .parquet("/tmp/jira-issue")

    // Minimal query
    val a = spark.read.parquet("/tmp/jira-issue")
    val b = a.orderBy(myUdf(col("id")))
    b.show()
  }
}
{code}

*Expected Behavior:*
Each UDF call should be evaluated exactly once per input row, regardless of optimizer rule configuration, since {{.asNondeterministic()}} has been explicitly set. In fact, when the optimizations are left as is (comment out the code that excludes optimizer rules), the behavior is as expected: when the UDF is marked as non-deterministic it is called only once per row, and when the non-deterministic property is removed it is called multiple times per row, showing that the optimizer respects the non-determinism when it is declared.

*Actual Behavior:*
When optimizer rules are disabled, the UDF is invoked multiple times per input row, violating the non-determinism contract and producing inconsistent or incorrect results for stateful UDFs.

*Impact:*
Breaks correctness for UDFs involving external state or side effects (e.g., querying Redis, logging, metrics). Also makes debugging unpredictable due to duplicate calls.
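The correctness problem can be made concrete outside Spark. When a sort materializes a non-deterministic key once per row and then sorts by the cached value, a stateful key function runs exactly once per row; when the key is recomputed on every comparison, it runs many more times and can return different values for the same row. The following is a minimal Python analogy, not Spark code — `nondet_key` is a hypothetical stand-in for the stateful UDF, and it only counts invocations under the two strategies:

```python
import functools

calls = {"n": 0}

def nondet_key(x):
    # Stand-in for a stateful/non-deterministic UDF: count every invocation
    calls["n"] += 1
    return x % 7

data = list(range(100))

# Strategy 1: materialize the key once per row, then sort by the cached value.
# This mirrors pulling the non-deterministic expression into its own
# projection before the sort.
calls["n"] = 0
once_sorted = sorted(((nondet_key(x), x) for x in data), key=lambda kv: kv[0])
calls_once = calls["n"]   # exactly one call per row

# Strategy 2: recompute the key inside a comparator on every comparison.
calls["n"] = 0
sorted(data, key=functools.cmp_to_key(lambda a, b: nondet_key(a) - nondet_key(b)))
calls_many = calls["n"]   # at least two calls per comparison, so > one per row

print(calls_once, calls_many)
```

Which Spark component re-evaluates the expression when the rules are excluded is not established by this report; the analogy only illustrates why evaluating the sort-key expression more than once per row is observable (and harmful) for stateful UDFs.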
> Non-deterministic property of UDF not respected when used in orderBy
> --------------------------------------------------------------------
>
>                 Key: SPARK-51798
>                 URL: https://issues.apache.org/jira/browse/SPARK-51798
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 3.5.0
>         Environment: Spark 3.5.0, Scala 2.13, local mode / standalone, Java 11 Corretto
>            Reporter: Ahmad Humayun
>            Priority: Major

--
This message was sent by Atlassian Jira
(v8.20.10#820010)