[ https://issues.apache.org/jira/browse/SPARK-51756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jiexing Li updated SPARK-51756:
-------------------------------
    Attachment: (was: example 1.png)

> Non-deterministic stage could lead to incorrect results with partial retry
> --------------------------------------------------------------------------
>
>                 Key: SPARK-51756
>                 URL: https://issues.apache.org/jira/browse/SPARK-51756
>             Project: Spark
>          Issue Type: Test
>          Components: Scheduler
>    Affects Versions: 4.0.0
>            Reporter: Jiexing Li
>            Priority: Major
>             Fix For: 4.0.0
>
> Spark's resilience features can cause an RDD to be partially recomputed,
> e.g. when an executor is lost due to downscaling or a spot-instance kill.
> When the output of a non-deterministic task is recomputed, Spark does not
> always recompute everything that depends on that task's output. In some
> cases, some subsequent computations are based on the output of one attempt
> of the task, while others are based on the output of another attempt.
>
> This is problematic when the producer stage is non-deterministic, in which
> case a second attempt of the same task can produce output that is very
> different from the first. For example, if the stage uses round-robin
> partitioning, some of the output data can be placed in different partitions
> by different task attempts. This can lead to incorrect results unless we
> retry the whole consumer stage that depends on the retried
> non-deterministic stage. Below is an example.
>
> *Example:*
> Say we have Stage 1 and Stage 2, where Stage 1 is the producer and Stage 2
> is the consumer. Assume the data produced by Task 2 is lost for some reason
> while Stage 2 is executing. Further assume that at this point, Task 1 of
> Stage 2 has already fetched all its inputs and finished, while Task 2 of
> Stage 2 fails with data fetch failures.
>
> !image-2025-04-09-10-21-01-756.png|width=574,height=280!
>
> Task 2 of Stage 1 is retried to reproduce the data, after which Task 2 of
> Stage 2 is retried.
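> The partial-retry scenario above can be reproduced outside Spark. The
> following is a minimal Python sketch (all names are illustrative, not Spark
> APIs) showing how round-robin placement of an unordered input differs
> between two attempts of the same producer task, and how a partial retry of
> the consumer then yields a wrong result:

```python
def round_robin_partition(rows, num_partitions):
    """Distribute a task's output rows round-robin over partitions.
    Placement depends on the order in which rows arrive, so if the input
    is unordered, two attempts of the same task can place the same row
    in different partitions."""
    parts = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        parts[i % num_partitions].append(row)
    return parts

# Stage 1 Task 2's input is unordered: attempt 1 happens to see [t3, t4],
# while the retry (after the shuffle output is lost) sees [t4, t3].
attempt1 = round_robin_partition(["t3", "t4"], 2)  # [['t3'], ['t4']]
attempt2 = round_robin_partition(["t4", "t3"], 2)  # [['t4'], ['t3']]

# Stage 2 Task 1 already finished using attempt 1's Partition 1, while the
# retried Stage 2 Task 2 reads the new attempt's Partition 2.
stage2_task1 = ["t1"] + attempt1[0]  # ['t1', 't3']
stage2_task2 = ["t2"] + attempt2[1]  # ['t2', 't3']

result = sorted(stage2_task1 + stage2_task2)
# ['t1', 't2', 't3', 't3']: t3 is duplicated and t4 is lost.
```

> Retrying all of Stage 2 against a single attempt's output (all reads from
> attempt2, say) would instead produce all four tuples exactly once.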
> Eventually, Task 1 and Task 2 of Stage 2 produce the result, which contains
> all four tuples \{t1, t2, t3, t4} as shown in the example graph.
>
> !image-2025-04-09-10-21-48-964.png|width=755,height=279!
>
> Now, assume that Stage 1 is non-deterministic (e.g., it uses round-robin
> partitioning and its input data is not ordered), and that Task 2 places
> tuple t3 in Partition 1 and tuple t4 in Partition 2 in its first attempt,
> but places tuple t4 in Partition 1 and tuple t3 in Partition 2 in its
> second attempt. When Task 2 of Stage 2 is retried, instead of reading
> \{t2, t4} as it should, it reads \{t2, t3} as its input. The result
> generated by Stage 2 is \{t1, t2, t3, t3}, which is incorrect: t3 is
> duplicated and t4 is lost.
>
> !image-2025-04-09-10-22-16-554.png|width=709,height=245!
>
> The problem can be avoided if we retry all tasks of Stage 2. Since all
> tasks then read consistent data, the result is correct regardless of how
> the retry of Stage 1 Task 2 partitions the data.
>
> !image-2025-04-09-10-22-42-222.png|width=670,height=273!
>
> *Proposal:*
> To avoid correctness issues produced by a non-deterministic stage with
> partial retry, we propose an approach that first tries to detect
> inconsistent data generated by different task attempts of a
> non-deterministic stage, e.g., whether the data partitions generated by
> Task 2 in the first attempt are the same as those generated by the second
> attempt. If inconsistent data is detected, we retry the entire consumer
> stages.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
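As a rough illustration of the proposed detection step (a sketch under assumed semantics, not the actual Spark implementation), one could fingerprint each output partition of a task attempt and compare fingerprints across attempts; a mismatch means the consumer stage cannot safely be partially retried:

```python
import hashlib

def partition_fingerprints(partitions):
    """Order-insensitive fingerprint for each output partition of a task
    attempt: hash every row and sum the digests modulo 2**256, so two
    partitions match iff they hold the same multiset of rows (up to hash
    collisions), regardless of row order within the partition."""
    fingerprints = []
    for rows in partitions:
        acc = 0
        for row in rows:
            digest = hashlib.sha256(repr(row).encode()).digest()
            acc = (acc + int.from_bytes(digest, "big")) % (1 << 256)
        fingerprints.append(acc)
    return fingerprints

def attempts_consistent(attempt_a, attempt_b):
    """True iff both attempts placed the same rows in every partition;
    if False, the entire consumer stage would have to be retried."""
    return partition_fingerprints(attempt_a) == partition_fingerprints(attempt_b)

# Stage 1 Task 2 from the example: the two attempts swap t3 and t4
# between Partition 1 and Partition 2, so they are inconsistent.
print(attempts_consistent([["t3"], ["t4"]], [["t4"], ["t3"]]))  # False
print(attempts_consistent([["t3"], ["t4"]], [["t3"], ["t4"]]))  # True
```

Comparing fingerprints rather than full partition contents keeps the check cheap; the trade-off is a vanishingly small false-negative probability from hash collisions.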