On Sat, 20 Apr 2024 at 23:36, Dylan McClelland <dmcclell...@indeed.com.invalid> wrote:
> when describing the resilience of the magic committer to failures during a
> task commit, the docs state
>
> "If the .pendingset file has been saved to the job attempt directory, the
> task has effectively committed, it has just failed to report to the
> controller. This will cause complications during job commit, as there may
> be two task PendingSet committing the same files, or committing files with

The last big review/writeup of the protocol was in
https://github.com/steveloughran/zero-rename-committer/releases/tag/tag_release_2021-05-17

When we commit a task, the .pendingset file is saved with the task id, not
the task attempt id. Two task attempts for the same task cannot both be
committed; the latest one to commit wins. That is: the task ID is implicit
in the filename.

There's still the risk of partitioning where TA1 is told to commit but goes
offline, TA2 commits, and then, later, TA1 completes. When the job commits,
it gets the files of TA1 even though TA2 is the one which reported success.

This "fail if task commit doesn't respond/report success" behaviour should
only be needed for commit protocols whose task commit is
non-atomic/non-repeatable.

> *Proposed*: track task ID in pendingsets, recognise duplicates on load and
> then respond by cancelling one set and committing the other. (or fail?)"

The task id is implicit in the filename: it's ${taskid}.pendingset. We can't
guarantee which of two TAs told to commit it is, except that the last one
told to execute by Spark will overwrite any file it finds and report
success. (There's a minimal sketch of this naming scheme at the end of this
mail.)

> As far as I can tell from reading over the code, the proposal was not
> implemented. Is this still considered a viable solution? If so, I'd be
> happy to take a crack at it
>
> For context: We are running spark jobs using the magic committer in a k8s
> environment where executor loss is somewhat common (maybe about 2% of
> executors are terminated by the cluster). By default, spark's
> OutputCommitCoordinator fails the whole write stage (and parent job) if an
> executor fails while it is an "authorized committer" (see
> https://issues.apache.org/jira/browse/SPARK-39195).
>
> This results in expensive job failures at the final write stage. We can
> avoid those failures by disabling the outputCommitCoordinator entirely, but
> that leaves us open to the failure mode described above in the s3a docs.

I'm not sure you are left open to it.

> It seems to me that if the magic committer implemented the above proposal
> to track task ids in the pendingset, the OutputCommitCoordinator would no
> longer be needed at all. I think this ticket (
> https://issues.apache.org/jira/browse/SPARK-19790) discusses similar ideas
> about how to make spark's commit coordinator work with the s3a committers

Task attempt failure during commit is a trouble spot. Task commits should be
repeatable, so that if task attempt TA1 fails *or fails to report an
outcome*, another attempt TA2 must succeed.

FileOutputCommitter v1, S3A and the abfs/gcs manifest committers all meet
this guarantee; those which don't are:

* everywhere: the v2 file output committer
* gcs: the v1 commit algorithm (which requires atomic dir rename)
* I believe: the AWS EMR optimised committer

I think Spark is being over-aggressive and that you can disable the
co-ordinator; there's a config sketch at the end of this mail.

Syed Shameerur Rahman has been doing work on the committer with the goal of
caching state in memory rather than in S3, and passing it directly to the
driver rather than via files. This would completely eliminate the issue, as
only the successful task attempts can return the data.
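
To make the "task ID is implicit in the filename" point concrete, here is a
minimal, illustrative Java sketch. It is not the actual S3A committer code:
the path layout, the serialized form and the class/method names are
simplifications. It only shows why saving the pendingset under the task id,
with overwrite enabled, gives last-writer-wins semantics across attempts of
the same task:

```java
// Illustrative sketch only, not the real S3A magic committer implementation.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PendingSetNamingSketch {

  /**
   * "Commit" a task attempt by saving its pending uploads under the *task* id.
   * Because the filename omits the attempt id, a later attempt for the same
   * task simply overwrites the earlier file: last writer wins, and job commit
   * only ever loads one pendingset per task.
   */
  static void commitTaskAttempt(FileSystem fs,
                                Path jobAttemptDir,
                                String taskId,              // e.g. "task_0001"
                                byte[] serializedPendingSet) throws Exception {
    Path pendingSet = new Path(jobAttemptDir, taskId + ".pendingset");
    // overwrite = true: a re-executed attempt replaces whatever an earlier
    // attempt for the same task wrote.
    try (FSDataOutputStream out = fs.create(pendingSet, true)) {
      out.write(serializedPendingSet);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path jobAttemptDir = new Path("/tmp/job_0001/attempt_01");
    fs.mkdirs(jobAttemptDir);

    // Two attempts of the same task: the second call overwrites the first
    // file, so only one task_0001.pendingset ever exists.
    commitTaskAttempt(fs, jobAttemptDir, "task_0001", "attempt 1 uploads".getBytes());
    commitTaskAttempt(fs, jobAttemptDir, "task_0001", "attempt 2 uploads".getBytes());
  }
}
```

Which is why the duplicate-detection-on-load step from the old proposal was
never needed: at job commit there is never more than one pendingset per task
to load.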
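
And here is a sketch of the workaround discussed above: turning off Spark's
output commit coordination when using the magic committer. Treat the
property name as something to verify against the Spark version in use
(spark.hadoop.outputCommitCoordination.enabled is the undocumented switch
read in Spark's SparkHadoopMapRedUtil), and note that the rest of the magic
committer wiring (committer factory / commit protocol classes) is assumed to
be configured elsewhere:

```java
// Sketch only: where the "disable the co-ordinator" switch would be set.
import org.apache.spark.sql.SparkSession;

public class DisableCommitCoordination {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("magic-committer-without-commit-coordination")
        // Spark side: skip the driver round-trip that authorises task commits,
        // so a lost "authorized committer" no longer fails the whole stage.
        .config("spark.hadoop.outputCommitCoordination.enabled", "false")
        // S3A side: use the magic committer, whose task commit is repeatable.
        .config("spark.hadoop.fs.s3a.committer.name", "magic")
        .getOrCreate();

    // ... run the write stage as usual ...

    spark.stop();
  }
}
```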