Hi Aljoscha,

> I don't understand why we need the "Drain and Snapshot" section. It
> seems to be some details about stop-with-savepoint and drain, and the
> relation to BATCH execution but I don't know if it is needed to
> understand the rest of the document. I'm happy to be wrong here, though,
> if there's good reasons for the section.
The new unified sink API should provide a way for sink developers to deal
with end-of-input (drain) in order to guarantee exactly-once semantics.
This is the main point I want to make in this section. The current
streaming-style sink API does not provide a good way to deal with it,
which is why the `StreamingFileSink` does not commit the last part of the
data in the bounded scenario. Our theme is unification, and I am afraid
users might misunderstand that this requirement on the new sink API exists
only for bounded scenarios, so I explained in this paragraph that
stop-with-savepoint might have a similar requirement. As for the snapshot,
I likewise want to prevent users from misunderstanding that it is
specially prepared for the unbounded scenario; it might actually also be
possible with bounded input + BATCH execution mode in the future. I could
reorganize the section if it confuses readers, but I think we need to keep
the drain part at least. WDYT?

> On the question of Alternative 1 and 2, I have a strong preference for
> Alternative 1 because we could avoid strong coupling to other modules.
> With Alternative 2 we would depend on `flink-streaming-java` and even
> `flink-runtime`. For the new source API (FLIP-27) we managed to keep the
> dependencies slim and the code is in flink-core. I'd be very happy if we
> can manage the same for the new sink API.

I am open to Alternative 1. Maybe I am missing something, but I do not see
why the second alternative would depend on `flink-runtime` or
`flink-streaming-java`. All the state APIs are currently in flink-core.
Could you give some further explanation? Thanks :)

Best,
Guowei


On Tue, Sep 15, 2020 at 12:05 PM Guowei Ma <guowei....@gmail.com> wrote:

> ## Concurrent checkpoints
>
> AFAIK the committer would not see file-1-2 when ck-1 happens in
> exactly-once mode.
>
> ## Committable bookkeeping and combining
>
> I agree with you that the "CombineGlobalCommitter" would work. But we
> would put more and more optimization logic into the committer, making it
> more and more complicated until it eventually becomes the same as the
> Writer. For example, the committer would need to clean up some unused
> manifest files when restoring from a failure if we introduce these
> optimizations to the committer.
>
> In this case another alternative might be to put this "merging"
> optimization into a separate agg operator (maybe just like another
> `Writer`?). The agg could produce an aggregated committable for the
> committer, and the agg operator could manage the whole life cycle of the
> manifest files it creates. That would leave the committer with a single
> responsibility.
>
> >> The main question is if this pattern is generic to be put into the
> >> sink framework or not.
>
> Maybe I am wrong, but what I take from the current discussion is that
> different requirements lead to different topological requirements.
>
> ## Using checkpointId
>
> In the batch execution mode there would be no normal checkpoints anymore.
> That is why we do not introduce the checkpoint id in the API. So it is a
> great thing that the sink decouples its implementation from the
> checkpoint id. :)
>
> Best,
> Guowei
>
>
> On Tue, Sep 15, 2020 at 7:33 AM Steven Wu <stevenz...@gmail.com> wrote:
>
>> ## concurrent checkpoints
>>
>> @Aljoscha Krettek <aljos...@apache.org> regarding the concurrent
>> checkpoints, let me illustrate with a simple DAG below.
>>
>> [image: image.png]
>>
>> Let's assume each writer emits one file per checkpoint cycle and
>> *writer-2 is slow*.
>> Now let's look at what the global committer receives:
>>
>> timeline: ----------------------------------------------------> Now
>> from Writer-1: file-1-1, ck-1, file-1-2, ck-2
>> from Writer-2: file-2-1, ck-1
>>
>> In this case, the committer shouldn't include "file-1-2" in the commit
>> for ck-1.
>>
>> ## Committable bookkeeping and combining
>>
>> I like David's proposal where the framework takes care of the
>> bookkeeping of committables and provides a combiner API (CommT ->
>> GlobalCommT) for the GlobalCommitter. The only requirement is to tie
>> the commit/CommT/GlobalCommT to a checkpoint.
>>
>> When a commit is successful for checkpoint N, the framework needs to
>> remove the GlobalCommT from the state corresponding to checkpoints <= N.
>> If a commit fails, the GlobalCommT accumulates and will be included in
>> the next cycle. That is how the Iceberg sink works. I think it is good
>> to piggyback retries on Flink's periodic checkpoints for the Iceberg
>> sink; otherwise, it can get complicated to implement retry logic that
>> won't interfere with Flink checkpoints.
>>
>> The main question is whether this pattern is generic enough to be put
>> into the sink framework or not.
>>
>> > An alternative topology option for the IcebergSink might be:
>> > DataFileWriter -> Agg -> GlobalCommitter. One pro of this method is
>> > that we can let Agg take care of the cleanup instead of coupling the
>> > cleanup logic to the committer.
>>
>> @Guowei Ma <guowei....@gmail.com> I would favor David's suggestion of a
>> "combine" API rather than a separate "Agg" operator.
>>
>> ## Using checkpointId
>>
>> > I think this can have some problems, for example when checkpoint ids
>> > are not strictly sequential, when we wrap around, or when the JobID
>> > changes. This will happen when doing a stop/start-from-savepoint
>> > cycle, for example.
>>
>> checkpointId can work if it is monotonically increasing, which I believe
>> is the case for Flink today. Restoring from a checkpoint or savepoint
>> will resume the checkpointIds.
>>
>> We can deal with JobID changes by saving the JobID into the state and
>> the Iceberg snapshot metadata. There is already a PR [1] for that.
>>
>> ## Nonce
>>
>> > Flink provide a nonce to the GlobalCommitter where Flink guarantees
>> > that this nonce is unique
>>
>> That is actually how we implemented it internally. The Flink Iceberg
>> sink basically hashes the manifest file location as the nonce. Since the
>> Flink-generated manifest file location is unique, it guarantees the
>> nonce is unique.
>>
>> IMO, checkpointId is also one way of implementing a nonce based on
>> today's Flink behavior.
>>
>> > and will not change for repeated invocations of the GlobalCommitter
>> > with the same set of committables
>>
>> If the same set of committables is combined into one GlobalCommT (like
>> the ManifestFile in Iceberg), then the nonce could be part of the
>> GlobalCommT interface.
>>
>> BTW, as David pointed out, the ManifestFile optimization is only in our
>> internal implementation [2] right now. For the open source version,
>> there is a github issue [3] to track follow-up improvements.
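>>
>> To make the bookkeeping concrete, here is a rough Java sketch of the
>> framework-side state I have in mind. All the names are made up, and
>> commit() is assumed to report success (which differs from the void
>> signature discussed above) purely for illustration:
>>
>> import java.util.Iterator;
>> import java.util.NavigableMap;
>> import java.util.TreeMap;
>> import java.util.function.Predicate;
>>
>> class CommitBookkeeping<GlobalCommT> {
>>     // Pending global committables, keyed by checkpoint id.
>>     private final NavigableMap<Long, GlobalCommT> pending = new TreeMap<>();
>>
>>     void add(long checkpointId, GlobalCommT committable) {
>>         pending.put(checkpointId, committable);
>>     }
>>
>>     // When checkpoint n completes: commit everything <= n and keep
>>     // whatever fails, so it piggybacks on the next checkpoint cycle.
>>     void onCheckpointComplete(long n, Predicate<GlobalCommT> commit) {
>>         Iterator<GlobalCommT> it =
>>                 pending.headMap(n, true).values().iterator();
>>         while (it.hasNext()) {
>>             if (commit.test(it.next())) {
>>                 it.remove();
>>             }
>>         }
>>     }
>> }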
>>
>> Thanks,
>> Steven
>>
>> [1] https://github.com/apache/iceberg/pull/1404
>> [2] https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java#L363
>> [3] https://github.com/apache/iceberg/issues/1403
>>
>>
>> On Mon, Sep 14, 2020 at 12:03 PM Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> Many thanks for the discussion and the valuable opinions! There are
>>> currently several ongoing issues, and we would like to show what we are
>>> thinking in the next few mails.
>>>
>>> It seems that the biggest issue now is the topology of the sinks.
>>> Before deciding what the sink API should look like, I would like to
>>> first summarize the different topologies we have mentioned so that we
>>> can get on the same page and gain more insight into this issue. There
>>> are four types of topology I can see. Please correct me if I
>>> misunderstand what you mean:
>>>
>>> 1. Commit individual files (StreamingFileSink):
>>>    FileWriter -> FileCommitter
>>> 2. Commit a directory (HiveSink):
>>>    FileWriter -> FileCommitter -> GlobalCommitter
>>> 3. Commit a bundle of files (Iceberg):
>>>    DataFileWriter -> GlobalCommitter
>>> 4. Commit a directory with merged files (some users want to merge the
>>>    files in a directory before committing the directory to the Hive
>>>    metastore):
>>>    FileWriter -> SingleFileCommit -> FileMergeWriter -> GlobalCommitter
>>>
>>> It can be seen from the above that the topologies differ according to
>>> the requirements. Moreover, there may be other options for the second
>>> and third categories. E.g.
>>>
>>> An alternative topology option for the IcebergSink might be:
>>> DataFileWriter -> Agg -> GlobalCommitter. One pro of this method is
>>> that we can let Agg take care of the cleanup instead of coupling the
>>> cleanup logic to the committer.
>>>
>>> In the long run I think we might give sink developers the ability to
>>> build arbitrary topologies. Maybe Flink could only provide a basic
>>> commit transformation and let the user build the other parts of the
>>> topology, as in the sketch below. In 1.12 we might first provide
>>> different patterns for these different scenarios, and I think these
>>> components could be reused in the future.
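>>>
>>> Just to illustrate (this is not a proposal): with only a commit
>>> transformation provided by Flink, a sink developer might assemble
>>> topology 4 on the existing DataStream API roughly like this. All the
>>> operator classes and type informations here are hypothetical:
>>>
>>> input
>>>     .transform("FileWriter", partFileInfo,
>>>             new FileWriterOperator())
>>>     .transform("SingleFileCommit", committedFileInfo,
>>>             new SingleFileCommitOperator())
>>>     .transform("FileMergeWriter", mergedFileInfo,
>>>             new FileMergeWriterOperator())
>>>     .transform("GlobalCommitter", Types.VOID,
>>>             new GlobalCommitterOperator())
>>>     .setParallelism(1);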
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz <dwysakow...@apache.org>
>>> wrote:
>>>
>>> > Hi all,
>>> >
>>> > > I would think that we only need flush() and the semantics are that
>>> > > it prepares for a commit, so on a physical level it would be called
>>> > > from "prepareSnapshotPreBarrier". Now that I'm thinking about it
>>> > > more I think flush() should be renamed to something like
>>> > > "prepareCommit()".
>>> >
>>> > Generally speaking it is a good point that emitting the committables
>>> > should happen before emitting the checkpoint barrier downstream.
>>> > However, if I remember the offline discussions well, the idea behind
>>> > Writer#flush and Writer#snapshotState was to differentiate a commit
>>> > on checkpoint from the final checkpoint at the end of the job. Both
>>> > of these methods could emit committables, but flush should not leave
>>> > any in-progress state (e.g. in the case of the file sink in STREAM
>>> > mode, snapshotState could leave some open files that would be
>>> > committed in a subsequent cycle, whereas flush should close all
>>> > files). The snapshotState as it is now cannot be called in
>>> > prepareSnapshotPreBarrier, because it can store some state, which
>>> > should happen in Operator#snapshotState as otherwise it would always
>>> > be synchronous. Therefore I think we would need something like:
>>> >
>>> > void prepareCommit(boolean flush, WriterOutput<CommT> output);
>>> >
>>> > ver 1:
>>> >
>>> > List<StateT> snapshotState();
>>> >
>>> > ver 2:
>>> >
>>> > void snapshotState(); // not sure if we need that method at all in
>>> > option 2
>>> >
>>> > > The Committer is as described in the FLIP, it's basically a
>>> > > function "void commit(Committable)". The GlobalCommitter would be
>>> > > a function "void commit(List<Committable>)". The former would be
>>> > > used by an S3 sink where we can individually commit files to S3; a
>>> > > committable would be the list of part uploads that will form the
>>> > > final file and the commit operation creates the metadata in S3.
>>> > > The latter would be used by something like Iceberg where the
>>> > > Committer needs a global view of all the commits to be efficient
>>> > > and not overwhelm the system.
>>> > >
>>> > > I don't know yet if sinks would only implement one type of commit
>>> > > function or potentially both at the same time, and maybe commit
>>> > > can return some CommitResult that gets shipped to the GlobalCommit
>>> > > function.
>>> >
>>> > I must admit I did not get the need for a Local/Normal + Global
>>> > committer at first. The Iceberg example helped a lot. I think it
>>> > makes a lot of sense.
>>> >
>>> > > For Iceberg, writers don't need any state. But the GlobalCommitter
>>> > > needs to checkpoint StateT. For the committer, CommT is
>>> > > "DataFile". Since a single committer can collect thousands (or
>>> > > more) data files in one checkpoint cycle, as an optimization we
>>> > > checkpoint a single "ManifestFile" (for the collected thousands of
>>> > > data files) as StateT. This allows us to absorb extended commit
>>> > > outages without losing written/uploaded data files, as operator
>>> > > state size is as small as one manifest file per checkpoint cycle
>>> > > [2].
>>> > > ------------------
>>> > > StateT snapshotState(SnapshotContext context) throws Exception;
>>> > >
>>> > > That means we also need the restoreCommitter API in the Sink
>>> > > interface
>>> > > ---------------
>>> > > Committer<CommT, StateT> restoreCommitter(InitContext context,
>>> > > StateT state);
>>> >
>>> > I think this might be a valid case. I am not sure, though, if I
>>> > would go with a "state" there. Having a state in a committer would
>>> > imply we need a collect method as well. So far we needed a single
>>> > method commit(...) and the bookkeeping of the committables could be
>>> > handled by the framework. I think something like an optional
>>> > combiner in the GlobalCommitter would be enough. What do you think?
>>> >
>>> > GlobalCommitter<CommT, GlobalCommT> {
>>> >
>>> >     void commit(GlobalCommT globalCommittable);
>>> >
>>> >     GlobalCommT combine(List<CommT> committables);
>>> > }
>>> >
>>> > A different problem that I see here is how we handle commit
>>> > failures. Should the committables (both normal and global) be
>>> > included in the next cycle, shall we retry it, ...? I think it would
>>> > be worth laying it out in the FLIP.
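>>> >
>>> > To make the flush semantics concrete, a file-sink writer might
>>> > implement the proposed method roughly like this. This is only a
>>> > sketch: PartFile, completedParts and closeCurrentFile are made-up
>>> > names, and I assume the WriterOutput from above has a collect()
>>> > method:
>>> >
>>> > void prepareCommit(boolean flush, WriterOutput<FileCommittable> output) {
>>> >     // Hand over the parts that are already complete in any case.
>>> >     for (PartFile part : completedParts) {
>>> >         output.collect(part.toCommittable());
>>> >     }
>>> >     completedParts.clear();
>>> >
>>> >     if (flush) {
>>> >         // Final checkpoint / end of input: nothing may remain in
>>> >         // progress, so close the currently open file and emit it too.
>>> >         output.collect(closeCurrentFile());
>>> >     }
>>> >     // Otherwise the current file stays open and will be committed
>>> >     // in a subsequent cycle.
>>> > }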
>>> >
>>> > @Aljoscha I think you can find the code Steven was referring to here:
>>> >
>>> > https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java
>>> >
>>> > Best,
>>> >
>>> > Dawid
>>> >
>>> > On 14/09/2020 15:19, Aljoscha Krettek wrote:
>>> > > On 14.09.20 01:23, Steven Wu wrote:
>>> > >> ## Writer interface
>>> > >>
>>> > >> For the Writer interface, should we add "*prepareSnapshot*"
>>> > >> before the checkpoint barrier is emitted downstream?
>>> > >> IcebergWriter would need it. Or would the framework call
>>> > >> "*flush*" before the barrier is emitted downstream? That
>>> > >> guarantee would achieve the same goal.
>>> > >
>>> > > I would think that we only need flush() and the semantics are that
>>> > > it prepares for a commit, so on a physical level it would be
>>> > > called from "prepareSnapshotPreBarrier". Now that I'm thinking
>>> > > about it more I think flush() should be renamed to something like
>>> > > "prepareCommit()".
>>> > >
>>> > > @Guowei, what do you think about this?
>>> > >
>>> > >> In [1], we discussed the reason for the Writer to emit a
>>> > >> (checkpointId, CommT) tuple to the committer. The committer needs
>>> > >> the checkpointId to separate out data files for different
>>> > >> checkpoints if concurrent checkpoints are enabled.
>>> > >
>>> > > When can this happen? Even with concurrent checkpoints the
>>> > > snapshot barriers would still cleanly segregate the input stream
>>> > > of an operator into tranches that should manifest in only one
>>> > > checkpoint. With concurrent checkpoints, all that can happen is
>>> > > that we start a checkpoint before the last one is confirmed
>>> > > completed.
>>> > >
>>> > > Unless there is some weirdness in the sources and some sources
>>> > > start chk-1 first and some other ones start chk-2 first?
>>> > >
>>> > > @Piotrek, do you think this is a problem?
>>> > >
>>> > >> For the Committer interface, I am wondering if we should split
>>> > >> the single commit method into separate "*collect*" and "*commit*"
>>> > >> methods? This way, it can handle both single and multiple CommT
>>> > >> objects.
>>> > >
>>> > > I think we can't do this. If the sink only needs a regular
>>> > > Committer, we can perform the commits in parallel, possibly on
>>> > > different machines. Only when the sink needs a GlobalCommitter
>>> > > would we need to ship all commits to a single process and perform
>>> > > the commit there. If both methods were unified in one interface we
>>> > > couldn't make the decision of where to commit in the framework
>>> > > code.
>>> > >
>>> > >> For Iceberg, writers don't need any state. But the
>>> > >> GlobalCommitter needs to checkpoint StateT. For the committer,
>>> > >> CommT is "DataFile". Since a single committer can collect
>>> > >> thousands (or more) data files in one checkpoint cycle, as an
>>> > >> optimization we checkpoint a single "ManifestFile" (for the
>>> > >> collected thousands of data files) as StateT. This allows us to
>>> > >> absorb extended commit outages without losing written/uploaded
>>> > >> data files, as operator state size is as small as one manifest
>>> > >> file per checkpoint cycle
>>> > >
>>> > > You could have a point here. Is the code for this available in
>>> > > open source? I was checking out
>>> > > https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
>>> > > and didn't find the ManifestFile optimization there.
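>>> > >
>>> > > If I understand the optimization correctly, the committer's
>>> > > snapshotState would be something like this (just a sketch;
>>> > > writeManifest is a made-up helper):
>>> > >
>>> > > ManifestFile snapshotState(SnapshotContext context) throws Exception {
>>> > >     // Collapse the thousands of DataFiles collected in this cycle
>>> > >     // into a single manifest so that operator state stays tiny.
>>> > >     ManifestFile manifest = writeManifest(collectedDataFiles);
>>> > >     collectedDataFiles.clear();
>>> > >     return manifest;
>>> > > }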
>>> > >
>>> > > Best,
>>> > > Aljoscha