Asif created SPARK-51016:
----------------------------

             Summary: result data compromised in case of indeterministic join keys in Outer Join op, when retry happens
                 Key: SPARK-51016
                 URL: https://issues.apache.org/jira/browse/SPARK-51016
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.5.4
            Reporter: Asif
For a query like:
{quote}
val outerDf = spark.createDataset(
  Seq((1L, "aa"), (null, "aa"), (2L, "bb"), (null, "bb"), (3L, "cc"), (null, "cc")))(
  Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeftt", "strleft")

val innerDf = spark.createDataset(
  Seq((1L, "11"), (2L, "22"), (3L, "33")))(
  Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")

val leftOuter = outerDf.select(
  col("strleft"),
  when(isnull(col("pkLeftt")), floor(rand() * Literal(10000000L)).cast(LongType)).
    otherwise(col("pkLeftt")).as("pkLeft"))

val outerjoin = leftOuter.hint("shuffle_hash").
  join(innerDf, col("pkLeft") === col("pkRight"), "left_outer")
{quote}
where an arbitrary long value is assigned to the left table's join key when it is null (so that skew is avoided during the shuffle), a retried partition task can produce wrong results, including data loss.

The reason is that in such cases, if even one partition task fails, the whole shuffle stage needs to be re-attempted. Retrying only the single failed task would cause the indeterministic key expression (rand()) to produce new values, so some rows could get assigned to a partition whose task has already finished, and those rows would be silently lost.

Spark's DAGScheduler and TaskSchedulerImpl already anticipate such situations: they rely on the boolean stage.isIndeterminate to decide whether to retry the whole stage. This boolean is evaluated by consulting the RDD's dependency graph to find whether any dependency is indeterminate.

The bug is that the ShuffleDependency code does not consult the hashing (partitioning) expressions to check whether any of their components represent an indeterministic value. Moreover, there is currently no way to identify whether an attribute reference or an expression's value has an indeterministic component.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
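The failure mode described above can be illustrated outside Spark with a small standalone simulation (a sketch, not Spark's actual shuffle code; the function and names are hypothetical): null keys salted with a fresh random value can land in different partitions on a retried attempt, while a deterministic salt (here, a hash of another column) keeps the assignment stable across attempts.

```python
import random
import zlib

def assign_partitions(rows, num_partitions, attempt_seed, deterministic=False):
    """Map each (key, value) row to a shuffle partition.

    Null keys are salted either with a fresh random long (mimicking
    floor(rand() * 10000000L) in the query above) or, if deterministic=True,
    with a stable hash of the row's other column.
    """
    rng = random.Random(attempt_seed)
    assignment = {}
    for row in rows:
        key, value = row
        if key is None:
            key = (zlib.crc32(value.encode()) if deterministic
                   else rng.randrange(10_000_000))
        assignment[row] = key % num_partitions
    return assignment

rows = [(1, "aa"), (None, "aa"), (2, "bb"), (None, "bb"), (3, "cc"), (None, "cc")]

# Two attempts of the same stage with indeterministic salting: the null-key
# rows may move between partitions, so re-running only one failed task can
# lose rows that "moved" into partitions whose tasks already finished.
a1 = assign_partitions(rows, 4, attempt_seed=1)
a2 = assign_partitions(rows, 4, attempt_seed=2)

# With a deterministic salt both attempts agree, so a single-task retry is safe.
d1 = assign_partitions(rows, 4, attempt_seed=1, deterministic=True)
d2 = assign_partitions(rows, 4, attempt_seed=2, deterministic=True)
assert d1 == d2
```

Note that non-null keys are unaffected either way; only the rows whose keys were synthesized from rand() are at risk of moving between attempts.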
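For context on the last point: Catalyst expressions do expose a deterministic flag that is ANDed over an expression's children. A toy model (not Spark's actual classes) shows both how such a flag propagates upward and why the information is lost once the salted value is projected out as a plain attribute reference, which is the gap this report describes:

```python
class Expr:
    """Toy stand-in for a Catalyst-style expression node (hypothetical)."""
    def __init__(self, deterministic, children=()):
        self.children = list(children)
        self._deterministic = deterministic

    @property
    def deterministic(self):
        # Indeterminism anywhere in the subtree makes the whole tree indeterminate.
        return self._deterministic and all(c.deterministic for c in self.children)

attr = Expr(deterministic=True)        # a column reference, e.g. pkLeftt
rand_expr = Expr(deterministic=False)  # rand()

# when(isnull(pkLeftt), floor(rand() * ...)).otherwise(pkLeftt): the node
# itself is deterministic, but the rand() child taints the whole expression.
salted_key = Expr(deterministic=True, children=[attr, rand_expr])
assert not salted_key.deterministic

# After the projection .as("pkLeft"), downstream operators see only an
# attribute reference, which reports deterministic=True -- the rand()
# behind the alias is invisible, so the shuffle cannot detect it.
pk_left_ref = Expr(deterministic=True)
assert pk_left_ref.deterministic
```

This is why checking only the output attributes at the ShuffleDependency is insufficient: the indeterminism would have to be tracked across the projection boundary.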