I can see the value in their change, which "ensures that a job fails when the
committer doesn't do atomic task commit".

But as I believe committers should always do exactly that, it should
generally be the default.

Really, they should be able to ask the committer whether it has atomic task
commit.

If Spark used the existing StreamCapabilities.hasCapability() then we could
have the Hadoop-bundled committers return true for some probe such as "atomic
task commit". The S3A, manifest and v1 file output committers would all do
that (though the v1 committer should first check that the filesystem URL
isn't gcs, s3, s3a, wasb, ...)
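
A rough sketch of how such a probe could behave, in Python purely for
illustration (the real interface is Java's
StreamCapabilities.hasCapability(String)). The capability string, the class
names and the helper function are all hypothetical, and the scheme list is
just the examples named above, not an exhaustive set.

```python
from urllib.parse import urlparse

# Hypothetical capability name: no such constant exists in Hadoop today.
ATOMIC_TASK_COMMIT = "atomic-task-commit"

# Schemes where v1 task commit (directory rename) is not atomic.
# Only the examples from this thread; the real list would be longer.
NON_ATOMIC_RENAME_SCHEMES = {"gcs", "s3", "s3a", "wasb"}


class Committer:
    """Stand-in for a committer with a hasCapability()-style probe."""

    def has_capability(self, capability: str) -> bool:
        # Unknown capabilities are always declined.
        return False


class ManifestLikeCommitter(Committer):
    """Models a committer whose task commit is always atomic."""

    def has_capability(self, capability: str) -> bool:
        return capability == ATOMIC_TASK_COMMIT


class V1FileOutputLikeCommitter(Committer):
    """Models the v1 algorithm: atomic only where rename is atomic."""

    def __init__(self, dest_url: str):
        self.scheme = urlparse(dest_url).scheme.lower()

    def has_capability(self, capability: str) -> bool:
        return (capability == ATOMIC_TASK_COMMIT
                and self.scheme not in NON_ATOMIC_RENAME_SCHEMES)


def must_fail_on_lost_commit(committer) -> bool:
    """Caller-side check: fail the job only if atomicity is not declared.

    A committer with no probe at all is treated as non-atomic.
    """
    probe = getattr(committer, "has_capability", None)
    return not (callable(probe) and probe(ATOMIC_TASK_COMMIT))
```

With this shape the strict behaviour stays in place for any committer that
says nothing, and only committers that explicitly declare atomic task commit
opt out of it.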



On Wed, 24 Apr 2024 at 00:49, Dylan McClelland
<dmcclell...@indeed.com.invalid> wrote:

> thanks for clarifying that, now that you have pointed me in the right
> direction I can see in the code how that works
>
> I will check with the spark team to see if they are open to either
> modifying outputCommitCoordinator to avoid the authorized committer check
> when using the magic committer, or promote the undocumented
> "spark.hadoop.outputCommitCoordination.enabled" setting to a fully
> supported option
>
> thanks again
>
> On Tue, Apr 23, 2024 at 8:49 AM Steve Loughran <ste...@cloudera.com.invalid>
> wrote:
>
> > On Sat, 20 Apr 2024 at 23:36, Dylan McClelland
> > <dmcclell...@indeed.com.invalid> wrote:
> >
> > > > when describing the resilience of the magic committer to failures
> > > > during a task commit, the docs state
> > > >
> > > > "If the .pendingset file has been saved to the job attempt directory,
> > > > the task has effectively committed, it has just failed to report to the
> > > > controller. This will cause complications during job commit, as there
> > > > may be two task PendingSet committing the same files, or committing
> > > > files with
> > > >
> >
> >
> > last big review/writeup of the protocol was in
> > https://github.com/steveloughran/zero-rename-committer/releases/tag/tag_release_2021-05-17
> >
> > when we commit a task the .pendingset file is saved with task id, not
> > task attempt id.
> > two task attempts for the same task cannot both be committed; the latest
> > one to commit wins.
> >
> > That is: task ID is implicit in the filename.
> >
> > There's still the risk of partitioning where TA1 is told to commit, but
> > goes offline, TA2 commits and then, later, TA1 completes.
> > Then when the job commits, it gets the files of TA1 even though TA2 is
> > the one which reported success.
> >
> >
> > This "fail if task commit doesn't respond/report success" should only be
> > needed for commit protocols whose task commit is
> > non-atomic/non-repeatable.
> >
> >
> > > > *Proposed*: track task ID in pendingsets, recognise duplicates on load
> > > > and then respond by cancelling one set and committing the other. (or
> > > > fail?)"
> > >
> >
> > task id is implicit in filename: it's ${taskid}.pendingset.
> >
> > we can't guarantee which of two TAs told to commit it is, except that the
> > last one told to execute by spark will overwrite any file it found and
> > reported success.
> >
> >
> > >
> > > As far as I can tell from reading over the code, the proposal was not
> > > implemented. Is this still considered a viable solution? If so, I'd be
> > > happy to take a crack at it
> > >
> > > > For context: We are running spark jobs using the magic committer in a
> > > > k8s environment where executor loss is somewhat common (maybe about 2%
> > > > of executors are terminated by the cluster). By default, spark's
> > > > OutputCommitCoordinator fails the whole write stage (and parent job) if
> > > > an executor fails while it is an "authorized committer" (see
> > > > https://issues.apache.org/jira/browse/SPARK-39195).
> > > >
> > > > This results in expensive job failures at the final write stage. We can
> > > > avoid those failures by disabling the outputCommitCoordinator entirely,
> > > > but that leaves us open to the failure mode described above in the s3a
> > > > docs.
> > >
> >
> > I'm not sure you are.
> >
> > >
> > > > It seems to me that if the magic committer implemented the above
> > > > proposal to track task ids in the pendingset, the
> > > > OutputCommitCoordinator would no longer be needed at all. I think this
> > > > ticket (https://issues.apache.org/jira/browse/SPARK-19790) discusses
> > > > similar ideas about how to make spark's commit coordinator work with
> > > > the s3a committers
> > >
> >
> >
> > Task attempt failure during commit is a troublespot. Task commits should
> > be repeatable, so that if task attempt TA1 fails *or fails to report an
> > outcome* then another attempt TA2 must succeed.
> >
> > FileOutputCommitter v1, S3A and abfs/gcs manifest committers all meet
> > this guarantee; those which don't are
> > * everywhere: v2 file output committer
> > * gcs: v1 commit algorithm (which requires atomic dir rename)
> > * I believe: AWS EMR optimised committer.
> >
> > I think spark is being over-aggressive and that you can disable the
> > co-ordinator.
> >
> >
> > Syed Shameerur Rahman has been doing work on the committer with a goal of
> > caching state in memory rather than s3 and passing it directly to the
> > driver rather than via files. This would completely eliminate the issue,
> > as only the successful task attempts can return the data.
> >
>
