[ https://issues.apache.org/jira/browse/SPARK-51016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Asif updated SPARK-51016:
-------------------------
Labels: pull-request-available spark-sql (was: spark-sql)
> result data compromised in case of indeterministic join keys in Outer Join op, when retry happens
> -------------------------------------------------------------------------------------------------
>
> Key: SPARK-51016
> URL: https://issues.apache.org/jira/browse/SPARK-51016
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.5.4
> Reporter: Asif
> Priority: Major
> Labels: pull-request-available, spark-sql
>
> For a query like:
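> {quote}
> // Run in spark-shell, where `spark` is predefined.
> import org.apache.spark.sql.Encoders
> import org.apache.spark.sql.functions.{col, floor, isnull, lit, rand, when}
> import org.apache.spark.sql.types.LongType
>
> val outerDf = spark.createDataset(
>   Seq[(java.lang.Long, String)](
>     (1L, "aa"), (null, "aa"), (2L, "bb"), (null, "bb"), (3L, "cc"), (null, "cc")))(
>   Encoders.tuple(Encoders.LONG, Encoders.STRING)).toDF("pkLeftt", "strleft")
> val innerDf = spark.createDataset(
>   Seq[(java.lang.Long, String)]((1L, "11"), (2L, "22"), (3L, "33")))(
>   Encoders.tuple(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")
> // Replace a null key with a random long so the nulls spread across shuffle
> // partitions; this makes the join key nondeterministic.
> val leftOuter = outerDf.select(
>   col("strleft"),
>   when(isnull(col("pkLeftt")), floor(rand() * lit(10000000L)).cast(LongType)).
>     otherwise(col("pkLeftt")).as("pkLeft"))
> val outerjoin = leftOuter.hint("shuffle_hash").
>   join(innerDf, col("pkLeft") === col("pkRight"), "left_outer")
> {quote}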
>
> where a random long value is assigned to the left table's join key whenever
> it is null (so that the null keys are spread out and do not cause skew during
> the shuffle), the query can return wrong results, such as lost rows, if a
> partition task is retried.
> The reason is that, in such cases, if even one partition task fails, the
> whole shuffle stage needs to be re-attempted. Retrying only the single failed
> task is not safe: the retried task recomputes the random keys, so some rows
> can be assigned to a partition whose task has already finished, and those
> rows are never read.
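The failure mode described above can be reproduced without Spark. The following is a toy simulation in plain Scala, not Spark code: the object name, the four rows, the two-partition layout, and the hand-picked keys that stand in for `floor(rand() * N)` on two attempts of the same map task are all assumptions made for illustration.

```scala
object SingleTaskRetrySimulation {
  val numPartitions = 2
  // Hypothetical rows whose join key was null, so each attempt draws a new key.
  val rows: Seq[String] = Seq("aa", "bb", "cc", "dd")

  // Hand-picked keys standing in for floor(rand() * N) on two attempts
  // of the same map task; some values differ between attempts.
  val keysByAttempt: Map[Int, Map[String, Long]] = Map(
    1 -> Map("aa" -> 10L, "bb" -> 11L, "cc" -> 12L, "dd" -> 13L),
    2 -> Map("aa" -> 21L, "bb" -> 20L, "cc" -> 12L, "dd" -> 13L))

  // Map-side output of one attempt: rows bucketed by key mod numPartitions.
  def mapOutput(attempt: Int): Map[Int, Seq[String]] =
    rows.groupBy(r => (keysByAttempt(attempt)(r) % numPartitions).toInt)

  def partition(attempt: Int, p: Int): Seq[String] =
    mapOutput(attempt).getOrElse(p, Seq.empty)

  // Reducer 0 already finished on attempt 1's output. Only the map task is
  // re-run, so reducer 1 reads attempt 2's output.
  def readWithSingleTaskRetry(): Seq[String] =
    partition(1, 0) ++ partition(2, 1)

  // Whole-stage retry: every reducer reads the same (attempt 2) output.
  def readWithStageRetry(): Seq[String] =
    (0 until numPartitions).flatMap(p => partition(2, p))

  def main(args: Array[String]): Unit = {
    println(s"single-task retry reads: ${readWithSingleTaskRetry().sorted}")
    println(s"whole-stage retry reads: ${readWithStageRetry().sorted}")
  }
}
```

Under these assumptions, retrying only the map task reads `aa` twice and never reads `bb`, while re-running the whole stage reads every row exactly once.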
> Spark's DAGScheduler and TaskSchedulerImpl already anticipate such
> situations: they rely on the boolean stage.isIndeterminate to decide whether
> to retry the whole stage.
> This boolean is evaluated by walking the RDD's dependency graph to check
> whether any dependency is indeterminate.
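A simplified model of that decision is sketched below, assuming a three-level scale that mirrors the DETERMINATE / UNORDERED / INDETERMINATE levels Spark tracks for RDD output. `Node`, `effectiveLevel` and `retryPlan` are hypothetical names, not Spark APIs, and Spark's real propagation rules (for example around shuffle ordering) are more involved.

```scala
object DeterminismPropagation {
  // Simplified stand-in for the three output levels Spark tracks per RDD.
  sealed abstract class Level(val rank: Int)
  case object Determinate extends Level(0)
  case object Unordered extends Level(1)
  case object Indeterminate extends Level(2)

  // Hypothetical dependency-graph node: its own level plus its parents.
  final case class Node(name: String, ownLevel: Level, parents: Seq[Node] = Nil)

  // A node is treated as being as nondeterministic as the worst node in its lineage.
  def effectiveLevel(node: Node): Level =
    (node.parents.map(effectiveLevel) :+ node.ownLevel).maxBy(_.rank)

  def isIndeterminate(stageOutput: Node): Boolean =
    effectiveLevel(stageOutput) == Indeterminate

  // Models the choice between the two retry strategies.
  def retryPlan(stageOutput: Node): String =
    if (isIndeterminate(stageOutput)) "retry whole stage" else "retry failed tasks only"

  def main(args: Array[String]): Unit = {
    val scan = Node("scan", Determinate)
    // The key column is computed with rand(), but nothing flags it.
    val unflagged = Node("shuffle(pkLeft)", Determinate,
      Seq(Node("project(pkLeft)", Determinate, Seq(scan))))
    // Same lineage with the rand()-based projection flagged as indeterminate.
    val flagged = Node("shuffle(pkLeft)", Determinate,
      Seq(Node("project(pkLeft)", Indeterminate, Seq(scan))))
    println(retryPlan(unflagged))
    println(retryPlan(flagged))
  }
}
```

The sketch shows why the missing check matters: if nothing in the lineage reports itself as indeterminate, the stage is treated as safe to retry task by task, even though its shuffle keys come from `rand()`.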
> The bug is in the ShuffleDependency code, which does not consult the hashing
> (partitioning) expression to check whether any of its components represents
> a nondeterministic value.
> Also, there is currently no way to tell whether an attribute reference, or
> the value of an expression, has a nondeterministic component.
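To illustrate the gap, the sketch below uses a hypothetical miniature expression tree (not Catalyst's classes) with a per-node `deterministic` flag, similar in spirit to Catalyst's `Expression.deterministic`. The partitioning expression is just the attribute `pkLeft`, which looks deterministic on its own; only resolving it through the projection that produced it reveals the `rand()` call. `hasNondeterministicSource` is an assumed, illustrative helper, not an existing Spark API, and `floor`/`cast` are omitted for brevity.

```scala
object AttributeDeterminism {
  // Hypothetical miniature expression tree, not Catalyst's classes.
  sealed trait Expr {
    def children: Seq[Expr]
    // Local check: true unless some node in this tree is nondeterministic.
    // It cannot see through an attribute to the expression that produced it.
    def deterministic: Boolean = children.forall(_.deterministic)
  }
  final case class Lit(value: Long) extends Expr { def children: Seq[Expr] = Nil }
  final case class Attr(name: String) extends Expr { def children: Seq[Expr] = Nil }
  final case class Mul(left: Expr, right: Expr) extends Expr {
    def children: Seq[Expr] = Seq(left, right)
  }
  // Models when(isnull(value), replacement).otherwise(value).
  final case class IfNull(value: Expr, replacement: Expr) extends Expr {
    def children: Seq[Expr] = Seq(value, replacement)
  }
  case object Rand extends Expr {
    def children: Seq[Expr] = Nil
    override def deterministic: Boolean = false
  }

  // Output columns of the child projection, keyed by attribute name.
  type Projection = Map[String, Expr]

  // Hypothetical lineage-aware check: an attribute is resolved through the
  // projection that produced it. Attributes not found there are treated as
  // plain input columns, i.e. deterministic.
  def hasNondeterministicSource(e: Expr, child: Projection): Boolean = e match {
    case Attr(name) => child.get(name).exists(src => hasNondeterministicSource(src, Map.empty))
    case other => !other.deterministic || other.children.exists(hasNondeterministicSource(_, child))
  }

  def main(args: Array[String]): Unit = {
    val projection: Projection = Map(
      "strleft" -> Attr("strleft"),
      "pkLeft" -> IfNull(Attr("pkLeftt"), Mul(Rand, Lit(10000000L))))
    val partitionKey = Attr("pkLeft")
    println(s"local flag says deterministic: ${partitionKey.deterministic}")
    println(s"lineage-aware check finds rand(): ${hasNondeterministicSource(partitionKey, projection)}")
  }
}
```

In this model the local flag reports the partitioning key as deterministic, while the lineage-aware check finds the `rand()` call behind it.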
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)