[ https://issues.apache.org/jira/browse/SPARK-51016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17924245#comment-17924245 ]

Asif edited comment on SPARK-51016 at 2/20/25 11:20 PM:
--------------------------------------------------------

Pull Request:

https://github.com/apache/spark/pull/50029


was (Author: ashahid7):
Pull Request:

https://github.com/apache/spark/pull/49708

> Result data compromised in case of non-deterministic join keys in Outer Join 
> op, when retry happens
> -------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-51016
>                 URL: https://issues.apache.org/jira/browse/SPARK-51016
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.5.4
>            Reporter: Asif
>            Priority: Major
>              Labels: pull-request-available, spark-sql
>
> For a query like:
> {quote}
> val outerDf = spark.createDataset(
>   Seq((1L, "aa"), (null, "aa"), (2L, "bb"), (null, "bb"), (3L, "cc"), (null, "cc")))(
>   Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeftt", "strleft")
>
> val innerDf = spark.createDataset(
>   Seq((1L, "11"), (2L, "22"), (3L, "33")))(
>   Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")
>
> val leftOuter = outerDf.select(
>   col("strleft"),
>   when(isnull(col("pkLeftt")),
>     floor(rand() * Literal(10000000L)).cast(LongType))
>     .otherwise(col("pkLeftt")).as("pkLeft"))
>
> val outerjoin = leftOuter.hint("shuffle_hash")
>   .join(innerDf, col("pkLeft") === col("pkRight"), "left_outer")
> {quote}
>  
> where an arbitrary long value is assigned to the left table's join key when 
> it is null (so that skew is avoided during shuffle), wrong results such as 
> data loss can occur if a partition task is retried.
> The reason is that in such cases, if even one partition task fails, the 
> whole shuffle stage needs to be re-attempted: retrying only the single 
> failed task re-evaluates the non-deterministic key, which can send rows to 
> a partition whose task has already finished, so those rows are lost.
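> A minimal workaround sketch (illustrative only; it assumes the real 
> pkRight values are non-negative, so a strictly negative salt can never 
> match): derive the salt deterministically from an existing column instead 
> of rand(), so a retried task recomputes exactly the same key:
> {quote}
> import org.apache.spark.sql.functions.{abs, col, hash, isnull, when}
> import org.apache.spark.sql.types.LongType
>
> // Deterministic salt: hash of another column instead of rand(), negated
> // and shifted so it cannot collide with the (assumed non-negative) real keys.
> val leftOuterDeterministic = outerDf.select(
>   col("strleft"),
>   when(isnull(col("pkLeftt")),
>     -abs(hash(col("strleft")).cast(LongType)) - 1L)
>     .otherwise(col("pkLeftt")).as("pkLeft"))
> {quote}
> The trade-off: null rows sharing the same strleft value get the same salt, 
> so some skew protection is given up in exchange for retry safety.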
> Spark's DAGScheduler and TaskSchedulerImpl already anticipate such 
> situations and rely on the boolean stage.isIndeterminate to retry the 
> whole stage.
> This boolean is evaluated by consulting the RDD's dependency graph to find 
> whether any dependency is indeterminate.
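> For reference, Catalyst already tracks this property at the expression 
> level; a minimal sketch (assuming Spark 3.5, where Column.expr is public):
> {quote}
> import org.apache.spark.sql.functions.{col, rand}
>
> // Rand is flagged non-deterministic in its Catalyst expression tree...
> println(rand().expr.deterministic)        // false
> // ...but a bare attribute reference reports deterministic, even if its
> // value was produced by rand() in an upstream projection.
> println(col("pkLeft").expr.deterministic) // true
> {quote}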
> The bug is that the ShuffleDependency code does not consult the hashing 
> expression to check whether any of its components represents a 
> non-deterministic value.
> Also, there is no way to identify whether an attribute reference or an 
> expression's value has a non-deterministic component.
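> For illustration only (a sketch, not the change in the PR above), this is 
> the kind of check ShuffleDependency could apply to its partitioning 
> expressions using the existing Expression.deterministic flag; note it 
> still misses the attribute-reference gap just described:
> {quote}
> import org.apache.spark.sql.catalyst.expressions.Expression
>
> // Hypothetical helper: true if any node in this expression tree is
> // non-deterministic (e.g. Rand). An AttributeReference bound to the
> // output of rand() in a child plan still reports deterministic.
> def hasNonDeterministicComponent(e: Expression): Boolean =
>   e.find(node => !node.deterministic).isDefined
> {quote}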


