Asif created SPARK-51016:
----------------------------

             Summary: result data compromised in case of indeterministic join 
keys in Outer Join op, when retry happens
                 Key: SPARK-51016
                 URL: https://issues.apache.org/jira/browse/SPARK-51016
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.5.4
            Reporter: Asif


For a query like:
{quote}val outerDf = spark.createDataset(
  Seq((1L, "aa"), (null, "aa"), (2L, "bb"), (null, "bb"), (3L, "cc"), (null, "cc")))(
  Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeftt", "strleft")

val innerDf = spark.createDataset(
  Seq((1L, "11"), (2L, "22"), (3L, "33")))(
  Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")

val leftOuter = outerDf.select(
  col("strleft"),
  when(isnull(col("pkLeftt")), floor(rand() * Literal(10000000L)).cast(LongType)).
    otherwise(col("pkLeftt")).as("pkLeft"))

val outerjoin = leftOuter.hint("shuffle_hash").
  join(innerDf, col("pkLeft") === col("pkRight"), "left_outer")
{quote}
 

In this query, a random long value is assigned to the left table's join key 
when it is null, so that skew is avoided during the shuffle. Such a query can 
produce wrong results, including data loss, if a partition task is retried.

The reason is that in such cases, if even one partition task fails, the whole 
shuffle stage needs to be re-attempted. Retrying only the failed task 
regenerates the random keys, so some rows can be assigned to a partition whose 
task has already finished, and those rows never reach the join.
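
The effect of a partial retry can be illustrated without Spark. The following 
is a minimal sketch in plain Scala, not Spark code: the object name, the 
four-partition layout and the surrogate key values are all hypothetical, and 
the two attempts use fixed surrogate keys in place of rand() so that the 
outcome is reproducible. Reducer 0 is assumed to have finished against the 
first attempt of the map task, while reducers 1 to 3 read the second attempt.

```scala
object PartialRetrySketch {
  // Hypothetical number of shuffle partitions.
  val numPartitions = 4

  // Same rows as outerDf in the report; None stands for a null join key.
  val input: Seq[(Option[Long], String)] = Seq(
    (Some(1L), "aa"), (None, "aa"), (Some(2L), "bb"),
    (None, "bb"), (Some(3L), "cc"), (None, "cc"))

  // One attempt of the map task: every null key takes the next surrogate
  // value (a stand-in for rand()), then rows are bucketed by key % numPartitions.
  def mapAttempt(surrogates: Seq[Long]): Map[Int, Seq[(Long, String)]] = {
    val next = surrogates.iterator
    input
      .map { case (key, value) => (key.getOrElse(next.next()), value) }
      .groupBy { case (key, _) => (key % numPartitions).toInt }
  }

  // The two attempts draw different surrogate keys for the same null rows.
  val attempt1: Map[Int, Seq[(Long, String)]] = mapAttempt(Seq(4L, 5L, 6L))
  val attempt2: Map[Int, Seq[(Long, String)]] = mapAttempt(Seq(9L, 8L, 7L))

  // Reducer 0 already consumed attempt 1; reducers 1..3 consume attempt 2.
  val seen: Seq[(Long, String)] =
    attempt1.getOrElse(0, Seq.empty) ++
      (1 until numPartitions).flatMap(p => attempt2.getOrElse(p, Seq.empty))

  // How many times each left-side value reaches a reducer (2 each if correct).
  val countsByValue: Map[String, Int] =
    seen.groupBy(_._2).map { case (value, rows) => value -> rows.size }

  def main(args: Array[String]): Unit =
    countsByValue.toSeq.sorted.foreach { case (v, n) => println(s"$v -> $n") }
}
```

In this sketch the null-keyed "bb" row moves to partition 0 in the second 
attempt, after reducer 0 has finished, so it is lost. The null-keyed "aa" row 
is read once from each attempt, so it is duplicated. Re-running the whole 
stage, so that every reducer reads the same attempt, avoids both effects.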

Spark's DAGScheduler and TaskSchedulerImpl already expect such situations and 
rely on the boolean stage.isIndeterminate to decide whether to retry the whole 
stage.

This boolean is computed by walking the RDD's dependency graph to check whether 
any dependency is indeterminate.
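
The propagation described above can be sketched as a toy lineage graph. This 
is an illustration only and not Spark's implementation: Node, Level and 
mustRetryWholeStage are hypothetical names, and Spark's real classification 
of RDD output is richer than the two levels used here.

```scala
object DeterminismSketch {
  // Hypothetical, simplified determinism levels.
  sealed trait Level
  case object Determinate extends Level
  case object Indeterminate extends Level

  // A toy lineage node: the level it reports for itself, plus its parents.
  final case class Node(name: String, own: Level, parents: Seq[Node] = Nil) {
    // A node's output is indeterminate if the node itself, or anything
    // it depends on, is indeterminate.
    def outputLevel: Level =
      if (own == Indeterminate || parents.exists(_.outputLevel == Indeterminate)) Indeterminate
      else Determinate
  }

  // Stand-in for the stage.isIndeterminate check used by the scheduler.
  def mustRetryWholeStage(stageOutput: Node): Boolean =
    stageOutput.outputLevel == Indeterminate

  val scan: Node = Node("scan", Determinate)

  // The projection that fills null keys with random values, as it is
  // reported when the randomness in the key goes unnoticed ...
  val projectAsReported: Node = Node("project", Determinate, Seq(scan))
  // ... and as it would have to be reported for the retry logic to kick in.
  val projectAsExpected: Node = Node("project", Indeterminate, Seq(scan))

  def shuffleOver(parent: Node): Node = Node("shuffle", Determinate, Seq(parent))

  def main(args: Array[String]): Unit = {
    println(mustRetryWholeStage(shuffleOver(projectAsReported)))
    println(mustRetryWholeStage(shuffleOver(projectAsExpected)))
  }
}
```

The check is only as good as the levels the individual nodes report. If the 
node that introduces the random key is reported as determinate, nothing 
downstream can recover that information.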

The bug is in the ShuffleDependency code, which does not consult the hashing 
expression to see whether any component of it represents a non-deterministic 
value.

Also, there is currently no way to tell whether an attribute reference, or an 
expression's value, has a non-deterministic component.
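
The gap around attribute references can be shown with a toy expression tree. 
This is a minimal sketch with hypothetical classes (Expr, Lit, Rand, Mul, 
Alias, AttrRef), not the Catalyst classes themselves. It assumes that 
determinism is derived from an expression's children and that an attribute 
reference records only a name.

```scala
object ExpressionSketch {
  // Toy expression tree: an expression is deterministic when all of its
  // children are, unless it overrides the flag itself.
  sealed trait Expr {
    def children: Seq[Expr]
    def deterministic: Boolean = children.forall(_.deterministic)
  }

  final case class Lit(value: Long) extends Expr { val children: Seq[Expr] = Nil }

  // The only source of randomness in this sketch.
  case object Rand extends Expr {
    val children: Seq[Expr] = Nil
    override val deterministic: Boolean = false
  }

  final case class Mul(left: Expr, right: Expr) extends Expr {
    val children: Seq[Expr] = Seq(left, right)
  }

  final case class Alias(child: Expr, name: String) extends Expr {
    val children: Seq[Expr] = Seq(child)
  }

  // Refers to a column by name only; it has no children, so it carries no
  // information about the expression that produced the column.
  final case class AttrRef(name: String) extends Expr { val children: Seq[Expr] = Nil }

  // The projection defines pkLeft from a random value ...
  val projected: Expr = Alias(Mul(Rand, Lit(10000000L)), "pkLeft")
  // ... but the join key, and therefore the hashing expression, only names it.
  val joinKey: Expr = AttrRef("pkLeft")

  def main(args: Array[String]): Unit = {
    println(s"projected.deterministic = ${projected.deterministic}")
    println(s"joinKey.deterministic   = ${joinKey.deterministic}")
  }
}
```

In this model the aliased expression is flagged as non-deterministic, while 
the reference to the same column looks deterministic. Inspecting only the 
hashing expression would therefore miss the randomness unless the reference 
also carried that information.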
