[ 
https://issues.apache.org/jira/browse/SPARK-51756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiexing Li updated SPARK-51756:
-------------------------------
    Attachment: example 1.png

> Non-deterministic stage could lead to incorrect results with partial retry
> --------------------------------------------------------------------------
>
>                 Key: SPARK-51756
>                 URL: https://issues.apache.org/jira/browse/SPARK-51756
>             Project: Spark
>          Issue Type: Test
>          Components: Scheduler
>    Affects Versions: 4.0.0
>            Reporter: Jiexing Li
>            Priority: Major
>             Fix For: 4.0.0
>
>         Attachments: example 1.png, example 2.png, example 3.png, example 
> 4.png
>
>
> Spark's resilience features can cause an RDD to be partially recomputed, e.g. 
> when an executor is lost due to downscaling or a spot instance kill. When the 
> output of a non-deterministic task is recomputed, Spark does not always 
> recompute everything that depends on that task's output. In some cases, some 
> downstream computations are based on the output of one "attempt" of the task, 
> while others are based on the output of another "attempt".
> This is problematic when the producer stage is non-deterministic: the second 
> attempt of a task can produce output that is very different from the first. 
> For example, if the stage uses round-robin partitioning, some of the output 
> data could be placed in different partitions by different task attempts. This 
> can lead to incorrect results unless we retry the whole consumer stage that 
> depends on the retried non-deterministic stage. Below is a short sketch of 
> this kind of non-determinism, followed by a concrete example.
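> As a minimal, illustrative sketch only (spark-shell style, made-up data): with 
> round-robin repartitioning at the RDD level, the output partition a tuple lands 
> in depends on the order in which the upstream task iterates over its input, so 
> a retried task that sees the same tuples in a different order can deal them out 
> differently.
> {code:scala}
> // Runs in spark-shell, where `sc` is the provided SparkContext.
> // Pretend this RDD is the output of an upstream task whose row order is not
> // guaranteed (e.g. it is read back from a shuffle).
> val data = sc.parallelize(Seq("t1", "t2", "t3", "t4"), numSlices = 2)
>
> // repartition() deals elements out to output partitions in the order the task
> // iterates over its input, so a retried task that sees the same tuples in a
> // different order can place them in different partitions than the first
> // attempt did.
> val repartitioned = data.repartition(2)
>
> // Show which output partition each tuple ended up in for this run.
> repartitioned
>   .mapPartitionsWithIndex((pid, it) => it.map(v => s"partition $pid <- $v"))
>   .collect()
>   .foreach(println)
> {code}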
> *Example:*
> Let's say we have Stage 1 and Stage 2, where Stage 1 is the producer and 
> Stage 2 is the consumer. Assume that the data produced by Task 2 of Stage 1 is 
> lost for some reason while Stage 2 is executing. Further assume that at this 
> point, Task 1 of Stage 2 has already fetched all of its inputs and finished, 
> while Task 2 of Stage 2 fails with data fetch failures (see example 1 in the 
> attachments).
> Task 2 of Stage 1 is retried to reproduce the data, after which Task 2 of 
> Stage 2 is retried. Eventually, Task 1 and Task 2 of Stage 2 produce a result 
> that contains all 4 tuples \{t1, t2, t3, t4} (see example 2 in the 
> attachments).
>  
> Now, let's assume that Stage 1 is non-deterministic (e.g., it uses round-robin 
> partitioning and its input data is not ordered), and that Task 2 places tuple 
> t3 in Partition 1 and tuple t4 in Partition 2 in its first attempt, but tuple 
> t4 in Partition 1 and tuple t3 in Partition 2 in its second attempt. When 
> Task 2 of Stage 2 is retried, instead of reading \{t2, t4} as it should, it 
> reads \{t2, t3} as its input. The result generated by Stage 2 is 
> \{t1, t2, t3, t3}, which is incorrect. (See example 3 in the attachments.)
>  
> The problem can be avoided if we retry all tasks of Stage 2. Since all tasks 
> then read consistent data, the result is produced correctly, regardless of how 
> the retried Task 2 of Stage 1 partitions the data. (See example 4 in the 
> attachments.) A standalone code walk-through of this example follows.
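> The scenario can be restated as a small standalone walk-through (plain Scala, 
> no Spark required; the tuples and partition assignments follow the attached 
> figures):
> {code:scala}
> // Stage 1, Task 1: same output in every attempt (partition -> tuple).
> val task1Output = Map(1 -> "t1", 2 -> "t2")
>
> // Stage 1, Task 2: a non-deterministic task whose two attempts place the
> // tuples in different partitions.
> val task2Attempt1 = Map(1 -> "t3", 2 -> "t4")
> val task2Attempt2 = Map(1 -> "t4", 2 -> "t3")
>
> // Stage 2, Task 1 finished before the failure, so it read attempt 1.
> val stage2Task1 = Seq(task1Output(1), task2Attempt1(1)) // {t1, t3}
>
> // Stage 2, Task 2 was retried and read the recomputed output (attempt 2).
> val stage2Task2 = Seq(task1Output(2), task2Attempt2(2)) // {t2, t3}
>
> // Combined result: {t1, t2, t3, t3} -- t3 appears twice and t4 is lost.
> println((stage2Task1 ++ stage2Task2).sorted)
> {code}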
> *Proposal:*
> To avoid correctness issues produced by a non-deterministic stage with partial 
> retry, we propose an approach that first tries to detect inconsistent data 
> generated by different task attempts of a non-deterministic stage, e.g. by 
> checking whether all the data partitions generated by Task 2 in its first 
> attempt are the same as those generated by its second attempt. If inconsistent 
> data is detected, we retry the entire consumer stage(s). A rough sketch of 
> such a check is given below.
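> As a rough illustration only, the detection step could look something like the 
> following; the fingerprinting scheme and the names used here are hypothetical 
> and are not existing Spark APIs.
> {code:scala}
> // Sketch only: an order-insensitive fingerprint per (map task, reduce
> // partition), compared across attempts. These helpers are hypothetical.
> def partitionFingerprints(output: Map[Int, Seq[String]]): Map[Int, Long] =
>   output.map { case (partition, rows) =>
>     // Summing row hashes makes the fingerprint independent of row order
>     // within a partition, while still changing when rows move across
>     // partitions.
>     partition -> rows.map(_.hashCode.toLong).sum
>   }
>
> def attemptsConsistent(attempt1: Map[Int, Seq[String]],
>                        attempt2: Map[Int, Seq[String]]): Boolean =
>   partitionFingerprints(attempt1) == partitionFingerprints(attempt2)
>
> // If the fingerprints of the retried attempt differ from those of the original
> // attempt, the scheduler would roll back and rerun all tasks of the consumer
> // stage(s) instead of only the failed ones.
> {code}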



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
