Hi, aljoscha

>I don't understand why we need the "Drain and Snapshot" section. It
>seems to be some details about stop-with-savepoint and drain, and the
>relation to BATCH execution but I don't know if it is needed to
>understand the rest of the document. I'm happy to be wrong here, though,
>if there's good reasons for the section.

The new unified sink API should provide a way for the sink developer to
deal with EOI(Drain) to guarantee the Exactly-once semantics. This is what
I want to say mostly in this section. Current streaming style sink API does
not provide a good way to deal with it. It is why the `StreamingFileSink`
does not commit the last part of data in the bounded scenario. Our theme is
unified. I am afraid that I will let users misunderstand that adding this
requirement to the new sink API is only for bounded scenarios, so I
explained in this paragraph that stop-with-savepoint might also have the
similar requirement.

For the snapshot I also want to prevent users from misunderstanding that it
is specially prepared for the unbounded scenario. Actually it might be also
possible with bounded + batch execution mode in the future.

I could reorganize the section if this section makes the reader confused
but I think we might need to keep the drain at least. WDYT?

>On the question of Alternative 1 and 2, I have a strong preference for
>Alternative 1 because we could avoid strong coupling to other modules.
>With Alternative 2 we would depend on `flink-streaming-java` and even
>`flink-runtime`. For the new source API (FLIP-27) we managed to keep the
>dependencies slim and the code is in flink-core. I'd be very happy if we
>can manage the same for the new sink API.

I am open to alternative 1. Maybe I miss something but I do not get why the
second alternative would depend on `flink-runtime` or
`flink-streaming-java`. The all the state api currently is in the
flink-core. Could you give some further explanation?  thanks :)

Best,
Guowei


On Tue, Sep 15, 2020 at 12:05 PM Guowei Ma <guowei....@gmail.com> wrote:

> ## Concurrent checkpoints
> AFAIK the committer would not see the file-1-2 when ck1 happens in the
> ExactlyOnce mode.
>
> ## Committable bookkeeping and combining
>
> I agree with you that the "CombineGlobalCommitter" would work. But we put
> more optimization logic in the committer, which will make the committer
> more and more complicated, and eventually become the same as the
> Writer. For example, The committer needs to clean up some unused manifest
> file when restoring from a failure if we introduce the optimizations to the
> committer.
>
> In this case another alternative might be to put this "merging"
> optimization to a separate agg operator(maybe just like another `Writer`?).
> The agg could produce an aggregated committable to the committer. The agg
> operator could manage the whole life cycle of the manifest file it created.
> It would make the committer have single responsibility.
>
> >>The main question is if this pattern is generic to be put into the sink
> framework or not.
> Maybe I am wrong. But what I can feel from the current discussion is that
> different requirements have different topological requirements.
>
> ## Using checkpointId
> In the batch execution mode there would be no normal checkpoint any more.
> That is why we do not introduce the checkpoint id in the API. So it is a
> great thing that sink decouples its implementation from checkpointid. :)
>
> Best,
> Guowei
>
>
> On Tue, Sep 15, 2020 at 7:33 AM Steven Wu <stevenz...@gmail.com> wrote:
>
>>
>> ## concurrent checkpoints
>>
>> @Aljoscha Krettek <aljos...@apache.org> regarding the concurrent
>> checkpoints, let me illustrate with a simple DAG below.
>> [image: image.png]
>>
>> Let's assume each writer emits one file per checkpoint cycle and *writer-2
>> is slow*. Now let's look at what the global committer receives
>>
>> timeline:
>> ----------------------------------------------------------> Now
>> from Writer-1:      file-1-1, ck-1, file-1-2, ck-2
>> from Writer-2:
>> file-2-1, ck-1
>>
>> In this case, the committer shouldn't include "file-1-2" into the commit
>> for ck-1.
>>
>> ## Committable bookkeeping and combining
>>
>> I like David's proposal where the framework takes care of the
>> bookkeeping of committables and provides a combiner API (CommT ->
>> GlobalCommT) for GlobalCommitter. The only requirement is to tie the
>> commit/CommT/GlobalCommT to a checkpoint.
>>
>> When a commit is successful for checkpoint-N, the framework needs to
>> remove the GlobalCommT from the state corresponding to checkpoints <= N. If
>> a commit fails, the GlobalCommT accumulates and will be included in the
>> next cycle. That is how the Iceberg sink works. I think it is good to
>> piggyback retries with Flink's periodic checkpoints for Iceberg sink.
>> Otherwise, it can get complicated to implement retry logic that won't
>> interfere with Flink checkpoints.
>>
>> The main question is if this pattern is generic to be put into the sink
>> framework or not.
>>
>> > A alternative topology option for the IcebergSink might be :
>> DataFileWriter
>> -> Agg -> GlobalCommitter. One pro of this method is that we can let Agg
>> take care of the cleanup instead of coupling the cleanup logic to the
>> committer.
>>
>> @Guowei Ma <guowei....@gmail.com> I would favor David's suggestion of
>> "combine" API rather than a separate "Agg" operator.
>>
>> ## Using checkpointId
>>
>> > I think this can have some problems, for example when checkpoint ids are
>> not strictly sequential, when we wrap around, or when the JobID changes.
>> This will happen when doing a stop/start-from-savepoint cycle, for
>> example.
>>
>> checkpointId can work if it is monotonically increasing, which I believe
>> is the case for Flink today. Restoring from checkpoint or savepoint will
>> resume the checkpointIds.
>>
>> We can deal with JobID change by saving it into the state and Iceberg
>> snapshot metadata. There is already a PR [1] for that.
>>
>> ## Nonce
>>
>> > Flink provide a nonce to the GlobalCommitter where Flink guarantees
>> that this nonce is unique
>>
>> That is actually how we implemented internally. Flink Iceberg sink
>> basically hashes the Manifest file location as the nonce. Since the Flink
>> generated Manifest file location is unique, it  guarantees the nonce is
>> unique.
>>
>> IMO, checkpointId is also one way of implementing Nonce based on today's
>> Flink behavior.
>>
>> > and will not change for repeated invocations of the GlobalCommitter
>> with the same set of committables
>>
>>  if the same set of committables are combined into one GlobalCommT (like
>> ManifestFile in Iceberg), then the Nonce could be part of the GlobalCommT
>> interface.
>>
>> BTW, as David pointed out, the ManifestFile optimization is only in our
>> internal implementation [2] right now. For the open source version, there
>> is a github issue [3] to track follow-up improvements.
>>
>> Thanks,
>> Steven
>>
>> [1] https://github.com/apache/iceberg/pull/1404
>> [2]
>> https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java#L363
>> [3] https://github.com/apache/iceberg/issues/1403
>>
>>
>> On Mon, Sep 14, 2020 at 12:03 PM Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>>
>>> Very thanks for the discussion and the valuable opinions! Currently there
>>> are several ongoing issues and we would like to show what we are thinking
>>> in the next few mails.
>>>
>>> It seems that the biggest issue now is about the topology of the sinks.
>>> Before deciding what the sink API would look like, I would like to first
>>> summarize the different topologies we have mentioned so that we could
>>> sync
>>> on the same page and gain more insights about this issue. There are four
>>> types of topology I could see. Please correct me if I misunderstand what
>>> you mean:
>>>
>>>    1.
>>>
>>>    Commit individual files. (StreamingFileSink)
>>>    1.
>>>
>>>       FileWriter -> FileCommitter
>>>       2.
>>>
>>>    Commit a directory (HiveSink)
>>>    1.
>>>
>>>       FileWriter -> FileCommitter -> GlobalCommitter
>>>       3.
>>>
>>>    Commit a bundle of files (Iceberg)
>>>    1.
>>>
>>>       DataFileWriter  -> GlobalCommitter
>>>       4.
>>>
>>>    Commit a directory with merged files(Some user want to merge the files
>>>    in a directory before committing the directory to Hive meta store)
>>>    1.
>>>
>>>       FileWriter -> SingleFileCommit -> FileMergeWriter  ->
>>> GlobalCommitter
>>>
>>>
>>> It can be seen from the above that the topologies are different according
>>> to different requirements. Not only that there may be other options for
>>> the
>>> second and third categories. E.g
>>>
>>> A alternative topology option for the IcebergSink might be :
>>> DataFileWriter
>>> -> Agg -> GlobalCommitter. One pro of this method is that we can let Agg
>>> take care of the cleanup instead of coupling the cleanup logic to the
>>> committer.
>>>
>>>
>>> In the long run I think we might provide the sink developer the ability
>>> to
>>> build arbitrary topologies. Maybe Flink could only provide a basic commit
>>> transformation and let the user build other parts of the topology. In the
>>> 1.12 we might first provide different patterns for these different
>>> scenarios at first and I think these components could be reused in the
>>> future.
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz <
>>> dwysakow...@apache.org>
>>> wrote:
>>>
>>> > Hi all,
>>> >
>>> > > I would think that we only need flush() and the semantics are that it
>>> > > prepares for a commit, so on a physical level it would be called from
>>> > > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
>>> > > think flush() should be renamed to something like "prepareCommit()".
>>> >
>>> > Generally speaking it is a good point that emitting the committables
>>> > should happen before emitting the checkpoint barrier downstream.
>>> > However, if I remember offline discussions well, the idea behind
>>> > Writer#flush and Writer#snapshotState was to differentiate commit on
>>> > checkpoint vs final checkpoint at the end of the job. Both of these
>>> > methods could emit committables, but the flush should not leave any in
>>> > progress state (e.g. in case of file sink in STREAM mode, in
>>> > snapshotState it could leave some open files that would be committed in
>>> > a subsequent cycle, however flush should close all files). The
>>> > snapshotState as it is now can not be called in
>>> > prepareSnapshotPreBarrier as it can store some state, which should
>>> > happen in Operator#snapshotState as otherwise it would always be
>>> > synchronous. Therefore I think we would need sth like:
>>> >
>>> > void prepareCommit(boolean flush, WriterOutput<CommT> output);
>>> >
>>> > ver 1:
>>> >
>>> > List<StateT> snapshotState();
>>> >
>>> > ver 2:
>>> >
>>> > void snapshotState(); // not sure if we need that method at all in
>>> option 2
>>> >
>>> > > The Committer is as described in the FLIP, it's basically a function
>>> > > "void commit(Committable)". The GobalCommitter would be a function
>>> "void
>>> > > commit(List<Committable>)". The former would be used by an S3 sink
>>> where
>>> > > we can individually commit files to S3, a committable would be the
>>> list
>>> > > of part uploads that will form the final file and the commit
>>> operation
>>> > > creates the metadata in S3. The latter would be used by something
>>> like
>>> > > Iceberg where the Committer needs a global view of all the commits
>>> to be
>>> > > efficient and not overwhelm the system.
>>> > >
>>> > > I don't know yet if sinks would only implement on type of commit
>>> > > function or potentially both at the same time, and maybe Commit can
>>> > > return some CommitResult that gets shipped to the GlobalCommit
>>> function.
>>> > I must admit it I did not get the need for Local/Normal + Global
>>> > committer at first. The Iceberg example helped a lot. I think it makes
>>> a
>>> > lot of sense.
>>> >
>>> > > For Iceberg, writers don't need any state. But the GlobalCommitter
>>> > > needs to
>>> > > checkpoint StateT. For the committer, CommT is "DataFile". Since a
>>> single
>>> > > committer can collect thousands (or more) data files in one
>>> checkpoint
>>> > > cycle, as an optimization we checkpoint a single "ManifestFile" (for
>>> the
>>> > > collected thousands data files) as StateT. This allows us to absorb
>>> > > extended commit outages without losing written/uploaded data files,
>>> as
>>> > > operator state size is as small as one manifest file per checkpoint
>>> cycle
>>> > > [2].
>>> > > ------------------
>>> > > StateT snapshotState(SnapshotContext context) throws Exception;
>>> > >
>>> > > That means we also need the restoreCommitter API in the Sink
>>> interface
>>> > > ---------------
>>> > > Committer<CommT, StateT> restoreCommitter(InitContext context, StateT
>>> > > state);
>>> > I think this might be a valid case. Not sure though if I would go with
>>> a
>>> > "state" there. Having a state in a committer would imply we need a
>>> > collect method as well. So far we needed a single method commit(...)
>>> and
>>> > the bookkeeping of the committables could be handled by the framework.
>>> I
>>> > think something like an optional combiner in the GlobalCommitter would
>>> > be enough. What do you think?
>>> >
>>> > GlobalCommitter<CommT, GlobalCommT> {
>>> >
>>> >     void commit(GlobalCommT globalCommittables);
>>> >
>>> >     GlobalCommT combine(List<CommT> committables);
>>> >
>>> > }
>>> >
>>> > A different problem that I see here is how do we handle commit
>>> failures.
>>> > Should the committables (both normal and global be included in the next
>>> > cycle, shall we retry it, ...) I think it would be worth laying it out
>>> > in the FLIP.
>>> >
>>> > @Aljoscha I think you can find the code Steven was referring in here:
>>> >
>>> >
>>> https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java
>>> >
>>> > Best,
>>> >
>>> > Dawid
>>> >
>>> > On 14/09/2020 15:19, Aljoscha Krettek wrote:
>>> > > On 14.09.20 01:23, Steven Wu wrote:
>>> > >> ## Writer interface
>>> > >>
>>> > >> For the Writer interface, should we add "*prepareSnapshot"* before
>>> the
>>> > >> checkpoint barrier emitted downstream?  IcebergWriter would need
>>> it. Or
>>> > >> would the framework call "*flush*" before the barrier emitted
>>> > >> downstream?
>>> > >> that guarantee would achieve the same goal.
>>> > >
>>> > > I would think that we only need flush() and the semantics are that it
>>> > > prepares for a commit, so on a physical level it would be called from
>>> > > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
>>> > > think flush() should be renamed to something like "prepareCommit()".
>>> > >
>>> > > @Guowei, what do you think about this?
>>> > >
>>> > >> In [1], we discussed the reason for Writer to emit (checkpointId,
>>> CommT)
>>> > >> tuple to the committer. The committer needs checkpointId to
>>> separate out
>>> > >> data files for different checkpoints if concurrent checkpoints are
>>> > >> enabled.
>>> > >
>>> > > When can this happen? Even with concurrent checkpoints the snapshot
>>> > > barriers would still cleanly segregate the input stream of an
>>> operator
>>> > > into tranches that should manifest in only one checkpoint. With
>>> > > concurrent checkpoints, all that can happen is that we start a
>>> > > checkpoint before a last one is confirmed completed.
>>> > >
>>> > > Unless there is some weirdness in the sources and some sources start
>>> > > chk1 first and some other ones start chk2 first?
>>> > >
>>> > > @Piotrek, do you think this is a problem?
>>> > >
>>> > >> For the Committer interface, I am wondering if we should split the
>>> > >> single
>>> > >> commit method into separate "*collect"* and "*commit"* methods? This
>>> > >> way,
>>> > >> it can handle both single and multiple CommT objects.
>>> > >
>>> > > I think we can't do this. If the sink only needs a regular Commiter,
>>> > > we can perform the commits in parallel, possibly on different
>>> > > machines. Only when the sink needs a GlobalCommitter would we need to
>>> > > ship all commits to a single process and perform the commit there. If
>>> > > both methods were unified in one interface we couldn't make the
>>> > > decision of were to commit in the framework code.
>>> > >
>>> > >> For Iceberg, writers don't need any state. But the GlobalCommitter
>>> > >> needs to
>>> > >> checkpoint StateT. For the committer, CommT is "DataFile". Since a
>>> > >> single
>>> > >> committer can collect thousands (or more) data files in one
>>> checkpoint
>>> > >> cycle, as an optimization we checkpoint a single "ManifestFile"
>>> (for the
>>> > >> collected thousands data files) as StateT. This allows us to absorb
>>> > >> extended commit outages without losing written/uploaded data files,
>>> as
>>> > >> operator state size is as small as one manifest file per checkpoint
>>> > >> cycle
>>> > >
>>> > > You could have a point here. Is the code for this available in
>>> > > open-source? I was checking out
>>> > >
>>> >
>>> https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
>>> > > and didn't find the ManifestFile optimization there.
>>> > >
>>> > > Best,
>>> > > Aljoscha
>>> > >
>>> >
>>> >
>>>
>>

Reply via email to