Jiexing Li created SPARK-51756:
----------------------------------

             Summary: Non-deterministic stage could lead to incorrect results with partial retry
                 Key: SPARK-51756
                 URL: https://issues.apache.org/jira/browse/SPARK-51756
             Project: Spark
          Issue Type: Test
          Components: Scheduler
    Affects Versions: 4.0.0
            Reporter: Jiexing Li
             Fix For: 4.0.0
Spark's resilience features can cause an RDD to be partially recomputed, e.g. when an executor is lost due to downscaling or a spot instance kill. When the output of a non-deterministic task is recomputed, Spark does not always recompute everything that depends on that task's output. In some cases, some downstream computations are based on the output of one "attempt" of the task, while others are based on the output of another "attempt". This is problematic when the producer stage is non-deterministic, because the second attempt of the same task can produce output that is very different from the first. For example, if the stage uses round-robin partitioning, some of the output data can be placed in different partitions in different task attempts. This can lead to incorrect results unless we retry the whole consumer stage that depends on the retried non-deterministic stage. Below is an example.

*Example:*

Let's say we have Stage 1 and Stage 2, where Stage 1 is the producer and Stage 2 is the consumer. Assume that the data produced by Task 2 is lost for some reason while Stage 2 is executing. Further assume that at this point, Task 1 of Stage 2 has already fetched all its inputs and finished, while Task 2 of Stage 2 fails with data fetch failures.

!image-2025-04-09-10-21-01-756.png|width=574,height=280!

Task 2 of Stage 1 is retried to reproduce the data, after which Task 2 of Stage 2 is retried. Eventually, Task 1 and Task 2 of Stage 2 produce a result that contains all 4 tuples \{t1, t2, t3, t4}, as shown in the example graph.

!image-2025-04-09-10-21-48-964.png|width=755,height=279!

Now, let's assume that Stage 1 is non-deterministic (e.g., it uses round-robin partitioning and its input data is not ordered), and that Task 2 places tuple t3 in Partition 1 and tuple t4 in Partition 2 in its first attempt, but places tuple t4 in Partition 1 and tuple t3 in Partition 2 in its second attempt. When Task 2 of Stage 2 is retried, instead of reading \{t2, t4} as it should, it reads \{t2, t3} as its input. The result generated by Stage 2 is \{t1, t2, t3, t3}, which is incorrect.

!image-2025-04-09-10-22-16-554.png|width=709,height=245!

The problem can be avoided if we retry all tasks of Stage 2. Since all tasks then read consistent data, the result is correct regardless of how the retried attempt of Stage 1 Task 2 partitions the data.

!image-2025-04-09-10-22-42-222.png|width=670,height=273!

*Proposal:*

To avoid correctness issues produced by a non-deterministic stage with partial retry, we propose an approach that first detects inconsistent data generated by different task attempts of a non-deterministic stage, for example by checking whether all the data partitions generated by Task 2 in the first attempt are the same as those generated by the second attempt. If inconsistent data is detected, we retry the entire consumer stage.
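To make the example and the detection idea concrete, below is a minimal, self-contained Scala sketch (plain Scala, no Spark scheduler hooks; the names NonDeterministicRetrySketch, roundRobin, partitionFingerprint, and isConsistent are hypothetical illustrations, not Spark APIs). It replays the t3/t4 scenario: a retried map task sees its input in a different order, round-robin placement swaps the two tuples between partitions, and a per-partition fingerprint comparison flags the mismatch, which would be the signal for resubmitting the entire consumer stage.

{code:scala}
object NonDeterministicRetrySketch {

  type PartitionId = Int
  // Output of one attempt of a map task: records grouped by reduce partition.
  type TaskAttemptOutput = Map[PartitionId, Seq[String]]

  // Round-robin placement: record i of the task's input goes to partition
  // i % numPartitions, so if the input arrives in a different order on retry,
  // the same record can land in a different partition.
  def roundRobin(input: Seq[String], numPartitions: Int): TaskAttemptOutput =
    input.zipWithIndex
      .groupBy { case (_, i) => i % numPartitions }
      .map { case (p, recs) => p -> recs.map(_._1) }

  // Order-insensitive fingerprint of one partition: XOR of record hashes,
  // so reordering records *within* a partition does not count as a change.
  def partitionFingerprint(records: Seq[String]): Long =
    records.foldLeft(0L)((acc, r) => acc ^ r.hashCode.toLong)

  // Two attempts are consistent iff every partition has the same fingerprint.
  def isConsistent(a: TaskAttemptOutput, b: TaskAttemptOutput): Boolean =
    (a.keySet ++ b.keySet).forall { p =>
      partitionFingerprint(a.getOrElse(p, Nil)) ==
        partitionFingerprint(b.getOrElse(p, Nil))
    }

  def main(args: Array[String]): Unit = {
    // First attempt of Stage 1 Task 2 sees (t3, t4); the retried attempt
    // sees (t4, t3), e.g. because the task's input is not ordered.
    val attempt1 = roundRobin(Seq("t3", "t4"), numPartitions = 2)
    val attempt2 = roundRobin(Seq("t4", "t3"), numPartitions = 2)

    println(s"attempt 1: $attempt1") // t3 and t4 land in one arrangement...
    println(s"attempt 2: $attempt2") // ...and swap partitions on the retry

    if (!isConsistent(attempt1, attempt2)) {
      // Under the proposal, this is where the scheduler would resubmit all
      // tasks of the consumer stage, not only those that hit fetch failures.
      println("Inconsistent map output across attempts: retry the whole consumer stage")
    }
  }
}
{code}

The fingerprint is deliberately order-insensitive so that a retry that merely reorders records within the same partition, which is harmless to shuffle consumers that do not assume ordering, is not misreported as an inconsistency.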