Jiexing Li created SPARK-51756:
----------------------------------

             Summary: Non-deterministic stage could lead to incorrect results 
with partial retry
                 Key: SPARK-51756
                 URL: https://issues.apache.org/jira/browse/SPARK-51756
             Project: Spark
          Issue Type: Test
          Components: Scheduler
    Affects Versions: 4.0.0
            Reporter: Jiexing Li
             Fix For: 4.0.0


Spark's resilience features can cause an RDD to be partially recomputed, e.g. 
when an executor is lost due to downscaling or a spot instance kill. When the 
output of a non-deterministic task is recomputed, Spark does not always 
recompute everything that depends on that task's output. In some cases, some 
subsequent computations are based on the output of one "attempt" of the task, 
while other subsequent computations are based on another "attempt".

This is problematic when the producer stage is non-deterministic: the second 
attempt of the same task can produce output that differs significantly from the 
first. For example, if the stage uses round-robin partitioning, some of the 
output data could be placed in different partitions by different task attempts. 
This can lead to incorrect results unless we retry the whole consumer stage 
that depends on the retried non-deterministic stage. Below is an example.
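To make the non-determinism concrete, here is a minimal sketch (plain Python, 
not Spark code) of why round-robin placement depends on input order: records 
are assigned to partitions by arrival position, so two attempts of the same 
task that see the input in different orders place the same records differently.

```python
def round_robin_partition(records, num_partitions):
    # Assign each record to a partition by its arrival position.
    parts = [[] for _ in range(num_partitions)]
    for i, rec in enumerate(records):
        parts[i % num_partitions].append(rec)
    return parts

# First attempt sees the (unordered) input one way...
attempt1 = round_robin_partition(["t3", "t4"], 2)  # [['t3'], ['t4']]
# ...the retried attempt sees it another way.
attempt2 = round_robin_partition(["t4", "t3"], 2)  # [['t4'], ['t3']]
```

The two attempts produce the same multiset of records overall, but the 
per-partition contents differ, which is exactly what a partial retry exposes.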

*Example:*

Let's say we have Stage 1 and Stage 2, where Stage 1 is the producer and Stage 
2 is the consumer. Assume that the data produced by Task 2 of Stage 1 was lost 
for some reason while Stage 2 is executing. Further assume that at this point, 
Task 1 of Stage 2 has already fetched all its inputs and finished, while Task 2 
of Stage 2 fails with data fetch failures.
 
!image-2025-04-09-10-21-01-756.png|width=574,height=280!

 

Task 2 of Stage 1 is retried to reproduce the lost data, after which Task 2 of 
Stage 2 is retried. Eventually, Task 1 and Task 2 of Stage 2 produce a result 
that contains all 4 tuples \{t1, t2, t3, t4}, as shown in the example graph.
 
!image-2025-04-09-10-21-48-964.png|width=755,height=279!
Now, let's assume that Stage 1 is non-deterministic (e.g., it uses round-robin 
partitioning and the input data is not ordered). Task 2 places tuple t3 in 
Partition 1 and tuple t4 in Partition 2 in its first attempt, but places tuple 
t4 in Partition 1 and tuple t3 in Partition 2 in its second attempt. When Task 
2 of Stage 2 is retried, instead of reading \{t2, t4} as it should, it reads 
\{t2, t3} as its input. The result generated by Stage 2 is then \{t1, t2, t3, 
t3}, which is incorrect.
 
!image-2025-04-09-10-22-16-554.png|width=709,height=245!
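The mixed-attempt read can be simulated directly. This sketch (plain Python, 
with the partition layout taken from the example above, not from real Spark 
output) shows how one consumer task reading attempt 1's placement and the other 
reading attempt 2's placement loses t4 and duplicates t3.

```python
# Stage 1 Task 1's output is deterministic; Task 2's placement flips
# between attempts (per the example).
task1_out  = {"p1": ["t1"], "p2": ["t2"]}
task2_att1 = {"p1": ["t3"], "p2": ["t4"]}  # first attempt
task2_att2 = {"p1": ["t4"], "p2": ["t3"]}  # retried attempt

# Stage 2 Task 1 finished before the failure: it read attempt 1's Partition 1.
stage2_task1 = task1_out["p1"] + task2_att1["p1"]  # ['t1', 't3']
# Stage 2 Task 2 is retried and reads attempt 2's Partition 2.
stage2_task2 = task1_out["p2"] + task2_att2["p2"]  # ['t2', 't3']

result = sorted(stage2_task1 + stage2_task2)  # ['t1', 't2', 't3', 't3']
```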
The problem can be avoided if we retry all tasks of Stage 2. As all tasks then 
read consistent data, the result is correct regardless of how the retried Task 
2 of Stage 1 partitions the data.

 
!image-2025-04-09-10-22-42-222.png|width=670,height=273!
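Continuing the same sketch (hypothetical layout, not Spark code): if the whole 
consumer stage is retried, every Stage 2 task reads the same attempt's output, 
so the result is correct even though attempt 2's placement differs from 
attempt 1's.

```python
task1_out  = {"p1": ["t1"], "p2": ["t2"]}
task2_att2 = {"p1": ["t4"], "p2": ["t3"]}  # the retried attempt's placement

# Retrying all of Stage 2 makes every task read attempt 2's output.
stage2_task1 = task1_out["p1"] + task2_att2["p1"]  # ['t1', 't4']
stage2_task2 = task1_out["p2"] + task2_att2["p2"]  # ['t2', 't3']

result = sorted(stage2_task1 + stage2_task2)  # ['t1', 't2', 't3', 't4']
```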
 
*Proposal:*

To avoid correctness issues produced by a non-deterministic stage with partial 
retry, we propose an approach that first tries to detect inconsistent data 
generated by different task attempts of a non-deterministic stage: for example, 
checking whether all the data partitions generated by Task 2 in its first 
attempt are the same as those generated by its second attempt. If inconsistent 
data is detected, we retry the entire consumer stages.
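One possible shape for the detection step (a sketch only; the function name and 
fingerprint scheme are assumptions, not the proposed Spark implementation) is 
to compare per-partition fingerprints of the two attempts' outputs and flag a 
mismatch:

```python
import hashlib

def partition_fingerprints(partitions):
    # One possible scheme: an order-insensitive digest per partition,
    # so only which records landed in which partition matters.
    return [
        hashlib.sha256("".join(sorted(p)).encode()).hexdigest()
        for p in partitions
    ]

att1 = [["t3"], ["t4"]]  # first attempt's partitions
att2 = [["t4"], ["t3"]]  # retried attempt's partitions

# Same records overall, but placed differently: fingerprints disagree,
# so the entire consumer stage should be retried.
inconsistent = partition_fingerprints(att1) != partition_fingerprints(att2)
```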



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
