[ https://issues.apache.org/jira/browse/SPARK-51798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17944592#comment-17944592 ]
Ahmad Humayun edited comment on SPARK-51798 at 4/15/25 12:57 PM:
-----------------------------------------------------------------
Similar to SPARK-38485. The optimizer behavior is similar (i.e., it does not respect the non-determinism of the UDF), but the cause and symptoms are different.

was (Author: JIRAUSER309405):
Similar to [SPARK-38485|https://issues.apache.org/jira/browse/SPARK-38485]

> Non-deterministic property of UDF not respected when used in orderBy
> --------------------------------------------------------------------
>
>                 Key: SPARK-51798
>                 URL: https://issues.apache.org/jira/browse/SPARK-51798
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 3.5.0
>         Environment: * Spark version: 3.5.0
> * Scala version: 2.13
> * Local mode / standalone
> * Java 11 Corretto
>            Reporter: Ahmad Humayun
>            Priority: Major
>
> *Summary:*
> Non-deterministic UDFs are evaluated multiple times per row when Spark optimizer rules are disabled, violating the expected semantics of {{.asNondeterministic()}}.
>
> *Description:*
> According to the Spark API, marking a UDF as {{.asNondeterministic()}} should instruct the optimizer to avoid redundant re-evaluations and treat the result as unstable. However, when optimizer rules are disabled via {{SQLConf.OPTIMIZER_EXCLUDED_RULES}}, this behavior is broken: the UDF gets evaluated multiple times for the same input row.
> This breaks the expected semantics and causes issues for use cases involving external state, such as querying a Redis-like client.
>
> *Minimal Reproduction Example:*
> {code:scala}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.{col, udf}
> import org.apache.spark.sql.internal.SQLConf
>
> // Emulate an external state manager whose value oscillates between 0 and 3
> object DummyRedisClient {
>   private var state: Int = 1
>   private var increasing: Boolean = true
>   def get(key: String): String = {
>     val result = state.toString
>     if (increasing) {
>       if (state < 3) state += 1 else { state -= 1; increasing = false }
>     } else {
>       if (state > 0) state -= 1 else { state += 1; increasing = true }
>     }
>     result
>   }
> }
>
> object Main {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession
>       .builder()
>       .master("local[*]")
>       .getOrCreate()
>
>     // UDF that queries external state
>     val myUdf = udf((input: String) => {
>       DummyRedisClient.get("global_state").toInt
>     }).asNondeterministic()
>
>     // Disable all excludable optimizer rules
>     val sparkOpt = spark.sessionState.optimizer
>     val excludableRules = sparkOpt.defaultBatches
>       .flatMap(_.rules.map(_.ruleName))
>       .toSet -- sparkOpt.nonExcludableRules.toSet
>     val excludedRules = excludableRules.mkString(",")
>     SQLConf
>       .get
>       .setConfString(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, excludedRules)
>
>     // Emulate a data source
>     spark.range(1, 4)
>       .repartition(1)
>       .write
>       .mode("overwrite")
>       .parquet("/tmp/jira-issue")
>
>     // Minimal query
>     val a = spark.read.parquet("/tmp/jira-issue")
>     val b = a.orderBy(myUdf(col("id")))
>     b.show()
>   }
> }
> {code}
>
> *Expected Behavior:*
> Each UDF call should be evaluated exactly once per input row, regardless of optimizer rule configuration, since {{.asNondeterministic()}} has been explicitly set. In fact, when the optimizations are left as-is (comment out the code that excludes them), the behavior is as expected: when the UDF is marked as non-deterministic, it is called only once per row, and when the non-deterministic property is removed, the UDF is called multiple times per row. This shows that the optimizer respected the non-determinism when it was declared.
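>
> To make the per-row evaluation count observable, a minimal sketch (not part of the original report) wraps the UDF body around a shared counter, assuming the same SparkSession and parquet setup as the reproduction above. The {{AtomicLong}} lives in the driver JVM, so this only counts correctly in {{local[*]}} mode, where driver and executors share a JVM:
> {code:scala}
> import java.util.concurrent.atomic.AtomicLong
> import org.apache.spark.sql.functions.{col, udf}
>
> // Shared invocation counter; valid only in local mode, where all
> // tasks run in the same JVM as the driver.
> object CallCounter {
>   val calls = new AtomicLong(0)
> }
>
> // Pass the input through unchanged, counting each invocation.
> val countingUdf = udf((id: Long) => {
>   CallCounter.calls.incrementAndGet()
>   id
> }).asNondeterministic()
>
> val df = spark.read.parquet("/tmp/jira-issue")
> df.orderBy(countingUdf(col("id"))).collect()
>
> // With 3 input rows, the expected count is 3; anything larger means
> // the UDF was re-evaluated for the same rows.
> println(s"UDF invocations: ${CallCounter.calls.get()}")
> {code}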
> *Actual Behavior:*
> When optimizer rules are disabled, the UDF is invoked multiple times per input row, violating the non-determinism contract and producing inconsistent or incorrect results for stateful UDFs.
>
> *Impact:*
> Breaks correctness for UDFs involving external state or side effects (e.g., querying Redis, logging, metrics). Also makes debugging unpredictable due to duplicate calls.
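>
> A diagnostic sketch (not from the report): printing the extended plans shows where the UDF expression ends up; if the sort key is not pulled out into its own projection, the UDF may appear in more than one operator of the plan:
> {code:scala}
> // Prints the parsed, analyzed, optimized, and physical plans for the query.
> b.explain(true)
> {code}
> One possible mitigation, sketched under the same setup as the reproduction and not verified against this bug, is to materialize the UDF output as a column before sorting, so the sort reads the cached value instead of re-invoking the UDF:
> {code:scala}
> // Compute the sort key once, cache it, and sort on the cached column.
> val withKey = a.withColumn("sortKey", myUdf(col("id"))).cache()
> withKey.count()  // force materialization of the cached column
> withKey.orderBy(col("sortKey")).show()
> {code}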