[ https://issues.apache.org/jira/browse/SPARK-51016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17924245#comment-17924245 ]
Asif edited comment on SPARK-51016 at 2/20/25 11:20 PM:
--------------------------------------------------------

Pull Request: [https://github.com/apache/spark/pull/50029|https://github.com/apache/spark/pull/50029]

was (Author: ashahid7):

Pull Request: [https://github.com/apache/spark/pull/49708|https://github.com/apache/spark/pull/49708]

> result data compromised in case of indeterministic join keys in Outer Join op, when retry happens
> -------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-51016
>                 URL: https://issues.apache.org/jira/browse/SPARK-51016
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.5.4
>            Reporter: Asif
>            Priority: Major
>              Labels: pull-request-available, spark-sql
>
> For a query like:
> {quote}
> val outerDf = spark.createDataset(
>   Seq((1L, "aa"), (null, "aa"), (2L, "bb"), (null, "bb"), (3L, "cc"), (null, "cc")))(
>   Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeftt", "strleft")
>
> val innerDf = spark.createDataset(
>   Seq((1L, "11"), (2L, "22"), (3L, "33")))(
>   Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")
>
> val leftOuter = outerDf.select(
>   col("strleft"),
>   when(isnull(col("pkLeftt")), floor(rand() * Literal(10000000L)).cast(LongType)).
>     otherwise(col("pkLeftt")).as("pkLeft"))
>
> val outerjoin = leftOuter.hint("shuffle_hash").
>   join(innerDf, col("pkLeft") === col("pkRight"), "left_outer")
> {quote}
>
> where an arbitrary long value is assigned to the left table's joining key when it is null (so that skew is avoided during the shuffle), the query can produce wrong results, such as data loss, if a partition task is retried.
> The reason is that in such cases, if even one partition task fails, the whole shuffle stage needs to be re-attempted.
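The failure mode described above can be sketched outside Spark with a small, hypothetical Python simulation (the seeds, partition count, and function names are illustrative, not Spark internals): because the salt for null keys is drawn from rand(), re-running the shuffle map side for a retry can send the null-keyed rows to different reduce partitions than the first attempt.

```python
# Hypothetical simulation of why a random salt on null join keys loses rows
# when only part of a shuffle stage is recomputed. Not Spark code.
import random

NUM_PARTITIONS = 4
rows = [(1, "aa"), (None, "aa"), (2, "bb"), (None, "bb"), (3, "cc"), (None, "cc")]

def shuffle_write(rows, rng):
    """Assign each row to a reduce partition; null keys get a random salt,
    mimicking floor(rand() * 10000000) in the query above."""
    parts = {p: [] for p in range(NUM_PARTITIONS)}
    for key, value in rows:
        salted = rng.randrange(10_000_000) if key is None else key
        parts[salted % NUM_PARTITIONS].append((key, value))
    return parts

# First attempt of the shuffle map stage.
first_attempt = shuffle_write(rows, random.Random(42))

# Suppose one reduce task fails and only the map output feeding it is
# recomputed: rand() draws fresh values, so null rows can hash differently.
second_attempt = shuffle_write(rows, random.Random(43))

# Rows whose partition changed between attempts: if the other partitions keep
# their first-attempt output, such rows end up duplicated or lost.
moved = [r for p in range(NUM_PARTITIONS)
         for r in first_attempt[p] if r not in second_attempt[p]]
```

Only the null-keyed (salted) rows can move between attempts; rows with a real key always hash to the same partition, which is why Spark must treat the whole stage as indeterminate and retry all of it.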
> This is because retrying only the single failed task can result in some rows getting assigned to a partition whose task has already finished.
> Spark's DAGScheduler and TaskSchedulerImpl already anticipate such situations and rely on the boolean stage.isIndeterminate to retry the whole stage.
> That boolean is evaluated by consulting the RDD's dependency graph to find whether any dependency is indeterminate.
> The bug is in the ShuffleDependency code, which does not consult the hashing expression to check whether any of its components represents an indeterministic value.
> Also, there is currently no way to identify whether an attribute reference or an expression's value has an indeterministic component.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
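As a rough illustration of the check the report says is missing — a sketch in plain Python, not Spark's actual ShuffleDependency or Catalyst API, with all class names hypothetical — the partitioning (hashing) expression tree could be walked recursively to flag any indeterministic component, including attribute references that carry an indeterministic value:

```python
# Hypothetical expression-tree walk; these classes only mimic the shape of
# Catalyst expressions and are not Spark internals.

class Expr:
    deterministic = True
    def __init__(self, *children):
        self.children = children

class AttributeRef(Expr):
    def __init__(self, name, from_indeterministic=False):
        super().__init__()
        self.name = name
        # The report notes Spark today cannot tell whether an attribute
        # reference carries an indeterministic value; this flag is assumed.
        self.deterministic = not from_indeterministic

class Rand(Expr):
    deterministic = False  # rand() yields different values on re-execution

class Add(Expr):
    pass

def is_indeterminate(expr):
    """True if the expression or any sub-expression is non-deterministic."""
    if not expr.deterministic:
        return True
    return any(is_indeterminate(c) for c in expr.children)

# A join key built from rand(), as in the pkLeft expression above, is flagged.
key = Add(AttributeRef("pkLeft"), Rand())
```

Under this sketch, a ShuffleDependency-style check would mark the stage indeterminate whenever `is_indeterminate` is true for any partitioning expression, causing the whole-stage retry the schedulers already support.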