## Concurrent checkpoints

@Aljoscha Krettek <aljos...@apache.org> Regarding concurrent checkpoints,
let me illustrate with a simple DAG below.
[image: a simple DAG where two parallel writers (writer-1 and writer-2)
feed a single global committer]

Let's assume each writer emits one file per checkpoint cycle and *writer-2
is slow*. Now let's look at what the global committer receives:

timeline:
----------------------------------------------------------> Now
from Writer-1:      file-1-1, ck-1, file-1-2, ck-2
from Writer-2:      file-2-1, ck-1

In this case, the committer shouldn't include "file-1-2" in the commit
for ck-1.
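
To make the bookkeeping concrete, here is a minimal sketch (not the FLIP
API, just illustrative Java) of how a committer that receives
(checkpointId, CommT) tuples could group committables so that a commit for
ck-1 never picks up file-1-2:

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;

// Illustrative only: committables are grouped by the checkpoint they belong to,
// and a commit for checkpoint N only drains committables with checkpointId <= N.
class CommittablesByCheckpoint<CommT> {

  private final NavigableMap<Long, List<CommT>> pending = new TreeMap<>();

  void collect(long checkpointId, CommT committable) {
    pending.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(committable);
  }

  // Called when checkpoint N completes; file-1-2 (tagged with ck-2) stays pending.
  List<CommT> pollUpTo(long checkpointId) {
    SortedMap<Long, List<CommT>> ready = pending.headMap(checkpointId, true);
    List<CommT> result = new ArrayList<>();
    ready.values().forEach(result::addAll);
    ready.clear();
    return result;
  }
}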

## Committable bookkeeping and combining

I like Dawid's proposal where the framework takes care of the
bookkeeping of committables and provides a combiner API (CommT ->
GlobalCommT) for the GlobalCommitter. The only requirement is to tie the
commit/CommT/GlobalCommT to a checkpoint.

When a commit is successful for checkpoint-N, the framework needs to remove
the GlobalCommT from the state corresponding to checkpoints <= N. If a
commit fails, the GlobalCommT accumulates and will be included in the next
cycle. That is how the Iceberg sink works. I think it is good to piggyback
retries on Flink's periodic checkpoints for the Iceberg sink. Otherwise, it
can get complicated to implement retry logic that won't interfere with
Flink checkpoints.
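
As a rough illustration of that accumulate-and-retry pattern (modelled on
how our Iceberg committer behaves, not on the FLIP interfaces; all names
below are made up):

import java.util.ArrayList;
import java.util.List;

// Illustrative accumulate-and-retry: GlobalCommT stays in (checkpointed) state until
// a commit succeeds; a failed commit is simply retried on the next checkpoint cycle.
class GlobalCommitRetry<GlobalCommT> {

  interface CommitFunction<T> {
    void commit(List<T> globalCommittables) throws Exception;
  }

  // would live in operator state so it survives failover
  private final List<GlobalCommT> uncommitted = new ArrayList<>();

  void onCheckpointComplete(GlobalCommT latest, CommitFunction<GlobalCommT> commitFn) {
    uncommitted.add(latest);
    try {
      // commits everything accumulated so far, i.e. retries earlier failures too
      commitFn.commit(uncommitted);
      uncommitted.clear(); // success: drop state for this and all earlier checkpoints
    } catch (Exception e) {
      // failure: keep the committables and retry on the next cycle
    }
  }
}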

The main question is whether this pattern is generic enough to be put into
the sink framework or not.

> An alternative topology option for the IcebergSink might be: DataFileWriter
> -> Agg -> GlobalCommitter. One pro of this method is that we can let Agg
> take care of the cleanup instead of coupling the cleanup logic to the
> committer.

@Guowei Ma <guowei....@gmail.com> I would favor Dawid's suggestion of a
"combine" API rather than a separate "Agg" operator.

## Using checkpointId

> I think this can have some problems, for example when checkpoint ids are
> not strictly sequential, when we wrap around, or when the JobID changes.
> This will happen when doing a stop/start-from-savepoint cycle, for example.

checkpointId can work if it is monotonically increasing, which I believe is
the case for Flink today. Restoring from a checkpoint or savepoint will
resume the checkpointId sequence.

We can deal with a JobID change by saving it into the state and the Iceberg
snapshot metadata. There is already a PR [1] for that.
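
Conceptually, what the PR in [1] does looks roughly like the hedged sketch
below: stamp every Iceberg commit with the Flink job id and checkpoint id
in the snapshot summary, so a restarted job can find the last committed
checkpoint. The property keys here are illustrative, not necessarily the
ones used in the PR:

import java.util.List;
import java.util.Map;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

class CheckpointMetadataSketch {

  static void commit(Table table, List<DataFile> files, String jobId, long checkpointId) {
    AppendFiles append = table.newAppend();
    files.forEach(append::appendFile);
    append.set("flink.job-id", jobId);                         // illustrative key
    append.set("flink.max-committed-checkpoint-id", Long.toString(checkpointId));
    append.commit();
  }

  // On restore: walk the snapshots and find the highest checkpoint id committed by this job.
  static long maxCommittedCheckpointId(Table table, String jobId) {
    long max = -1L;
    for (Snapshot snapshot : table.snapshots()) {
      Map<String, String> summary = snapshot.summary();
      if (jobId.equals(summary.get("flink.job-id"))
          && summary.containsKey("flink.max-committed-checkpoint-id")) {
        max = Math.max(max, Long.parseLong(summary.get("flink.max-committed-checkpoint-id")));
      }
    }
    return max;
  }
}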

## Nonce

> Flink provides a nonce to the GlobalCommitter where Flink guarantees that
> this nonce is unique

That is actually how we implemented it internally. The Flink Iceberg sink
basically hashes the manifest file location as the nonce. Since the
Flink-generated manifest file location is unique, that guarantees the nonce
is unique.

IMO, checkpointId is also one way of implementing a nonce based on today's
Flink behavior.
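
For illustration, a rough sketch of the idempotent-commit check described
above (loosely modelled on [2]; property key and helper names are made up):
derive the nonce from the manifest file location and skip the commit if a
previous snapshot already carries that nonce, which makes retries after a
failed or timed-out commit safe.

import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

class NonceSketch {

  // the Flink-generated manifest location is unique, so a hash of it is a unique nonce
  static String nonceFor(String manifestLocation) {
    return UUID.nameUUIDFromBytes(manifestLocation.getBytes(StandardCharsets.UTF_8)).toString();
  }

  // scan existing snapshots to see whether this nonce was already committed
  static boolean alreadyCommitted(Table table, String nonce) {
    for (Snapshot snapshot : table.snapshots()) {
      if (nonce.equals(snapshot.summary().get("flink.commit-nonce"))) { // illustrative key
        return true;
      }
    }
    return false;
  }
}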

> and will not change for repeated invocations of the GlobalCommitter with
> the same set of committables

If the same set of committables is combined into one GlobalCommT (like the
ManifestFile in Iceberg), then the nonce could be part of the GlobalCommT
interface.
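
Purely as an illustration of that idea (not a proposal for the exact API):

// Illustrative only: one way the nonce could be exposed on the GlobalCommT type.
interface GlobalCommittableWithNonce {
  // e.g. a hash of the manifest file location; must stay stable across commit retries
  String nonce();
}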

BTW, as Dawid pointed out, the ManifestFile optimization is only in our
internal implementation [2] right now. For the open source version, there
is a GitHub issue [3] to track follow-up improvements.

Thanks,
Steven

[1] https://github.com/apache/iceberg/pull/1404
[2]
https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java#L363
[3] https://github.com/apache/iceberg/issues/1403


On Mon, Sep 14, 2020 at 12:03 PM Guowei Ma <guowei....@gmail.com> wrote:

> Hi all,
>
>
> Very thanks for the discussion and the valuable opinions! Currently there
> are several ongoing issues and we would like to show what we are thinking
> in the next few mails.
>
> It seems that the biggest issue now is about the topology of the sinks.
> Before deciding what the sink API would look like, I would like to first
> summarize the different topologies we have mentioned so that we could sync
> on the same page and gain more insights about this issue. There are four
> types of topology I could see. Please correct me if I misunderstand what
> you mean:
>
>    1. Commit individual files (StreamingFileSink)
>       1. FileWriter -> FileCommitter
>    2. Commit a directory (HiveSink)
>       1. FileWriter -> FileCommitter -> GlobalCommitter
>    3. Commit a bundle of files (Iceberg)
>       1. DataFileWriter -> GlobalCommitter
>    4. Commit a directory with merged files (some users want to merge the
>       files in a directory before committing the directory to the Hive
>       meta store)
>       1. FileWriter -> SingleFileCommit -> FileMergeWriter -> GlobalCommitter
>
>
> It can be seen from the above that the topologies are different according
> to different requirements. Not only that, there may be other options for
> the second and third categories. E.g.
>
> An alternative topology option for the IcebergSink might be: DataFileWriter
> -> Agg -> GlobalCommitter. One pro of this method is that we can let Agg
> take care of the cleanup instead of coupling the cleanup logic to the
> committer.
>
>
> In the long run I think we might provide the sink developer the ability to
> build arbitrary topologies. Maybe Flink could only provide a basic commit
> transformation and let the user build other parts of the topology. In 1.12
> we might first provide different patterns for these different scenarios,
> and I think these components could be reused in the future.
>
> Best,
> Guowei
>
>
> On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
> > Hi all,
> >
> > > I would think that we only need flush() and the semantics are that it
> > > prepares for a commit, so on a physical level it would be called from
> > > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
> > > think flush() should be renamed to something like "prepareCommit()".
> >
> > Generally speaking it is a good point that emitting the committables
> > should happen before emitting the checkpoint barrier downstream.
> > However, if I remember offline discussions well, the idea behind
> > Writer#flush and Writer#snapshotState was to differentiate commit on
> > checkpoint vs final checkpoint at the end of the job. Both of these
> > methods could emit committables, but the flush should not leave any in
> > progress state (e.g. in case of file sink in STREAM mode, in
> > snapshotState it could leave some open files that would be committed in
> > a subsequent cycle, however flush should close all files). The
> > snapshotState as it is now can not be called in
> > prepareSnapshotPreBarrier as it can store some state, which should
> > happen in Operator#snapshotState as otherwise it would always be
> > synchronous. Therefore I think we would need sth like:
> >
> > void prepareCommit(boolean flush, WriterOutput<CommT> output);
> >
> > ver 1:
> >
> > List<StateT> snapshotState();
> >
> > ver 2:
> >
> > void snapshotState(); // not sure if we need that method at all in option 2
> >
> > > The Committer is as described in the FLIP, it's basically a function
> > > "void commit(Committable)". The GlobalCommitter would be a function
> > > "void commit(List<Committable>)". The former would be used by an S3
> > > sink where we can individually commit files to S3, a committable would
> > > be the list of part uploads that will form the final file and the
> > > commit operation creates the metadata in S3. The latter would be used
> > > by something like Iceberg where the Committer needs a global view of
> > > all the commits to be efficient and not overwhelm the system.
> > >
> > > I don't know yet if sinks would only implement one type of commit
> > > function or potentially both at the same time, and maybe Commit can
> > > return some CommitResult that gets shipped to the GlobalCommit
> > > function.
> > I must admit I did not get the need for Local/Normal + Global
> > committer at first. The Iceberg example helped a lot. I think it makes a
> > lot of sense.
> >
> > > For Iceberg, writers don't need any state. But the GlobalCommitter
> > > needs to checkpoint StateT. For the committer, CommT is "DataFile".
> > > Since a single committer can collect thousands (or more) data files in
> > > one checkpoint cycle, as an optimization we checkpoint a single
> > > "ManifestFile" (for the collected thousands data files) as StateT.
> > > This allows us to absorb extended commit outages without losing
> > > written/uploaded data files, as operator state size is as small as one
> > > manifest file per checkpoint cycle [2].
> > > ------------------
> > > StateT snapshotState(SnapshotContext context) throws Exception;
> > >
> > > That means we also need the restoreCommitter API in the Sink interface
> > > ---------------
> > > Committer<CommT, StateT> restoreCommitter(InitContext context, StateT
> > > state);
> > I think this might be a valid case. Not sure though if I would go with a
> > "state" there. Having a state in a committer would imply we need a
> > collect method as well. So far we needed a single method commit(...) and
> > the bookkeeping of the committables could be handled by the framework. I
> > think something like an optional combiner in the GlobalCommitter would
> > be enough. What do you think?
> >
> > GlobalCommitter<CommT, GlobalCommT> {
> >
> >     void commit(GlobalCommT globalCommittables);
> >
> >     GlobalCommT combine(List<CommT> committables);
> >
> > }
> >
> > A different problem that I see here is how do we handle commit failures.
> > Should the committables (both normal and global) be included in the next
> > cycle, should we retry it, ...? I think it would be worth laying it out
> > in the FLIP.
> >
> > @Aljoscha I think you can find the code Steven was referring to here:
> >
> > https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java
> >
> > Best,
> >
> > Dawid
> >
> > On 14/09/2020 15:19, Aljoscha Krettek wrote:
> > > On 14.09.20 01:23, Steven Wu wrote:
> > >> ## Writer interface
> > >>
> > >> For the Writer interface, should we add "*prepareSnapshot"* before the
> > >> checkpoint barrier emitted downstream? IcebergWriter would need it. Or
> > >> would the framework call "*flush*" before the barrier emitted
> > >> downstream? That guarantee would achieve the same goal.
> > >
> > > I would think that we only need flush() and the semantics are that it
> > > prepares for a commit, so on a physical level it would be called from
> > > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
> > > think flush() should be renamed to something like "prepareCommit()".
> > >
> > > @Guowei, what do you think about this?
> > >
> > >> In [1], we discussed the reason for Writer to emit (checkpointId,
> > >> CommT) tuple to the committer. The committer needs checkpointId to
> > >> separate out data files for different checkpoints if concurrent
> > >> checkpoints are enabled.
> > >
> > > When can this happen? Even with concurrent checkpoints the snapshot
> > > barriers would still cleanly segregate the input stream of an operator
> > > into tranches that should manifest in only one checkpoint. With
> > > concurrent checkpoints, all that can happen is that we start a
> > > checkpoint before a last one is confirmed completed.
> > >
> > > Unless there is some weirdness in the sources and some sources start
> > > chk1 first and some other ones start chk2 first?
> > >
> > > @Piotrek, do you think this is a problem?
> > >
> > >> For the Committer interface, I am wondering if we should split the
> > >> single commit method into separate "*collect"* and "*commit"* methods?
> > >> This way, it can handle both single and multiple CommT objects.
> > >
> > > I think we can't do this. If the sink only needs a regular Committer,
> > > we can perform the commits in parallel, possibly on different
> > > machines. Only when the sink needs a GlobalCommitter would we need to
> > > ship all commits to a single process and perform the commit there. If
> > > both methods were unified in one interface we couldn't make the
> > > decision of where to commit in the framework code.
> > >
> > >> For Iceberg, writers don't need any state. But the GlobalCommitter
> > >> needs to checkpoint StateT. For the committer, CommT is "DataFile".
> > >> Since a single committer can collect thousands (or more) data files in
> > >> one checkpoint cycle, as an optimization we checkpoint a single
> > >> "ManifestFile" (for the collected thousands data files) as StateT.
> > >> This allows us to absorb extended commit outages without losing
> > >> written/uploaded data files, as operator state size is as small as one
> > >> manifest file per checkpoint cycle
> > >
> > > You could have a point here. Is the code for this available in
> > > open-source? I was checking out
> > > https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
> > > and didn't find the ManifestFile optimization there.
> > >
> > > Best,
> > > Aljoscha
> > >
> >
> >
>
